gpt4 book ai didi

azure - 使用 AMQP 建立与事件中心的连接时如何设置 x-opt-offset 以避免消息重播

转载 作者:行者123 更新时间:2023-12-03 01:38:16 31 4
gpt4 key购买 nike

我的应用程序连接到 azure 事件中心以接收消息并处理它们。我发现每次重新启动应用程序时,保留期内的所有消息都会重播。我阅读了有关 offset 的内容以避免此问题,并且我有一种方法可以将与 azure 事件中心的连接设置为:

    MessageConsumer connect() {
// set up JNDI context
BatchEventHubConfig batchEventHubConfig = //MAP CONTAINING CONFIG
String queueName = "EventHub"
String connectionFactoryName = "SBCF"
//Long offset = batchAccountManager.batchStorageManager.batchJobMsgCheckpointService.get(batchEventHubConfig.namespace, batchEventHubConfig.getMessageQueueAddress(partitionInx, true))?.offset
Hashtable<String, String> hashtable = new Hashtable<>()
hashtable.put("connectionfactory.${connectionFactoryName}", batchEventHubConfig.getAMQPConnectionURI())
hashtable.put("queue.${queueName}", batchEventHubConfig.getMessageQueueAddress(partitionInx))
//hashtable.put("apache.org:selector-filter:string", "amqp.annotation.x-opt-offset > '${offset}'")
hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.amqp_1_0.jms.jndi.PropertiesFileInitialContextFactory")
Context context = new InitialContext(hashtable)

ConnectionFactory factory = (ConnectionFactory) context.lookup(connectionFactoryName)
queue = (Destination) context.lookup(queueName)
connection = factory.createConnection(batchEventHubConfig.sasPolicyName, batchEventHubConfig.sasPolicyKey)
connection.setExceptionListener(new BatchExceptionListener(eventHubConnection: this))

connection.start()
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE)
messageConsumer = session.createConsumer(queue)
messageConsumer.setMessageListener(messageListener)
messageConsumer
}

注释掉的偏移代码是我在阅读完此处后尝试的:https://azure.github.io/amqpnetlite/articles/azure_eventhubs.html

设置偏移量以便应用程序重新启动时不会重新播放消息的正确方法是什么?

最佳答案

Apche QPID 不支持 AMQP 过滤器(底层 Apache Proton J 支持)..

我通过在末尾添加以下行来修补 AmqpConsumerBuilder.configureSource() 方法:

Symbol filterKey = Symbol.valueOf("apache.org:selector-filter:string");
UnknownDescribedType filterValue = new UnknownDescribedType(filterKey, String.format("%s > '%s'",amqp.annotation.x-opt-offset", lastOffset));

filters.put(filterKey, filterValue);

它有效!

因此,要么创建 Apache QPID 的分支并应用此补丁,要么将修改后的类放入类路径中以覆盖原始类(非常糟糕的解决方案)

关于azure - 使用 AMQP 建立与事件中心的连接时如何设置 x-opt-offset 以避免消息重播,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56194397/

31 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com