java - Spring Cloud Stream Kafka > Consuming Avro messages from the Confluent REST Proxy

Reposted. Author: 行者123. Updated: 2023-12-01 05:09:26

I have the following scenario:

My application looks like this:

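    import org.slf4j.Logger;          // the {} placeholder below suggests SLF4J
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.cloud.stream.messaging.Sink;

    @SpringBootApplication
    @EnableBinding(Sink.class)
    public class MyApplication {
        private static final Logger log = LoggerFactory.getLogger(MyApplication.class);

        public static void main(String[] args) {
            SpringApplication.run(MyApplication.class, args);
        }

        // Invoked for every message arriving on the "input" binding.
        @StreamListener(Sink.INPUT)
        public void myMessageSink(MyMessage message) {
            log.info("Received new message: {}", message);
        }
    }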

MyMessage is a class that Avro generated from an Avro schema.

My application.properties looks like this:

spring.cloud.stream.bindings.input.destination=myTopic
spring.cloud.stream.bindings.input.group=${spring.application.name}
spring.cloud.stream.bindings.input.contentType=application/*+avro

My problem is that every time a new message arrives, the following exception is thrown:

org.springframework.messaging.MessagingException: Exception thrown while invoking MyApplication#myMessageSink[1 args]; nested exception is org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -27
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor$StreamListenerMessageHandler.handleRequestMessage(StreamListenerAnnotationBeanPostProcessor.java:316) ~[spring-cloud-stream-1.1.0.RELEASE.jar:1.1.0.RELEASE]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109) ~[spring-integration-core-4.3.2.RELEASE.jar:4.3.2.RELEASE]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.3.2.RELEASE.jar:4.3.2.RELEASE]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-4.3.2.RELEASE.jar:4.3.2.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) ~[spring-integration-core-4.3.2.RELEASE.jar:4.3.2.RELEASE]
...
Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -27
at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336) ~[avro-1.8.1.jar:1.8.1]
at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263) ~[avro-1.8.1.jar:1.8.1]
at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201) ~[avro-1.8.1.jar:1.8.1]
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:430) ~[avro-1.8.1.jar:1.8.1]
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422) ~[avro-1.8.1.jar:1.8.1]
at org.apache.avro.generic.GenericDatumReader.readMapKey(GenericDatumReader.java:335) ~[avro-1.8.1.jar:1.8.1]
at org.apache.avro.generic.GenericDatumReader.readMap(GenericDatumReader.java:321) ~[avro-1.8.1.jar:1.8.1]
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:177) ~[avro-1.8.1.jar:1.8.1]
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116) ~[avro-1.8.1.jar:1.8.1]
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:230) ~[avro-1.8.1.jar:1.8.1]
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174) ~[avro-1.8.1.jar:1.8.1]
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152) ~[avro-1.8.1.jar:1.8.1]
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144) ~[avro-1.8.1.jar:1.8.1]
at org.springframework.cloud.stream.schema.avro.AbstractAvroMessageConverter.convertFromInternal(AbstractAvroMessageConverter.java:91) ~[spring-cloud-stream-schema-1.1.0.RELEASE.jar:1.1.0.RELEASE]
at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:175) ~[spring-messaging-4.3.3.RELEASE.jar:4.3.3.RELEASE]
at org.springframework.messaging.converter.CompositeMessageConverter.fromMessage(CompositeMessageConverter.java:67) ~[spring-messaging-4.3.3.RELEASE.jar:4.3.3.RELEASE]
at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:117) ~[spring-messaging-4.3.3.RELEASE.jar:4.3.3.RELEASE]
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:112) ~[spring-messaging-4.3.3.RELEASE.jar:4.3.3.RELEASE]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:138) ~[spring-messaging-4.3.3.RELEASE.jar:4.3.3.RELEASE]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:107) ~[spring-messaging-4.3.3.RELEASE.jar:4.3.3.RELEASE]
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor$StreamListenerMessageHandler.handleRequestMessage(StreamListenerAnnotationBeanPostProcessor.java:307) ~[spring-cloud-stream-1.1.0.RELEASE.jar:1.1.0.RELEASE]
... 35 common frames omitted

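As far as I understand, the problem is that the Confluent stack includes the ID of the message's schema as part of the message payload, and clients are expected to start reading the actual Avro message after the schema ID. It seems I need to configure the Kafka binding to use Confluent's KafkaAvroDeserializer, but I can't figure out how to do that.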
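To make the payload layout concrete, here is a minimal sketch of how the Confluent prefix can be separated from the Avro body. It assumes the documented Confluent wire format (one magic byte with value 0, then a 4-byte big-endian schema ID, then the Avro binary data). The class name ConfluentWireFormat, its method names, and the sample bytes are hypothetical and only serve as illustration; this is not what KafkaAvroDeserializer does internally in full, since the real deserializer also looks up the writer schema in the Schema Registry.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class ConfluentWireFormat {
    // Assumed layout: [magic byte 0][4-byte big-endian schema ID][Avro binary body].
    public static final byte MAGIC_BYTE = 0x0;
    public static final int HEADER_LENGTH = 5;

    /** Reads the schema ID stored in bytes 1..4 of the message. */
    public static int schemaId(byte[] message) {
        checkHeader(message);
        // ByteBuffer is big-endian by default, matching the assumed layout.
        return ByteBuffer.wrap(message, 1, 4).getInt();
    }

    /** Returns everything after the 5-byte header, i.e. the plain Avro body. */
    public static byte[] avroBody(byte[] message) {
        checkHeader(message);
        return Arrays.copyOfRange(message, HEADER_LENGTH, message.length);
    }

    private static void checkHeader(byte[] message) {
        if (message == null || message.length < HEADER_LENGTH || message[0] != MAGIC_BYTE) {
            throw new IllegalArgumentException("Payload is not in the Confluent wire format");
        }
    }

    public static void main(String[] args) {
        // Hypothetical message: magic byte, schema ID 42, then two made-up body bytes.
        byte[] message = {0, 0, 0, 0, 42, 0x06, 0x66};
        System.out.println("schema id: " + schemaId(message));
        System.out.println("body length: " + avroBody(message).length);
    }
}
```

A plain Avro decoder that starts at byte 0 will interpret the header as data, which is consistent with the "Malformed data" error in the stack trace. The bytes returned by avroBody could in principle be handed to Avro's DecoderFactory.get().binaryDecoder(body, null), provided the writer schema for that ID is available.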

(I can retrieve the messages without any trouble using Confluent's Avro console consumer, so it does not appear to be a problem with the Avro encoding.)

Is this even supposed to work somehow?

Best answer

Does it work if you set the per-binding property spring.cloud.stream.kafka.bindings.input.consumer.configuration.value.deserializer to the class name of Confluent's KafkaAvroDeserializer?
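As a sketch of what that suggestion might look like in application.properties: the first line is the property named in the answer, set to the fully qualified class name of Confluent's deserializer. The other two lines are assumptions added for illustration: schema.registry.url and specific.avro.reader are configuration keys of KafkaAvroDeserializer, and the URL is a placeholder. Whether the binder version in use (1.1.0.RELEASE in the stack trace) actually passes these settings through to the Kafka consumer is not confirmed here.

```properties
# Property suggested in the answer, pointing at Confluent's deserializer class.
spring.cloud.stream.kafka.bindings.input.consumer.configuration.value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
# Assumption: placeholder URL; the deserializer needs to reach the Schema Registry.
spring.cloud.stream.kafka.bindings.input.consumer.configuration.schema.registry.url=http://localhost:8081
# Assumption: asks the deserializer for generated classes such as MyMessage instead of GenericRecord.
spring.cloud.stream.kafka.bindings.input.consumer.configuration.specific.avro.reader=true
```

If the deserializer is applied, the payload would already be a decoded object by the time it reaches the listener, so the contentType=application/*+avro setting on the binding may need to be revisited as well.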

Regarding java - Spring Cloud Stream Kafka > Consuming Avro messages from the Confluent REST Proxy, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/39941497/
