gpt4 book ai didi

apache-kafka - 无法发送以 KafkaNull 作为值的消息

转载 作者:行者123 更新时间:2023-12-04 12:44:06 26 4
gpt4 key购买 nike

我正在使用主题上的日志压缩构建 Kafka 应用程序,但我无法发送逻辑删除值 (KafkaNull)

我曾尝试使用序列化程序的默认配置,但当它不起作用时,我使用了“Publish null/tombstone message with raw headers”中建议的更改,将 application.properties 设置为:

spring.cloud.stream.output.producer.useNativeEncoding=true
spring.cloud.stream.kafka.binder.configuration.value.serializer=org.springframework.kafka.support.serializer.JsonSerializer

我必须向流发送消息的代码是

this.stockTopics.compactedStocks().send(MessageBuilder
.withPayload(KafkaNull.INSTANCE)
.setHeader(KafkaHeaders.MESSAGE_KEY,company.getBytes())
.build())

this.stopTopics.compactedStocks() 返回一个我可以向其发送消息的 messageStream。

每次我尝试使用 KafkaNull 实例作为有效载荷发送该消息时,我都会收到错误消息 Failed to convert message: 'GenericMessage [payload=org.springframework.kafka.support.KafkaNull@1c2d8163, headers={ id=f81857e7-fbd0-56f5-8418-6a1944e7f2b1, kafka_messageKey=[B@36ec022a, contentType=application/json, timestamp=1547827957485}]' 到出站消息。

我希望消息简单地以空值发送给消费者,但显然它出错了。

最佳答案

我打开了一个GitHub issue为此。

编辑

解决方法 - 这有效...

@SpringBootApplication
@EnableBinding(Source.class)
public class So54257687Application {

public static void main(String[] args) {
SpringApplication.run(So54257687Application.class, args);
}

@Bean
public ApplicationRunner runner(MessageChannel output) {
return args -> output.send(new GenericMessage<>(KafkaNull.INSTANCE));
}

@KafkaListener(id = "foo", topics = "output")
public void listen(@Payload(required = false) byte[] in) {
System.out.println(in);
}

@Bean
@StreamMessageConverter
public MessageConverter kafkaNullConverter() {
class KafkaNullConverter extends AbstractMessageConverter {

KafkaNullConverter() {
super(Collections.emptyList());
}

@Override
protected boolean supports(Class<?> clazz) {
return KafkaNull.class.equals(clazz);
}

@Override
protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
return message.getPayload();
}

@Override
protected Object convertToInternal(Object payload, MessageHeaders headers, Object conversionHint) {
return payload;
}

}
return new KafkaNullConverter();
}

}

关于apache-kafka - 无法发送以 KafkaNull 作为值的消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54257687/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com