gpt4 book ai didi

apache-kafka - 发送到 kafka 主题时序列化消息时出错

转载 作者:行者123 更新时间:2023-12-04 05:25:43 29 4
gpt4 key购买 nike

我需要测试包含标题的消息,所以我需要使用 MessageBuilder,但我无法序列化。

我尝试在生产者 Prop 上添加序列化设置,但没有奏效。

有人能帮我吗?

这个错误:

org.apache.kafka.common.errors.SerializationException: Can't convert value of class org.springframework.messaging.support.GenericMessage to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer

我的测试类:
public class TransactionMastercardAdapterTest extends AbstractTest{

@Autowired
private KafkaTemplate<String, Message<String>> template;

@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1);

@BeforeClass
public static void setUp() {
System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
System.setProperty("spring.cloud.stream.kafka.binder.zkNodes", embeddedKafka.getZookeeperConnectionString());
}

@Test
public void sendTransactionCommandTest(){

String payload = "{\"o2oTransactionId\" : \"" + UUID.randomUUID().toString().toUpperCase() + "\","
+ "\"cardId\" : \"11\","
+ "\"transactionId\" : \"20110405123456\","
+ "\"amount\" : 200.59,"
+ "\"partnerId\" : \"11\"}";

Map<String, Object> props = KafkaTestUtils.producerProps(embeddedKafka);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, Message<String>> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<String, Message<String>> ("notification_topic", MessageBuilder.withPayload(payload)
.setHeader("status", "RECEIVED")
.setHeader("service", "MASTERCARD")
.build()));

Map<String, Object> configs = KafkaTestUtils.consumerProps("test1", "false", embeddedKafka);

configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
ConsumerFactory<byte[], byte[]> cf = new DefaultKafkaConsumerFactory<>(configs);

Consumer<byte[], byte[]> consumer = cf.createConsumer();
consumer.subscribe(Collections.singleton("transaction_topic"));
ConsumerRecords<byte[], byte[]> records = consumer.poll(10_000);
consumer.commitSync();

assertThat(records.count()).isEqualTo(1);
}

}

最佳答案

我会说错误很明显:

Can't convert value of class org.springframework.messaging.support.GenericMessage to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer

您的值(value)在哪里 GenericMessage ,但是 StringSerializer只能处理字符串。

您需要的叫 JavaSerializer它不存在,但写起来并不难:
public class JavaSerializer implements Serializer<Object> {

@Override
public byte[] serialize(String topic, Object data) {
try {
ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
ObjectOutputStream objectStream = new ObjectOutputStream(byteStream);
objectStream.writeObject(data);
objectStream.flush();
objectStream.close();
return byteStream.toByteArray();
}
catch (IOException e) {
throw new IllegalStateException("Can't serialize object: " + data, e);
}
}

@Override
public void configure(Map<String, ?> configs, boolean isKey) {

}

@Override
public void close() {

}

}

并为此进行配置 value.serializer属性(property)。

关于apache-kafka - 发送到 kafka 主题时序列化消息时出错,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43612072/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com