gpt4 book ai didi

cassandra - Apache Nifi/Cassandra - 如何将 CSV 加载到 Cassandra 表中

转载 作者:行者123 更新时间:2023-12-02 23:42:02 25 4
gpt4 key购买 nike

我每天都会收到多次传入的各种 CSV 文件,存储来自传感器的时间序列数据,这些传感器是传感器站的一部分。每个 CSV 均以其来源的传感器站和传感器 ID 命名,例如“station1_sensor2.csv”。目前,数据存储如下:

> cat station1_sensor2.csv
2016-05-04 03:02:01.001000+0000;0;
2016-05-04 03:02:01.002000+0000;0.1234;
2016-05-04 03:02:01.003000+0000;0.2345;

我创建了一个 Cassandra 表来存储它们并能够查询它们以查找各种已识别的任务。 Cassandra 表如下所示:

cqlsh > CREATE KEYSPACE data with replication = {'class' : 'SimpleStrategy', 'replication_factor' : 3};

CREATE TABLE sensor_data (
station_id text, // id of the station
sensor_id text, // id of the sensor
tps timestamp, // timestamp of the measure
val float, // measured value
PRIMARY KEY ((station_id, sensor_id), tps)
);

我想使用 Apache Nifi 自动将 CSV 中的数据存储到此 Cassandra 表中,但我找不到正确执行此操作的示例或方案。我尝试使用“PutCassandraQL”处理器,但我在没有任何明确示例的情况下苦苦挣扎。因此,如果您能获得有关如何使用 Apache Nifi 执行 Cassandra put 查询以将数据插入表中的任何帮助,我们将不胜感激!

最佳答案

TL;DR 我有一个 NiFi 1.0 模板来完成此任务 Gist并在 NiFi Wiki .

NiFi 鼓励高度模块化的设计,因此让我们将其分解为更小的任务,我将描述可能的流程并根据您的用例解释每个处理器的用途:

enter image description here

  1. 读入 CSV 文件。这可以通过 GetFile 来完成,或者最好使用 ListFile -> FetchFile。在我的示例中,我使用脚本处理器来创建内嵌流文件,其中包含上面的示例数据。这使得我的模板可供其他人使用。

  2. 解析文件名以获取站点和传感器字段。这使用 NiFi Expression Language获取文件名中下划线之前(代表站)和下划线之后(减去 CSV 扩展名)之后的部分(代表传感器)。

  3. 将单个 CSV 流文件拆分为每行一个流文件。这样做是为了稍后我们可以创建单独的 CQL INSERT 语句。

  4. 从每行中提取列值。我为此使用了 ExtractText 和正则表达式,如果您有非常复杂的逻辑,您可能需要检查脚本处理器,例如 ExecuteScript .

  5. 更改时间戳。 IIRC,CQL 不接受时间戳文字上的微秒。您可以尝试解析微秒(最好在 ExecuteScript 处理器中完成)或只是重新格式化时间戳。请注意,由于无法解析微秒,因此“重新格式化”会导致我的示例中所有小数秒被截断。

  6. 构建 CQL INSERT 语句。此时,数据(无论如何在我的模板中)都在流文件属性中,原始内容可以用 CQL INSERT 语句替换(这是 PutCassandraQL 所期望的方式)。您可以将数据保留在属性中(使用 UpdateAttribute 正确命名它们,请参阅 PutCassandraQL 文档)并使用准备好的语句,但恕我直言,编写显式 CQL 语句更简单。在撰写本文时,PutCassandraQL 并未缓存PreparedStatements,因此目前以这种方式执行操作的性能实际上较低。

  7. 使用 PutCassandraQL 执行 CQL 语句。

我没有详细介绍属性名称等,但当流程到达 ReplaceText 时,我具有以下属性:

  • station.name:包含从文件名解析出的电台名称
  • sensor.name:包含从文件名解析出的传感器名称
  • tps:包含更新的时间戳值
  • columns.2:(大概)包含传感器读数的值

ReplaceText 将内容设置为以下内容(使用表达式语言填充值):

insert into sensor_data (station_id, sensor_id, tps, val) values ('${station.name}', '${sensor.name}', '${tps}', ${column.2})

希望对您有所帮助,如果您有任何疑问或问题,请告诉我。干杯!

关于cassandra - Apache Nifi/Cassandra - 如何将 CSV 加载到 Cassandra 表中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39201597/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com