gpt4 book ai didi

How to import a CSV file into a partitioned table with MultithreadedTableWriter provided by DolphinDB C++ API?(如何使用DolphinDB C++API提供的MultithadedTableWriter将CSV文件导入到分区表中?)

转载 作者:bug小助手 更新时间:2023-10-24 17:10:49 26 4
gpt4 key购买 nike



I have a CSV file and the first 10 records of the file are as follows:

我有一个CSV文件,该文件的前10条记录如下:


time    device_id   battery_level   battery_status  battery_temperature bssid   cpu_avg_1min    cpu_avg_5min    cpu_avg_15min   mem_free    mem_used    rssi    ssid
2016.11.15T07:00:00 demo000029 48 discharging 90.3 01:02:03:04:05:06 7.42 7.004 7.1213 460,781,455 539,218,545 (58) demo-net
2016.11.15T07:00:00 demo000034 31 discharging 88.9 01:02:03:04:05:06 5.36 7.152 7.6373 649,945,465 350,054,535 (58) demo-net
2016.11.15T07:00:00 demo000037 75 discharging 90.7 A0:B1:C5:D2:E0:F3 7.26 6.572 6.644 420,956,932 579,043,068 (49) stealth-net
2016.11.15T07:00:00 demo000059 78 discharging 89.9 A0:B1:C5:D2:E0:F3 24.35 9.91 7.69 559,660,292 440,339,708 (48) stealth-net
2016.11.15T07:00:00 demo000063 47 discharging 89.8 22:32:A2:B3:05:98 30.23 13.166 10.5087 730,143,821 269,856,179 (41) demo-5ghz
2016.11.15T07:00:00 demo000084 56 discharging 92.4 A0:B1:C5:D2:E0:F3 6.97 8.354 8.7713 490,555,023 509,444,977 (44) stealth-net
2016.11.15T07:00:00 demo000087 85 discharging 88.5 22:32:A2:B3:05:98 28.13 10.186 7.382 669,735,198 330,264,802 (47) demo-5ghz
2016.11.15T07:00:00 demo000096 92 discharging 93 A0:B1:C5:D2:E0:F3 28.28 11.416 8.792 569,078,358 430,921,642 (66) stealth-net
2016.11.15T07:00:00 demo000099 53 discharging 87.9 22:32:A2:B3:05:98 8.98 8.196 8.252 449,422,871 550,577,129 (57) demo-5ghz
2016.11.15T07:00:00 demo000101 72 discharging 92.3 A0:B1:C5:D2:E0:F3 96.74 26.228 14.6627 619,974,710 380,025,290 (57) stealth-net

Below is the script for creating databases and tables:

下面是创建数据库和表的脚本:


login(`admin, `123456)
if (exists('dfs://iot') ) dropDatabase('dfs://iot')

db1 = database('',VALUE,2016.11.15..2016.11.18)
db2 = database('',HASH,[SYMBOL,10])
db = database('dfs://iot',COMPO,[db1,db2])

schema=table(1:0,`time`device_id`battery_level`battery_status`battery_temperature`bssid`cpu_avg_1min`cpu_avg_5min`cpu_avg_15min`mem_free`mem_used`rssi`ssid,
[DATETIME,SYMBOL,INT,SYMBOL,DOUBLE,SYMBOL,DOUBLE,DOUBLE,DOUBLE,LONG,LONG,SHORT,SYMBOL])
db.createPartitionedTable(schema,`readings,`time`device_id)

I want to import a CSV file into a partitioned table using MultithreadedTableWriter provided by the DolphinDB C++ API. How can I achieve this?

我想使用DolphinDB C++API提供的MultithadedTableWriter将CSV文件导入到分区表中。我怎样才能做到这一点呢?


更多回答
优秀答案推荐

You can use the open-source C++ CSV parser library rapidCsv to read the file, and then use MultithreadedTableWriter to write the data to the partitioned table. Here's the example:

您可以使用开放源码的C++CSV解析器库RapidCsv读取文件,然后使用MultithadedTableWriter将数据写入分区表。下面是一个例子:


#include "MultithreadedTableWriter.h"
#include "DolphinDB.h"
#include "Util.h"
#include "rapidcsv.h"
#include <string>
#include <vector>
#include <thread>

using namespace std;
using namespace dolphindb;

Constant *createDateTime(string str) {
int year, month, day, hour, minute, second;
year = atoi(str.substr(0, 4).c_str());
month = atoi(str.substr(5, 2).c_str());
day = atoi(str.substr(8, 2).c_str());
hour = atoi(str.substr(11, 2).c_str());
minute = atoi(str.substr(14, 2).c_str());
second = atoi(str.substr(17, 2).c_str());
return Util::createDateTime(year, month, day, hour, minute, second);
}

int main(int argc, char *argv[]) {
DBConnection::initialize();
DBConnection conn;
string host = "127.0.0.1";
int port = 8848;
string userId = "admin";
string password = "123456";

string path = "d:/data/devices_big/devices_big_readings.csv";

try {
vector<COMPRESS_METHOD> compress;
compress.push_back(COMPRESS_DELTA);
compress.push_back(COMPRESS_LZ4);
compress.push_back(COMPRESS_DELTA);
compress.push_back(COMPRESS_LZ4);
compress.push_back(COMPRESS_LZ4);
compress.push_back(COMPRESS_LZ4);
compress.push_back(COMPRESS_LZ4);
compress.push_back(COMPRESS_LZ4);
compress.push_back(COMPRESS_LZ4);
compress.push_back(COMPRESS_DELTA);
compress.push_back(COMPRESS_DELTA);
compress.push_back(COMPRESS_LZ4);
compress.push_back(COMPRESS_LZ4);

MultithreadedTableWriter writer("127.0.0.1", 8848, "admin", "123456", "dfs://iot", "readings",
false, false, NULL, 10000, 1, 10, "device_id", &compress);

rapidcsv::Document doc(path, rapidcsv::LabelParams(-1, -1));
std::vector<string> time = doc.GetColumn<string>(0);
std::vector<string> device_id = doc.GetColumn<string>(1);
std::vector<int> battery_level = doc.GetColumn<int>(2);
std::vector<string> battery_status = doc.GetColumn<string>(3);
std::vector<double> battery_temperature = doc.GetColumn<double>(4);
std::vector<string> bssid = doc.GetColumn<string>(5);
std::vector<double> cpu_avg_1min = doc.GetColumn<double>(6);
std::vector<double> cpu_avg_5min = doc.GetColumn<double>(7);
std::vector<double> cpu_avg_15min = doc.GetColumn<double>(8);
std::vector<long long> mem_free = doc.GetColumn<long long>(9);
std::vector<long long> mem_used = doc.GetColumn<long long>(10);
std::vector<int> rssi = doc.GetColumn<int>(11);
std::vector<string> ssid = doc.GetColumn<string>(12);

int rowNum = time.size();

ErrorCodeInfo errorInfo;

long long startTime = Util::getEpochTime();

for (int i = 0; i < rowNum; i++) {
if (writer.insert(errorInfo,
createDateTime(time[i]),
device_id[i],
battery_level[i],
battery_status[i],
battery_temperature[i],
bssid[i],
cpu_avg_1min[i],
cpu_avg_5min[i],
cpu_avg_15min[i],
mem_free[i],
mem_used[i],
(short)rssi[i],
ssid[i]
) == false) {
//This part will not be executed.
cout << "insert failed: " << errorInfo.errorInfo << endl;
break;
}

}
// Check the current status of MTW
MultithreadedTableWriter::Status status;
writer.getStatus(status);
if (status.hasError()) {
cout << "error in writing: " << status.errorInfo << endl;
}
writer.waitForThreadCompletion();
// Check whether the tasks have been completed
writer.getStatus(status);
if (status.hasError()) {
cout << "error after write complete: " << status.errorInfo << endl;
// Obtain unwritten data
std::vector<std::vector<ConstantSP>*> unwrittenData;
writer.getUnwrittenData(unwrittenData);
cout << "unwriterdata length " << unwrittenData.size() << endl;
}

cout << "Insert Time " << Util::getEpochTime() - startTime << " ms" << endl;
//Check the final result
cout << conn.run("select count(*) from pt")->getString() << endl;


}
catch (std::exception &e) {
cerr << "Failed to insert table, with exception: " << e.what() << endl;
}
}

更多回答

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com