
java - How to implement custom Kafka partitioning with Spring Cloud Stream

Reposted · Author: 行者123 · Updated: 2023-11-30 05:35:42

I am trying to implement a custom Kafka partitioner using a Spring Cloud Stream binding. I only want custom partitioning for the user topic, not for the company topic (for which Kafka should use the DefaultPartitioner).

My binding configuration:

spring:
  cloud:
    stream:
      bindings:
        comp-out:
          destination: company
          contentType: application/json
        user-out:
          destination: user
          contentType: application/json

Based on the reference documentation: https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/2.1.0.RC4/single/spring-cloud-stream-binder-kafka.html#_partitioning_with_the_kafka_binder I modified the configuration to:

spring:
  cloud:
    stream:
      bindings:
        comp-out:
          destination: company
          contentType: application/json
        user-out:
          destination: user
          contentType: application/json
          producer:
            partitioned: true
            partitionSelectorClass: config.UserPartitioner

I publish messages to the stream with:

public void postUserStream(User user) throws ServiceException {
    try {
        LOG.info("Posting User {} into Kafka stream...", user);
        MessageChannel messageChannel = messageStreams.outboundUser();
        messageChannel
                .send(MessageBuilder.withPayload(user)
                        .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON).build());
    } catch (Exception ex) {
        LOG.error("Error while populating User stream into Kafka.. ", ex);
        throw ex;
    }
}

My UserPartitioner class:

public class UserPartitioner extends DefaultPartitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,
            Cluster cluster) {
        String partitionKey = null;
        if (Objects.nonNull(value)) {
            User user = (User) value;
            partitionKey = String.valueOf(user.getCompanyId()) + "_" + String.valueOf(user.getId());
            keyBytes = partitionKey.getBytes();
        }
        return super.partition(topic, partitionKey, keyBytes, value, valueBytes, cluster);
    }
}

I end up with the following exception:

Description: Failed to bind properties under 'spring.cloud.stream.bindings.user-out.producer' to org.springframework.cloud.stream.binder.ProducerProperties:

Property: spring.cloud.stream.bindings.user-out.producer.partitioned
Value: true
Origin: "spring.cloud.stream.bindings.user-out.producer.partitioned" from property source "bootstrapProperties"
Reason: No setter found for property: partitioned

Action: Update your application's configuration.

Any reference links on how to set up custom partitioning with the message binder would be helpful.

Edit: As per the documentation, I also tried the following:

user-out:
  destination: user
  contentType: application/json
  producer:
    partitionKeyExtractorClass: config.SimpleUserPartitioner

@Component
public class SimpleUserPartitioner implements PartitionKeyExtractorStrategy {

    @Override
    public Object extractKey(Message<?> message) {
        if (message.getPayload() instanceof BaseUser) {
            BaseUser user = (BaseUser) message.getPayload();
            return user.getId();
        }
        return 10;
    }
}
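
For reference, a key extractor like this is normally registered as a bean and referenced from the binding by name (the partitionKeyExtractorName property that the isPartitioned() getter quoted in the answer below checks). A minimal sketch, assuming Spring Cloud Stream 2.x; the bean name userKeyExtractor and the exact wiring are illustrative, not part of the original post:

import org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class PartitionKeyConfig {

    // Referenced from the binding via
    // spring.cloud.stream.bindings.user-out.producer.partition-key-extractor-name: userKeyExtractor
    @Bean("userKeyExtractor")
    public PartitionKeyExtractorStrategy userKeyExtractor() {
        return message -> {
            Object payload = message.getPayload();
            if (payload instanceof BaseUser) {
                return ((BaseUser) payload).getId();   // same key as SimpleUserPartitioner above
            }
            return payload.hashCode();                 // fallback key for other payload types
        };
    }
}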

Update 2: The solution that worked for me was to add partition-count to the binding and set autoAddPartitions to true on the binder:

spring:
  logging:
    level: info
  cloud:
    stream:
      bindings:
        user-out:
          destination: user
          contentType: application/json
          producer:
            partition-key-expression: headers['partitionKey']
            partition-count: 4

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          autoAddPartitions: true
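
With partition-key-expression: headers['partitionKey'], the sending code has to populate that header. A minimal sketch of how the postUserStream method shown earlier could do this; the companyId_id key format mirrors the earlier UserPartitioner and is only illustrative, and by default the binder hashes this key over partition-count to pick the partition:

public void postUserStream(User user) throws ServiceException {
    MessageChannel messageChannel = messageStreams.outboundUser();
    messageChannel.send(MessageBuilder.withPayload(user)
            .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
            // resolved by partition-key-expression: headers['partitionKey']
            .setHeader("partitionKey", user.getCompanyId() + "_" + user.getId())
            .build());
}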

Best Answer

There is no partitioned property; the getter depends on the other properties...

public boolean isPartitioned() {
    return this.partitionKeyExpression != null
            || this.partitionKeyExtractorName != null;
}

partitionSelectorClass: config.UserPartitioner

The UserPartitioner is a Kafka Partitioner - it determines which consumers get which partitions (on the consumer side).

The partitionSelectorClass must be a PartitionSelectorStrategy - it determines which partition a record is sent to (on the producer side).

These are completely different objects.

If you really want to customize how partitions are distributed across consumer instances, that is a Kafka concern and has nothing to do with Spring.

Also, all consumer bindings in the same binder will use the same Partitioner; you would have to configure multiple binders to have different Partitioners.

Given your question, I think you are simply confusing Partitioner with PartitionSelectorStrategy, and what you need is the latter.
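
A minimal sketch of the PartitionSelectorStrategy route suggested here, assuming Spring Cloud Stream 2.x where the bean is referenced from the binding by name (e.g. partition-selector-name alongside the key settings from Update 2); the bean name and that wiring are assumptions, not part of the original answer:

import org.springframework.cloud.stream.binder.PartitionSelectorStrategy;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class UserPartitionSelectorConfig {

    // Maps the already-extracted key (e.g. the "companyId_userId" header value)
    // onto one of the partition-count partitions, on the producer side.
    @Bean("userPartitionSelector")
    public PartitionSelectorStrategy userPartitionSelector() {
        return (key, partitionCount) -> Math.floorMod(key.hashCode(), partitionCount);
    }
}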

Regarding "java - How to implement custom Kafka partitioning with Spring Cloud Stream", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/56675828/
