gpt4 book ai didi

java - azure 事件中心 - 仅处理新消息

转载 作者:行者123 更新时间:2023-12-03 02:11:56 28 4
gpt4 key购买 nike

在 Java 中拥有简单的事件中心客户端(只有 1 个分区)

public static void main(String[] args) throws Exception {
// Create a blob container client that you use later to build an event processor client to receive and process events
BlobContainerAsyncClient blobContainerAsyncClient = new BlobContainerClientBuilder()
.connectionString(storageConnectionString)
.containerName(storageContainerName)
.buildAsyncClient();

// Create a builder object that you will use later to build an event processor client to receive and process events and errors.
EventProcessorClientBuilder eventProcessorClientBuilder = new EventProcessorClientBuilder()
.connectionString(connectionString, eventHubName)
.consumerGroup("$default")
.processEvent(PARTITION_PROCESSOR)
.processError(ERROR_HANDLER)//.checkpointStore(new SampleCheckpointStore());
.checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient));

// Use the builder object to create an event processor client
EventProcessorClient eventProcessorClient = eventProcessorClientBuilder.buildEventProcessorClient();

System.out.println("Starting event processor");
eventProcessorClient.start();

System.out.println("Press enter to stop.");
System.in.read();

System.out.println("Stopping event processor");
eventProcessorClient.stop();
System.out.println("Event processor stopped.");

System.out.println("Exiting process");
}

如果我先运行客户端然后发送消息,它会按预期工作。消息已处理。

如果我停止客户端,然后将消息发送到事件中心,然后启动客户端,之前发送的消息是根本没有处理。处理之后发送的消息。为什么?

如果我停止客户端,然后删除 Azure Blob 存储中的检查点数据,然后启动客户端,则不会处理事件中心中的现有消息。处理之后发送的消息。为什么?

使用的库:

        <dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs</artifactId>
<version>5.12.2</version>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-messaging-eventhubs-checkpointstore-blob</artifactId>
<version>1.13.0</version>
</dependency>

我试过了

  • 将库版本更改为 5.10,但没有帮助
  • 使用不同的检查点存储(我使用内存一个),但没有变化完全没有

最佳答案

在其默认配置中,Java 处理器将每个分区的读取器定位在 eventPosition.latest()当没有找到检查点时,这意味着它将只读取处理器启动后发布的事件。 (全套定位逻辑可见here)

构建处理器时,initialPartitionEventPosition可以提供map来为每个分区指定不同的起始位置。

关于java - azure 事件中心 - 仅处理新消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/73086302/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com