gpt4 book ai didi

elasticsearch - Spring AMQP 和 Elasticsearch - 聚合消息

转载 作者:行者123 更新时间:2023-12-03 02:05:49 27 4
gpt4 key购买 nike

我们在一些 RabbitMQ 队列上有一个消费者,它读取消息,然后在 Elasticsearch 中索引这些数据。实现是使用 spring-amqp 完成的。为了提高性能,我们计划在消费者级别聚合消息并在 Elasticsearch 中进行批量插入(这确实会提高性能)。

您对如何实现这一点有任何建议吗?此外,另一个敏感问题是如何处理响应。每条消息都有一个“reply_to” header ,我们使用带有回复 channel 的入站网关,因此对于每条消息,都应该传递一个响应。

我正在考虑使用 spring 集成中的聚合器,其发布策略基于批量大小和 MessageGroupStore 过期的时间段(当然还有收割者)。入站网关的任务执行器为 20,假设预取计数为 20。每当请求到来时,消息将被添加到组存储中,当 canRelease() 条件正常时,请求附带的线程之一的 reaper 将执行批量操作。但是我对其他线程所做的事情将不得不等待永远不会到来的响应。另外,我不知道如何打破对大的聚合消息的响应,以便每个小请求都会有响应。

另一个问题,我如何确认消息?从我读到的事务会降低 RabbitMQ 端的性能,所以我对使用“tx-size”属性并不满意。此外,如果超时太小,此属性可能会计算错误。

最佳答案

关于消费者和聚合者问题的答案:

使用来自 AMQP 的消息并聚合的配置。
聚合策略基于 Transction 提交:

<amqp:inbound-channel-adapter queue-names="myQueue"
transaction-manager="transactionManager"
channel-transacted="true"
channel="aggregateChannel"
advice-chain="aggregatorReaperAdvice"
concurrent-consumers="4"
tx-size="100"/>

<aggregator input-channel="aggregateChannel" output-channel="storeChannel"
expire-groups-upon-completion="true"
correlation-strategy-expression="T(Thread).currentThread().id"
release-strategy-expression="^[payload.equals(@AGGREGATOR_RELEASE_MARK)] != null"
expression="?[!payload.equals(@AGGREGATOR_RELEASE_MARK)].![payload]"/>
ReaperAdvice (常规代码):
@Service
class AggregatorReaperAdvice implements MethodBeforeAdvice, InitializingBean {

private static final TRANSACTION_RESOURCE_MARK = 'TRANSACTION_RESOURCE_MARK'

public static final AGGREGATOR_RELEASE_MARK = 'AGGREGATOR_RELEASE_MARK'

MessagingTemplate messagingTemplate

@Autowired
MessageChannel aggregateChannel

@Override
void afterPropertiesSet() throws Exception {
Assert.notNull aggregateChannel, "aggregateChannel must not be null"
messagingTemplate = new MessagingTemplate(aggregateChannel)
}

@Override
void before(Method method, Object[] args, Object target) {
if (!TransactionSynchronizationManager.hasResource(AggregatorReaperAdvice)) {
TransactionSynchronizationManager.bindResource(AggregatorReaperAdvice, TRANSACTION_RESOURCE_MARK)
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {

@Override
void beforeCommit(boolean readOnly) {
messagingTemplate.send(MessageBuilder.withPayload(AGGREGATOR_RELEASE_MARK).build())
}

@Override
void afterCompletion(int status) {
TransactionSynchronizationManager.unbindResource(AggregatorReaperAdvice)
}

})
}
}
}

如果不清楚,请告诉我。

所有其他问题,将很快得到解决。

对于 manual ack您可以使用 channel.basicAck(deliveryTag, true); - 至 ack在最后 deliveryTag对于所有以前的消息。

对于 headers["reply_to"]案例...我认为您应该提供自定义 AbstractAggregatingMessageGroupProcessor对于 aggregator并杀死两只鸟:聚合器和迭代的累积结果 MessageGroup.getMessages()将它们中的每一个发送到提供的 MessageChannel 的回复过程.这是您的案例的快速解决方案。

类似但更松耦合的解决方案可能基于聚合器及其 MessageGroupStore的结果。 , 在哪里提取 correlationKey检索组及其消息以执行所需的 reply逻辑。在这种情况下,您不应使用聚合器从存储中删除组,而应在该组检索后手动删除。

关于elasticsearch - Spring AMQP 和 Elasticsearch - 聚合消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26231849/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com