gpt4 book ai didi

mongodb - 蒙戈 : Storing time series measurement data

转载 作者:可可西里 更新时间:2023-11-01 09:39:46 25 4
gpt4 key购买 nike

我们存储每个时间间隔的温度测量数据,我们希望只保留 90 个以前的数据。我们的数据结构是这样的:

{ "_id" : ObjectId("xxx"), "device" : "deviceId1", "count": 2, "values" : 
[
{ "ts" : NumberLong("1471077454902"), "measureData" : 37.3 },
{ "ts" : NumberLong("1471077454911"), "measureData" : 37.4 }
]
}

count 是值的大小,例如当数组有 2 个元素时,则大小为 2。我们将 Java API 设计如下:

 When the new measurement data comes:
Get corresponding device id count:
if count < 90:
Push the new mesaure data into values and increase count by 1
if count >90:
Pull the first element of the array and push the latest data into array.
Store the first element pulled into history collection.

是否有一个查询或一个聚合可以执行这些步骤?或者我们应该通过传统的方法来做到这一点,比如查询、评估,然后推送或拉/推。

////***************part 2 传统方法***************//

public class TSDesignMain {

public static void main(String[] args) {
MongoClient mClient = new MongoClient();
MongoDatabase db = mClient.getDatabase(MongoTSConstants.dbname);

long tWarmingStart = System.nanoTime();
InsertingWarmingDocument(db);
long tWarmingEnd = System.nanoTime();
double tDurationWarming = (double) (tWarmingEnd-tWarmingStart) / 1000/1000;
System.out.println("warming db, insert 10000 document per event duration is "+tDurationWarming+"ms");


long tInsert1Dev1DocStart = System.nanoTime();
for(int j=1;j<10000;j++){
storeMeasureDataIntoDB(db);
}
long tInsert1Dev1DocEnd = System.nanoTime();
double tInsert1Dev1DocDuration = (double) (tInsert1Dev1DocEnd-tInsert1Dev1DocStart) / 1000/1000;
System.out.println("insert 10000 document in 90*24*30 elements array duration is "+tInsert1Dev1DocDuration+"ms");


long tQuery1Dev1DocStart = System.nanoTime();
for(int j=1;j<10000;j++){
handleQueryDocument(db);
}
long tQuery1Dev1DocEnd = System.nanoTime();
double tQuery1Dev1DocDuration = (double) (tQuery1Dev1DocEnd-tQuery1Dev1DocStart) / 1000/1000;
System.out.println("query 10000 times in 90*24*30 elements array duration is "+tQuery1Dev1DocDuration+"ms");


mClient.close();

}

private static void InsertingWarmingDocument(MongoDatabase db) {

long ts = Calendar.getInstance().getTimeInMillis();
for(long i=1;i<100;i++){
db.getCollection(MongoTSConstants.tsDataPdCollection).insertOne(
new Document(MongoTSConstants.deviceFn,MongoTSConstants.deviceIdPrefix+i+"test")
.append(MongoTSConstants.tsFn,ts+i)
);
}
}

private static void handleQueryDocument(MongoDatabase db) {
Document where = new Document(MongoTSConstants.deviceFn,MongoTSConstants.deviceIdPrefix+1);
FindIterable<Document> it = db.getCollection(MongoTSConstants.tsDataInOneCollection).find(where);
MongoCursor<Document> cur = it.iterator();

int i=0;

if(!cur.hasNext()){
Document doc = cur.next();
ArrayList<Document> obj = (ArrayList<Document>) doc.get("values");
}

}



private static void storeMeasureDataIntoDB(MongoDatabase db) {
Document where = new Document(MongoTSConstants.deviceFn,MongoTSConstants.deviceIdPrefix+1);
FindIterable<Document> it = db.getCollection(MongoTSConstants.tsDataInOneCollection).find(where);
MongoCursor<Document> cur = it.iterator();

int i=0;


/**
* There is no device document in DB, insert new one. and use update to store 1st measure data
*
*/
if(!cur.hasNext()){
long tsInsert = System.nanoTime();
/**
* insert device id
*/
db.getCollection(MongoTSConstants.tsDataInOneCollection).insertOne(
new Document(MongoTSConstants.deviceFn,MongoTSConstants.deviceIdPrefix+1)
.append(MongoTSConstants.countFn,0));
/**
* using update to insert values array (store the first measure data)
*
*/
db.getCollection(MongoTSConstants.tsDataInOneCollection).updateOne(
new Document(MongoTSConstants.deviceFn,MongoTSConstants.deviceIdPrefix+1),
new Document("$push", new Document("values",new Document(MongoTSConstants.tsFn,tsInsert).
append(MongoTSConstants.measureDataFn,37.1)))
.append("$inc", new Document(MongoTSConstants.countFn,1)));
}else{
while(cur.hasNext()){
/**
* if i > 1, it means there are two doc with same device id, error!!
*/
if(i>=1){
//log error find two docs with same devicedID.
break;
}

Document doc = cur.next();
Integer count = doc.getInteger("count");

/**
* measure data has over 3 month.
*/
if( count >= MongoTSConstants.queryDataLength ){

/**
* get the first one in the array
*/
ArrayList<Document> obj = (ArrayList<Document>) doc.get("values");
Document doc1Elem = (Document) obj.get(0);

/**
* pull the first one in the array
*/
db.getCollection(MongoTSConstants.tsDataInOneCollection).updateOne(where,
new Document("$pop", -1));
long ts = Calendar.getInstance().getTimeInMillis();

/**
* push the new one
*/
db.getCollection(MongoTSConstants.tsDataInOneCollection).updateOne(where,
new Document("$push", new Document("values",new Document(MongoTSConstants.tsFn,ts).
append(MongoTSConstants.measureDataFn,37.8))));
//Store doc1Elem in another history collection;
db.getCollection(MongoTSConstants.tsHistoryCollection).insertOne(doc1Elem);
}else{
/**
* Measure data has not reach 3 month data
*/
long ts = Calendar.getInstance().getTimeInMillis();
db.getCollection(MongoTSConstants.tsDataInOneCollection).updateOne(where,
new Document("$push", new Document("values",new Document(MongoTSConstants.tsFn,ts).
append(MongoTSConstants.measureDataFn,37.9))).append("$inc", new Document(MongoTSConstants.countFn,1)));
}
i++;
}
}
}
}

最佳答案

在我看来,目前很难做到这一点(目前最新版本的mongodb是v3.2。),我不知道如何实现目标。

说明

我。概述

此处有 5 个操作,您想将它们合并为 1 个更新操作:

  1. 通过deviceId查询
  2. count 评估(count < 90 或 count >= 90?)
  3. 将一个新数据压入values数组的尾部
  4. values数组中弹出最早的数据(可选,基于第2步)
  5. count + 1(可选,基于第 2 步)

让我们把它们分成两部分:查询部分 & 更新部分

二。查询部分:Merge Query(1st) and Evaluation(2nd)?

因为我不知道如何将Evaluation 放入更新语句的update action 部分:

db.test.update(
{/* Query criteria */},
{/* Update action */}
)

我采用了另一种方法 - 将评估放入查询条件部分,如下所示:

// We call this statement as UPDATE_STAT_1
db.test.update(
{device: 'deviceId1', count: {$lt: 90}},
{/* Update action: push new data to values and increase count */}
);

如果计数<90,您的示例文档将被匹配并执行更新操作(此时不关心更新操作是成功还是失败)。然后mongoDB会返回执行结果给你:

// Matched and updated
WriteResult({
"nMatched" : 1,
"nUpserted" : 0,
"nModified" : 1
});

结果告诉你一个文档被匹配和更新(nMatched = 1 & nModified = 1)

另一方面,如果count >= 90,则不会匹配和更新任何内容,执行结果将是:

// Not Matched
WriteResult({
"nMatched" : 0,
"nUpserted" : 0,
"nModified" : 0
})

所以你会知道计数达到90,你应该执行另一个更新语句:

// We call this statement as UPDATE_STAT_2
db.test.update(
{device: 'deviceId1', count: {$gte: 90}},
{/* Update action: push new data to values and pop the oldest*/}
);

总结,您可以先执行UPDATE_STAT_1,然后检查执行结果来决定是否需要像这样执行UPDATE_STAT_2(伪代码):

exe_result = run(UPDATE_STAT_1); // Run UPDATE_STAT_1
if(exe_result.nMatched == 0 && exe_result.nModified == 0) {
run(UPDATE_STAT_2); // Check the count and Run UPDATE_STAT_2
}

这是我合并查询(第 1 次)和评估(第 2 次)的方法。我不知道如何将上述所有代码合并到一条语句中。

三。更新部分:Merge Push(3rd)/Pop(4th)/Increase(5th)?

我在本地试过,你可以把Push & Increase放在一条语句中:

db.test.update(
{device: 'deviceId1', count: {$lt: 90}},
{
$push: {values: { "ts" : "1471077454988", "measureData" : 39 }},
$inc : {count: 1}
}
);

同时进行 Push(3rd) 和 Increase(5th) 是合法的。但是如果你想压入和弹出数组:

db.test.update(
{device: 'deviceId1', count: {$gte: 90}},
{
$pop: {values: -1},
$push: {values: { "ts" : "1471077454911", "measureData" : 40 }}
}
);

你会得到错误:

WriteResult({
"nMatched" : 0,
"nUpserted" : 0,
"nModified" : 0,
"writeError" : {
"code" : 16837,
"errmsg" : "Cannot update 'values' and 'values' at the same time"
}
})

这意味着您不能同时对值进行推送和弹出操作。为此,我找到了这个解释:

The issue is that MongoDB doesn’t allow multiple operations on the same property in the same update call. This means that the two operations must happen in two individually atomic operations.

所以。总之,它无法将 Push(3rd)/Pop(4th)/Increase(5th) 合并到一个语句中。

四。总结

根据上面的示例,Query(1st) 和 Evaluation(2nd) 不能合并,Push(3rd)/Pop(4th)/Increase(5th) 也不能合并。所以你可以用传统的方式来做这件事。谢谢。

关于mongodb - 蒙戈 : Storing time series measurement data,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38949796/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com