gpt4 book ai didi

Azure 事件中心 : de-mux and aggregate events from all partitions of eventhub using IEventProcessor

转载 作者:行者123 更新时间:2023-12-02 06:48:44 26 4
gpt4 key购买 nike

我有 1 个带有 2 个分区的 eventhub,我想聚合我的数据一分钟并将该数据保存到数据库,我正在使用 IEventProcessor 从 eventhub 读取事件。

我能够将数据按原样保存到数据库,但是当我聚合数据时,我每分钟得到 2 个条目,而不是 1 个。我认为原因是 IEventProcessor 运行两次,即每次针对 eventhub 中的分区。

有什么方法可以在从 eventhub 读取数据时实现一分钟的流数据聚合,然后保存到数据库吗? (我无法使用流分析,因为我有 protobuf 格式的数据。)

最佳答案

您可以使用Azure IoTHub React Java 和 Scala API,它提供了一个包含来自所有 EventHub 分区的事件的合并 react 流。

从您的角度来看,无论 EventHub 中的分区数量有多少,您都只会看到一个数据流,并且如果需要,您也可以选择分区的子集。

These samples展示 API 是如何工作的,它应该会让你的任务变得非常简单。您需要定义“Sink”,它将是将事件写入数据库的方法,并链接提供的“Source”,例如:

val eventHubRecords = IoTHub().source(java.time.Instant.now())

val myDatabase = Sink.foreach[MessageFromDevice] {
m ⇒ MyDB.writeRecord(m)
}

eventHubRecords.to(myDatabase).run()

这里是configuration settings ,检查点支持 Cassandra 和 AzureBlob。

注意:该项目以 Azure IoT 命名,但您可以将其用于 EventHub,如果您有任何问题,请告诉我。

关于Azure 事件中心 : de-mux and aggregate events from all partitions of eventhub using IEventProcessor,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42078355/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com