- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我每天都会收到多次传入的各种 CSV 文件,存储来自传感器的时间序列数据,这些传感器是传感器站的一部分。每个 CSV 均以其来源的传感器站和传感器 ID 命名,例如“station1_sensor2.csv”。目前,数据存储如下:
> cat station1_sensor2.csv
2016-05-04 03:02:01.001000+0000;0;
2016-05-04 03:02:01.002000+0000;0.1234;
2016-05-04 03:02:01.003000+0000;0.2345;
我创建了一个 Cassandra 表来存储它们并能够查询它们以查找各种已识别的任务。 Cassandra 表如下所示:
cqlsh > CREATE KEYSPACE data with replication = {'class' : 'SimpleStrategy', 'replication_factor' : 3};
CREATE TABLE sensor_data (
station_id text, // id of the station
sensor_id text, // id of the sensor
tps timestamp, // timestamp of the measure
val float, // measured value
PRIMARY KEY ((station_id, sensor_id), tps)
);
我想使用 Apache Nifi 自动将 CSV 中的数据存储到此 Cassandra 表中,但我找不到正确执行此操作的示例或方案。我尝试使用“PutCassandraQL”处理器,但我在没有任何明确示例的情况下苦苦挣扎。因此,如果您能获得有关如何使用 Apache Nifi 执行 Cassandra put 查询以将数据插入表中的任何帮助,我们将不胜感激!
最佳答案
TL;DR 我有一个 NiFi 1.0 模板来完成此任务 Gist并在 NiFi Wiki .
NiFi 鼓励高度模块化的设计,因此让我们将其分解为更小的任务,我将描述可能的流程并根据您的用例解释每个处理器的用途:
读入 CSV 文件。这可以通过 GetFile 来完成,或者最好使用 ListFile -> FetchFile。在我的示例中,我使用脚本处理器来创建内嵌流文件,其中包含上面的示例数据。这使得我的模板可供其他人使用。
解析文件名以获取站点和传感器字段。这使用 NiFi Expression Language获取文件名中下划线之前(代表站)和下划线之后(减去 CSV 扩展名)之后的部分(代表传感器)。
将单个 CSV 流文件拆分为每行一个流文件。这样做是为了稍后我们可以创建单独的 CQL INSERT 语句。
从每行中提取列值。我为此使用了 ExtractText 和正则表达式,如果您有非常复杂的逻辑,您可能需要检查脚本处理器,例如 ExecuteScript .
更改时间戳。 IIRC,CQL 不接受时间戳文字上的微秒。您可以尝试解析微秒(最好在 ExecuteScript 处理器中完成)或只是重新格式化时间戳。请注意,由于无法解析微秒,因此“重新格式化”会导致我的示例中所有小数秒被截断。
构建 CQL INSERT 语句。此时,数据(无论如何在我的模板中)都在流文件属性中,原始内容可以用 CQL INSERT 语句替换(这是 PutCassandraQL 所期望的方式)。您可以将数据保留在属性中(使用 UpdateAttribute 正确命名它们,请参阅 PutCassandraQL 文档)并使用准备好的语句,但恕我直言,编写显式 CQL 语句更简单。在撰写本文时,PutCassandraQL 并未缓存PreparedStatements,因此目前以这种方式执行操作的性能实际上较低。
使用 PutCassandraQL 执行 CQL 语句。
我没有详细介绍属性名称等,但当流程到达 ReplaceText 时,我具有以下属性:
ReplaceText 将内容设置为以下内容(使用表达式语言填充值):
insert into sensor_data (station_id, sensor_id, tps, val) values ('${station.name}', '${sensor.name}', '${tps}', ${column.2})
希望对您有所帮助,如果您有任何疑问或问题,请告诉我。干杯!
关于cassandra - Apache Nifi/Cassandra - 如何将 CSV 加载到 Cassandra 表中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39201597/
我们有 2 个 cassandra 集群,第一个有旧数据,第二个有新数据。 现在我们想要将旧数据从第一个集群移动或复制到第二个集群。什么是最好的方法来做到这一点以及如何做到这一点? 我们正在使用 DS
我正在考虑安装 OpsCenter 来监控我们在 RackSpace VM 上运行的 24 节点 Cassandra 集群。过去我听说 OpsCenter 减慢了集群速度。我有点担心 OpsCente
假设我有一个复制因子(RF)= 2 的 2 节点集群。 我使用一致性 2 触发插入。当客户端等待响应时,Cassandra 开始写入这 2 个节点。中间一个节点失败,无法完成写入,而另一节点上的写入成
已结束。此问题正在寻求书籍、工具、软件库等的推荐。它不满足 Stack Overflow guidelines 。目前不接受答案。 我们不允许提出寻求书籍、工具、软件库等推荐的问题。您可以编辑问题,以
我在 Cassandra 中有一个表,其中我用 1000 多个条目填充了一些行(每行有 10000 多列)。行中的条目更新非常频繁,基本上只是一个字段(它是一个整数)被更新为不同的值。列的所有其他值保
当Cassandra端有“掉落的突变”时,它是否向调用客户端返回相应的失败?或者即使在服务器端丢弃相应的突变并导致数据丢失,它总是成功响应调用事务的调用客户端? 在一个特定实例中,当我们的 TPS 约
我有一个 Multi-Tenancy 应用程序,其中 tenantId 将成为每个查询的一部分,因此我将其放入所有表的分区键中。 例子: CREATE TABLE users { tenantId t
根据 Datastax 文档,在 Cassandra 中先读后写是一种反模式。 每当我们在 CQLSH 中使用 UPDATE 或使用 Datastax 驱动程序来设置几列(带有 IF 和集合更新)时,
是否有命令或任何方式可以知道 Cassandra 的哪些节点上存储了哪些数据? 我对 Cassandra 很陌生,在谷歌上搜索这个问题并没有多少运气。 谢谢! 最佳答案 您可以使用 nodetool
我们有一个包含 1500 万条记录的表,而我们的表是一个 10 节点的 cassandra 集群。我们有一列有接近 20 个可重复值。是否建议在此列上建立二级索引? 最佳答案 假设在该列上完全均匀分布
Cassandra 发布了它的 technical limitations但没有提到允许的最大列数。是否有最大列数?我需要存储 400 多个字段。这在 Cassandra 中可能吗? 最佳答案 每行的
我想知道当表中有多个非 PK 列时会发生什么。我读过这个例子: http://johnsanda.blogspot.co.uk/2012/10/why-i-am-ready-to-move-to-cq
我有两个关于 Cassandra 查询结果的问题。 当我在 Cassandra 中对表进行“完全”选择(即 select * from table )时,是否保证结果将按分区标记的递增顺序返回? 例如
我无法为 Cassandra 设置 Hector。我已经浏览了 documentation和 Cassandra wiki .这些文档的问题在于,那里的很多信息都已经过时或过时(或者我缺乏知识)。无论
我正在使用 DataStax Enterprise 中 cassandra 中提供的压力测试。如果有人知道的话,我也想要一些关于它和 cassandra 的信息。 - 首先,压力测试使用哪些节点?我的
当我在 CQL 中创建表时,列的顺序是否必须精确 不是 在主键和 中不是 聚类列: CREATE TABLE user ( a ascii, b ascii, c ascii,
我有一张如下表: CREATE TABLE tab( categoryid text, id text, name text, author text, des
我正在尝试学习 Cassandra,但对术语感到困惑。 很多情况下它表示该行存储键/值对。 但是,当我定义一个表时,它更像是声明一个 SQL 表,即;您创建一个表并指定列名和数据类型。 谁能澄清一下?
如何对 cassandra 数据实现审计? 我正在寻找一个开源选项。 cassandra 是否有任何有助于审计的功能? 我可以使用触发器将记录记录到表中吗?我关注了 Triggers示例并且能够将记录
我遇到了一个问题“me.prettyprint.hector.api.exceptions.HUnavailableException:: 可能没有足够的副本来处理一致性级别。”当我有 RF=1 时,
我是一名优秀的程序员,十分优秀!