gpt4 book ai didi

spring - 使用spring cloud stream Kinesis时如何设置PartitionKey

转载 作者:行者123 更新时间:2023-12-04 16:04:43 24 4
gpt4 key购买 nike

在我的代码中,我使用了setHeader

mysource.getChannel1()
.send(MessageBuilder
.withPayload(new Person("messageA", 1))
.setHeader("partitionKey", 345).build());

在我添加的属性文件中:

spring.cloud.stream.bindings.channel1.producer.partitionKeyExpression = 
headers['partitionKey']

但是,PartitionKey 仍然不是 345 partitionKey 是一些哈希值 2133325211。即使我插入 2 条具有相同 partitionKey header 的消息,在 Kinesis 中我们也会得到 2 个不同的分区键。

当我尝试

spring.cloud.stream.bindings.output.producer.partitionKeyExpression = payload.id

partitionKey 总是等于partitionKey-0

我的问题:

如何将分区键设置为特定值?

最佳答案

当前实现依赖于 BinderHeaders.PARTITION_HEADER 的 SCSt 内置算法的问题,它会产生一个 partition number,这不适合根据 Kinesis 的性质,我们应该如何选择特定的分片。好吧,实际上我们根本不选择它。我们提供一些分区键值,让消息通过该值的哈希值在同一个分片中安顿下来。或者我们可以提供一个明确的散列来确保我们去同一个分片。本质上它最终是相同的 - 我们最终通过哈希得到分片。

为了使其适用于您的 payload.id 用例,我建议查看 BinderHeaders.PARTITION_OVERRIDE header 方法:

@Bean
@GlobalChannelInterceptor(order = Integer.MIN_VALUE, patterns = Source.OUTPUT)
public ChannelInterceptor partitionOverrideInterceptor(BindingProperties bindingProperties,
StandardEvaluationContext evaluationContext) {

return new ChannelInterceptorAdapter() {

@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
return MessageBuilder.fromMessage(message)
.setHeader(BinderHeaders.PARTITION_HEADER,
bindingProperties.getProducer()
.getPartitionKeyExpression()
.getValue(evaluationContext, message))
.build();
}

};
}

这样,scst_partition header 将具有您希望通过 partitionKeyExpression 提供的准确值,并且 KinesisMessageHandler 将具有正确的目标值在 PutRecordRequest 中散列。

参见 https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/issues/52了解更多信息。

关于spring - 使用spring cloud stream Kinesis时如何设置PartitionKey,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49253112/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com