gpt4 book ai didi

mongodb - Kafka -> Flink 数据流 -> MongoDB

转载 作者:可可西里 更新时间:2023-11-01 14:12:51 25 4
gpt4 key购买 nike

我想设置 Flink,以便它将数据流从 Apache Kafka 转换并重定向到 MongoDB。出于测试目的,我在 flink-streaming-connectors.kafka 示例 ( https://github.com/apache/flink ) 之上构建。

Kafka 流被 Flink 正确地标记为红色,我可以映射它们等,但是当我想将每条收到和转换的消息保存到 MongoDB 时,问题就出现了。我发现的关于 MongoDB 集成的唯一示例是来自 github 的 flink-mongodb-test。不幸的是,它使用静态数据源(数据库),而不是数据流。

我相信 MongoDB 应该有一些 DataStream.addSink 实现,但显然没有。

实现它的最佳方法是什么?我是否需要编写自定义接收器功能,或者我可能遗漏了什么?也许应该以不同的方式完成?

我不依赖于任何解决方案,因此我们将不胜感激。

下面是一个示例,说明我究竟得到了什么作为输入以及我需要存储什么作为输出。

Apache Kafka Broker <-------------- "AAABBBCCCDDD" (String)
Apache Kafka Broker --------------> Flink: DataStream<String>

Flink: DataStream.map({
return ("AAABBBCCCDDD").convertTo("A: AAA; B: BBB; C: CCC; D: DDD")
})
.rebalance()
.addSink(MongoDBSinkFunction); // store the row in MongoDB collection

正如你在这个例子中看到的,我主要使用 Flink 来进行 Kafka 的消息流缓冲和一些基本的解析。

最佳答案

作为 Robert Metzger 回答的替代方法,您可以将结果再次写入 Kafka,然后使用维护的 Kafka 连接器之一将主题内容放入 MongoDB 数据库中。

Kafka -> Flink -> Kafka -> Mongo/Anything

通过这种方法,您可以保持“至少一次语义”行为。

关于mongodb - Kafka -> Flink 数据流 -> MongoDB,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35158683/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com