gpt4 book ai didi

apache-kafka - 卡夫卡流 : Custom TimestampExtractor for aggregation

转载 作者:行者123 更新时间:2023-12-04 18:57:39 24 4
gpt4 key购买 nike

我正在构建一个非常简单的 KafkaStreams 演示应用程序,以测试用例。

我无法升级我正在使用的 Kafka 代理(当前版本为 0.10.0),并且有几条消息是由 0.10.0 之前的生产者编写的,所以我使用的是自定义 TimestampExtractor,我将其添加为我的主类开头的默认配置:

config.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, GenericRecordTimestampExtractor.class);

从我的源主题中使用时,这非常有效。但是在使用聚合运算符时,我遇到了异常 因为FailOnInvalidTimestamp TimestampExtractor 的实现从内部聚合主题 消费时,使用 代替自定义实现.

Streams 应用程序的代码如下所示:
...

KStream<String, MyValueClass> clickStream = streamsBuilder
.stream("mytopic", Consumed.with(Serdes.String(), valueClassSerde));

KTable<Windowed<Long>, Long> clicksByCustomerId = clickStream
.map(((key, value) -> new KeyValue<>(value.getId(), value)))
.groupByKey(Serialized.with(Serdes.Long(), valueClassSerde))
.windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(1)))
.count();
...

我遇到的异常如下:
    Exception in thread "click-aggregator-b9d77f2e-0263-4fa3-bec4-e48d4d6602ab-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: 
Input record ConsumerRecord(topic = click-aggregator-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition, partition = 9, offset = 0, CreateTime = -1, serialized key size = 8, serialized value size = 652, headers = RecordHeaders(headers = [], isReadOnly = false), key = 11230, value = org.example.MyValueClass@2a3f2ea2) has invalid (negative) timestamp.
Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, or because the input topic was created before upgrading the Kafka cluster to 0.10+. Use a different TimestampExtractor to process this data.

现在的问题是:有什么办法可以让 Kafka Streams 使用自定义 TimestampExtractor从内部聚合主题中读取时(最好在仍然使用 Streams DSL 时)?

最佳答案

您不能更改时间戳提取器(从 v1.0.0 开始)。出于正确性原因,这是不允许的。

但我真的很想知道,一个时间戳为 -1 的记录是如何写入这个主题的。 Kafka Streams 使用您的自定义提取器在写入记录时提供的时间戳。另请注意,KafkaProducer不允许写入带有负时间戳的记录。

因此,我能想到的唯一解释是其他一些生产者确实写入了重新分区主题——这是不允许的......只有 Kafka Streams 应该写入重新分区主题。

我想,您需要删除此主题并让 Kafka Streams 重新创建它以恢复到干净的状态。

来自另一个答案的讨论/评论:

You need 0.10+ format to work with Kafka Streams. If you upgrade your brokers and keep 0.9 format or older, Kafka Streams might not work as expected.

关于apache-kafka - 卡夫卡流 : Custom TimestampExtractor for aggregation,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48427964/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com