gpt4 book ai didi

java - 如何在 Apache Flink 中平面映射到数据库?

转载 作者:行者123 更新时间:2023-11-30 05:20:26 25 4
gpt4 key购买 nike

我正在使用 Apache Flink 尝试将 JSON 记录从 Kafka 获取到 InfluxDB,并在此过程中将它们从一个 JSON 记录拆分为多个 InfluxDB 点。

我找到了 flatMap 转换,感觉它符合目的。核心代码如下:

DataStream<InfluxDBPoint> dataStream = stream.flatMap(new FlatMapFunction<JsonConsumerRecord, InfluxDBPoint>() {
@Override
public void flatMap(JsonConsumerRecord record, Collector<InfluxDBPoint> out) throws Exception {
Iterator<Entry<String, JsonNode>> iterator = //...

while (iterator.hasNext()) {
// extract point from input
InfluxDBPoint point = //...

out.collect(point);
}
}
});

由于某种原因,我只将收集到的点之一流入数据库。

即使当我打印出所有映射条目时,它似乎也工作得很好:dataStream.print() 产量:

org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint@144fd091
org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint@57256d1
org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint@28c38504
org.apache.flink.streaming.connectors.influxdb.InfluxDBPoint@2d3a66b3

我是否误解了 flatMap 或者 Influx 连接器中可能存在一些错误?

最佳答案

这个问题实际上与 Influx 中的一系列(由其标签集和度量定义 as seen here 定义)每次只能有一个点有关,因此即使我的字段不同,最终点覆盖了具有相同时间值的所有先前点。

关于java - 如何在 Apache Flink 中平面映射到数据库?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59678807/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com