gpt4 book ai didi

java - Kafka Stream 时间戳提取器在 KGroupedStream 处失败

转载 作者:行者123 更新时间:2023-12-02 04:43:27 25 4
gpt4 key购买 nike

我正在构建一个 Spring Cloud 微服务,它使用来自 kafka 主题的数据。在消费者中,我将主题绑定(bind)到 KStream。由于kafka版本低于0.10,传入消息不包含时间戳。当我解析传入的值时,它工作正常。否则,当我按键对它们进行分组时,它不会使用“default.timestamp.extractor”(已设置为 org.apache.kafka.streams.processor.WallclockTimestampExtractor)。

我已经使用不同版本的kafka(高于或等于0.10)测试了该服务,并且运行良好。

这是我的配置:

Spring : 云: 溪流: 卡夫卡: 流: Binder : 经纪人:${KAFKA_BROKERS} applicationId:电子邮件消息流 配置: default.key.serde:org.apache.kafka.common.serialization.Serdes$StringSerde default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde 提交间隔时间:1000 default.timestamp.extractor:org.apache.kafka.streams.processor.WallclockTimestampExtractor poll.ms: 60000 # 等待更多消息的阻塞时间 buffered.records.per.partition: 2000

我的代码的某些部分:

    stream
.mapValues(this::mapMessage)
.groupBy(this::buildGroup, Serialized.with(new JsonSerde<>(Group.class), new JsonSerde<>(EmailMessage.class)))
.windowedBy(TimeWindows.of(WINDOW_TIME))
.aggregate(ArrayList::new, this::aggregate, Materialized.with(new JsonSerde<>(Group.class), new MessageListSerialization()))
.toStream()
.process(() -> new MailMessagesProcessor(emailService));

它向我抛出此错误:org.apache.kafka.streams.errors.StreamsException:输入记录ConsumerRecord(topic = .....)使用不同的TimestampExtractor来处理此数据。

最佳答案

Kafka Streams 需要代理 0.10.0 或更高版本。它与旧版经纪商不兼容。

  • Kafka Streams 0.10.0 仅与 0.10.0(或更高版本)代理兼容。

  • Kafka Streams 0.10.1 及更高版本向后兼容 0.10.1(但不兼容较旧的代理)并兼容较新的代理。

  • 此外,从 Kafka Streams 1.0 开始,需要消息格式 0.10(或更高)。因此,即使您将代理升级到 0.10.0(或更高版本),如果您的消息格式也没有升级,它也将无法工作。

  • 要使用“exactly-once”功能,需要 0.11.0(或更高版本)的代理版本。

更多详情请参阅:https://docs.confluent.io/current/streams/upgrade-guide.html#compatibility

关于java - Kafka Stream 时间戳提取器在 KGroupedStream 处失败,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56501567/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com