gpt4 book ai didi

apache-kafka - toStream() 不适用于窗口化 KTable

转载 作者:行者123 更新时间:2023-12-02 08:01:36 27 4
gpt4 key购买 nike

我想编写一个小型 Kafka Streams 应用程序,它会减少输入流的时间窗口,对值进行一些映射,然后将生成的 toStream() 更改日志发送到另一个主题。使用我的代码,我在 toStream() 操作中收到以下错误:

Compilation failure
[ERROR] StreamFilter.java:[39,86] incompatible types: org.apache.kafka.streams.kstream.KStream<org.apache.kafka.streams.kstream.Windowed<java.lang.Integer>,filterExample.SensorMessage> cannot be converted to org.apache.kafka.streams.kstream.KStream<java.lang.Integer,filterExample.SensorMessage>

我在某处读到默认 Serdes 可能是问题所在,但到目前为止,使用 Consumed.with 明确包含它们并没有解决问题。

public static void runStreamFilter(String broker) throws Exception {
final SensorMessageSerializer serializer = new SensorMessageSerializer();
final SensorMessageDeserializer deserializer = new SensorMessageDeserializer();
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-filter");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.serdeFrom(serializer, deserializer));

final StreamsBuilder builder = new StreamsBuilder();

final KStream<Integer, SensorMessage> input = builder.stream(KafkaConstants.TOPIC_IN, Consumed.with(Serdes.Integer(), Serdes.serdeFrom(serializer, deserializer)));
final KStream<Integer, SensorMessage> output = input
.filter((k,v) -> v.getValue() > 19)
.groupByKey(Grouped.with(Serdes.Integer(), Serdes.serdeFrom(serializer, deserializer)))
.windowedBy(TimeWindows.of(Duration.ofMillis(500)))
.reduce((aggValue, newValue) -> avgReducer(aggValue, newValue))
.mapValues(value -> latencyMapper(value))
.toStream();


output.to(KafkaConstants.TOPIC_OUT);

最佳答案

代码 .windowedBy(TimeWindows.of(Duration.ofMillis(500))).reduce(..)返回 KTable<Windowed<K>, V> .

为了将您的结果转换为 KStream<Integer, SensorMessage> ,您需要从 Windowed 中提取值对象,所以需要在toStream()之后添加如下代码:

.map((key, value) -> KeyValue.pair(key.key(), value));

关于apache-kafka - toStream() 不适用于窗口化 KTable,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56595340/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com