java - Spring-cloud-stream MessageConversionException

Reposted. Author: 行者123. Updated: 2023-11-30 01:45:36
I have a @StreamListener that accepts a @Payload String. To test this listener class, I wrote a JUnit class that uses embedded Kafka. When I run the test class, I get the following error.

Error

ERROR o.s.i.handler.LoggingHandler - org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [java.lang.String] for GenericMessage
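
For readers unfamiliar with the notation, `[B` in the message above is the JVM's internal name for `byte[]`, so the converter is being handed a raw byte array where the listener expects a String. A minimal sketch that prints these names (the class and method names here are illustrative, not part of the original code):

```java
public class ByteArrayTypeName {

    /** Returns the JVM's name for the given class, as it appears in messages like the one above. */
    public static String jvmName(Class<?> type) {
        return type.getName();
    }

    public static void main(String[] args) {
        // The payload type reported on the left-hand side of the error
        System.out.println(jvmName(byte[].class));
        // The target type requested by the @Payload String parameter
        System.out.println(jvmName(String.class));
    }
}
```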

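If I change the data type of the @Payload from String to byte[], the message is picked up by my listener class.

Can someone help me understand what is going wrong here? My guess is that it has something to do with the Spring Cloud Stream configuration.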

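    import java.util.Collections;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.assertj.core.api.Assertions;
    import org.junit.jupiter.api.BeforeEach;
    import org.junit.jupiter.api.Test;
    import org.junit.jupiter.api.extension.ExtendWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.test.EmbeddedKafkaBroker;
    import org.springframework.kafka.test.context.EmbeddedKafka;
    import org.springframework.kafka.test.utils.KafkaTestUtils;
    import org.springframework.test.annotation.DirtiesContext;
    import org.springframework.test.context.junit.jupiter.SpringExtension;

    @ExtendWith(SpringExtension.class)
    @DirtiesContext
    @SpringBootTest(classes = IntegrationTestConsumer.class)
    @EmbeddedKafka(partitions = 1, controlledShutdown = true,
            topics = {
                    "input",
                    "output"})
    public class TestUtils {

        public static final String KEY_SERIALIZER = "key.serializer";
        public static final String VALUE_SERIALIZER = "value.serializer";

        @Autowired
        private EmbeddedKafkaBroker embeddedKafka;

        @BeforeEach
        public void setup() {
            System.setProperty("spring.cloud.stream.kafka.binder.brokers", embeddedKafka.getBrokersAsString());
        }

        @Test
        public void someTest() throws Exception {
            // Publish a plain String message to the "input" topic
            Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
            senderProps.put(KEY_SERIALIZER, StringSerializer.class);
            senderProps.put(VALUE_SERIALIZER, StringSerializer.class);
            DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(senderProps);
            KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory, true);
            template.setDefaultTopic("input");
            template.sendDefault("foo");

            // Read from the "output" topic, which is also configured as the DLQ
            Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(
                    "input_group",
                    "false",
                    this.embeddedKafka);
            consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            consumerProps.put("key.deserializer", StringDeserializer.class);
            consumerProps.put("value.deserializer", StringDeserializer.class);
            DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);

            Consumer<String, String> consumer = cf.createConsumer();
            consumer.subscribe(Collections.singleton("output"));
            ConsumerRecords<String, String> records = consumer.poll(10_000);
            consumer.commitSync();
            Assertions.assertThat(records.count()).isGreaterThanOrEqualTo(1);
        }
    }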

This is what my application.yaml looks like.

    spring:
      cloud:
        stream:
          kafka:
            streams:
              binder:
                configuration:
                  default:
                    key:
                      serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                    value:
                      serde: org.apache.kafka.common.serialization.Serdes$StringSerde
            bindings:
              input:
                consumer:
                  enable-dlq: true
                  dlq-name: output
                  dlq-producer-properties:
                    retries: 1
            binder:
              brokers: ${spring.embedded.kafka.brokers}
              replicationFactor: ${replication_factor:1}
              autoCreateTopics: true
              autoAddPartitions: true
              configuration:
                retries: 1
                batch.size: 16384
                linger.ms: 1
                enable.idempotence: true
                buffer.memory: 33554432
                request.timeout.ms: 3000
                transaction.timeout.ms: 3000
                max.block.ms: ${kafka_max_block_time:5000}
                max.poll.records: 80
                poll.timeout: 10000
                commit.retries: 1
                commit.retry.interval: 1000
                session.timeout.ms.config: 50000
                shutdown.signal: INT,TERM
                acks: "all"
          bindings:
            output:
              destination: output
              contentType: application/json
              producer:
                partitionCount: ${partition_count:1}
            input:
              destination: input
              contentType: application/json
              partitioned: true
              group: input_group

Best answer

Check whether you have mocked the ObjectMapper in your test, because a mocked ObjectMapper cannot convert byte[] to String.
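
As a stopgap while tracking down the converter problem, the workaround noted in the question (declaring the payload as byte[]) can be paired with manual decoding. The following is only a sketch in plain Java without the Spring annotations: the class name `PayloadDecoder` and the method `decode` are hypothetical, and UTF-8 is an assumption about how the producer encoded the message.

```java
import java.nio.charset.StandardCharsets;

public class PayloadDecoder {

    /** Decodes a raw message payload into text, assuming it was written as UTF-8. */
    public static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Simulates the bytes a listener would receive for the test message "foo"
        byte[] raw = "foo".getBytes(StandardCharsets.UTF_8);
        System.out.println(decode(raw));
    }
}
```

Inside a listener that takes `@Payload byte[]`, a call such as `decode(payload)` would produce the String that the original method signature expected. This hides the symptom rather than fixing the conversion itself, so it is worth checking the ObjectMapper setup as well.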

Regarding java - Spring-cloud-stream MessageConversionException, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/58130013/