gpt4 book ai didi

java - MongoDb 实时(或接近实时)流出插入的数据

转载 作者:IT老高 更新时间:2023-10-28 13:29:19 25 4
gpt4 key购买 nike

我有许多 MongoDB 集合,它们从各种流式源中获取许多 JSON 文档。换句话说,有许多进程不断地将数据插入到一组 MongoDB 集合中。

我需要一种将数据从 MongoDB 流式传输到下游应用程序的方法。所以我想要一个概念上看起来像这样的系统:

App Stream1 --> 
App Stream2 --> MONGODB ---> Aggregated Stream
App Stream3 -->

或者这个:

App Stream1 -->                 --->  MongoD Stream1
App Stream2 --> MONGODB ---> MongoD Stream2
App Stream3 --> ---> MongoD Stream3

问题是我如何从 Mongo 中流式传输数据而无需不断地轮询/查询数据库?

显而易见的问题答案是“为什么不更改这些应用程序流式处理以将消息发送到 Rabbit、Zero 或 ActiveMQ 之类的队列,然后让它们像这样立即发送到您的 Mongo 流式处理和 Mongo”:

                 MONGODB
/|\
|
App Stream1 --> | ---> MongoD Stream1
App Stream2 --> SomeMQqueue ---> MongoD Stream2
App Stream3 --> ---> MongoD Stream3

在理想的世界中是的,这很好,但是我们需要 Mongo 来确保首先保存消息,避免重复并确保生成所有 ID 等。Mongo 必须位于中间作为持久层。

那么我如何将消息从 Mongo 集合(不使用 GridFS 等)流式传输到这些下游应用程序中。基本思想流派只是轮询新文档,收集到的每个文档通过向存储在数据库中的 JSON 文档添加另一个字段来更新它,就像 SQL 表中存储已处理时间戳的进程标志一样。 IE。每 1 秒轮询已处理的文档 == null.... 添加已处理 = now().... 更新文档。

有没有更简洁/计算效率更高的方法?

仅供引用 - 这些都是 Java 进程。

干杯!

最佳答案

如果您正在写入一个有上限的集合(或多个集合),您可以使用 tailablecursor将新数据推送到流上,或者推送到可以从中流出的消息队列中。但是,这不适用于无上限的集合。

关于java - MongoDb 实时(或接近实时)流出插入的数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/7170384/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com