
java - How do I use KTable#suppress in Kafka Streams to send the final aggregation?

Reposted. Author: 行者123. Updated: 2023-12-02 10:24:55

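What I want to do is:

  1. Consume records from a topic
  2. Compute a value (a record count) for each 1-second window
  3. Detect windows that contain fewer than 4 records
  4. Send the final result to another topic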
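
To make the intended behavior concrete, here is a minimal plain-Java sketch of steps 2 to 4. It is only a model and does not use Kafka Streams: the `Event` class, the `key@windowStart c:count` output format, and all names are hypothetical, and it emits results once after all input has been read, whereas `suppress` in Kafka Streams emits a window's final result once stream time passes the window end plus the grace period.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of steps 2 to 4; it does not use Kafka Streams.
final class WindowCountSketch {
    static final long WINDOW_MS = 1_000L; // 1-second tumbling windows
    static final long THRESHOLD = 4L;     // report windows with fewer records than this

    // Hypothetical input record: a key plus an event timestamp in milliseconds.
    static final class Event {
        final String key;
        final long timestampMs;

        Event(String key, long timestampMs) {
            this.key = key;
            this.timestampMs = timestampMs;
        }
    }

    // Counts events per (key, window start) and returns only the final
    // counts that are below THRESHOLD, formatted as "key@windowStart c:count".
    static List<String> finalLowCounts(List<Event> events) {
        Map<String, Long> counts = new TreeMap<>(); // sorted for stable output
        for (Event e : events) {
            // Align the timestamp to the start of its tumbling window.
            long windowStart = (e.timestampMs / WINDOW_MS) * WINDOW_MS;
            counts.merge(e.key + "@" + windowStart, 1L, Long::sum);
        }
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Long> entry : counts.entrySet()) {
            if (entry.getValue() < THRESHOLD) {
                result.add(entry.getKey() + " c:" + entry.getValue());
            }
        }
        return result;
    }
}
```

The point of the model is that each window produces exactly one output, and only after its count can no longer change; intermediate counts are never emitted.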

I use suppress to send the final result, but I get the following error.

09:18:07,963 ERROR org.apache.kafka.streams.processor.internals.ProcessorStateManager  
- task [1_0] Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000002:
java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.buffer(KTableSuppressProcessor.java:86)
at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:78)
at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:37)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:115)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146)
.....

I believe my code is the same as the example in the developer guide. What is wrong? Here is my code.

final KStream<String, String> views = builder.stream("fluent-newData");
final KTable<Windowed<String>, Long> anomalousUsers = views
    .map((key, value) -> {
        JSONObject message = JSONObject.fromObject(value);
        String[] strArry = message.getString("detail").split(",");
        return KeyValue.pair(strArry[0], value);
    })
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofSeconds(1))
        .grace(Duration.ofSeconds(20)))
    .count()
    .suppress(Suppressed.untilWindowCloses(unbounded()))
    .filter((windowedUserId, count) -> count < 4);

final KStream<String, String> anomalousUsersForConsole = anomalousUsers
    .toStream()
    .filter((windowedUserId, count) -> count != null)
    .map((windowedUserId, count) -> new KeyValue<>(
        windowedUserId.toString(),
        windowedUserId.toString() + " c:" + count.toString()));

anomalousUsersForConsole.to("demo-count-output", Produced.with(stringSerde, stringSerde));

Best answer

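"Windowed cannot be cast to java.lang.String" is usually thrown when the serdes are not specified explicitly. Kafka Streams then falls back to the default serde from the configuration, which the stack trace shows to be a String serde here, and applies it to the Windowed<String> key inside the suppress processor.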
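
The mechanism behind this exception can be reproduced without Kafka. The sketch below is a simplified plain-Java model, not Kafka's actual code: `Serializer`, `StringSerializer`, and `WindowedKey` are hypothetical stand-ins for the real classes, and `serializeWithDefault` imitates a component that only knows a default serializer and applies it to whatever key arrives.

```java
import java.nio.charset.StandardCharsets;

// Simplified model of a default String serializer applied to a windowed key.
final class SerdeMismatchSketch {
    // Hypothetical stand-in for a generic serializer interface.
    interface Serializer<T> {
        byte[] serialize(T data);
    }

    // Stand-in for a serializer that only accepts String keys.
    static final class StringSerializer implements Serializer<String> {
        @Override
        public byte[] serialize(String data) {
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Stand-in for a windowed key: the original key plus a window start.
    static final class WindowedKey {
        final String key;
        final long windowStartMs;

        WindowedKey(String key, long windowStartMs) {
            this.key = key;
            this.windowStartMs = windowStartMs;
        }
    }

    // Applies a serializer chosen without knowledge of the key's real type.
    // The unchecked cast compiles, so a mismatch only surfaces at runtime.
    @SuppressWarnings("unchecked")
    static byte[] serializeWithDefault(Serializer<?> defaultSerializer, Object key) {
        return ((Serializer<Object>) defaultSerializer).serialize(key);
    }

    // Returns true if serializing the key with the String serializer
    // fails with a ClassCastException.
    static boolean failsWithClassCast(Object key) {
        try {
            serializeWithDefault(new StringSerializer(), key);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }
}
```

A plain `String` key serializes fine, while a `WindowedKey` fails with a `ClassCastException`, which mirrors the error in the question: the code compiles, and the mismatch only shows up when the wrong serializer meets the windowed key.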

When you build the stream(..), specify a Consumed instance directly, like this:

builder.stream("fluent-newData", Consumed.with(Serdes.String(), Serdes.String()))

Likewise, for groupByKey() you need to pass a Grouped, like this:

 .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
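
Putting both changes together, the topology might look like the sketch below. It assumes the Kafka Streams API of the question's era (`TimeWindows.of(...).grace(...)`, which newer releases deprecate in favor of `TimeWindows.ofSizeAndGrace(...)`) and requires the kafka-streams library on the classpath. The class name `SuppressTopologySketch` is hypothetical, the question's JSON re-keying step is left out to keep the example free of extra dependencies, and the two filters are merged into one after `toStream()`.

```java
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
import org.apache.kafka.streams.kstream.TimeWindows;

// Sketch of the question's topology with serdes passed explicitly.
final class SuppressTopologySketch {
    static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        builder
            // Explicit serdes for the source topic, as the answer suggests.
            .stream("fluent-newData", Consumed.with(Serdes.String(), Serdes.String()))
            // The question re-keys each record from its JSON "detail" field
            // at this point; that step is omitted in this sketch.
            // Explicit serdes for grouping, as the answer suggests.
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(TimeWindows.of(Duration.ofSeconds(1))
                .grace(Duration.ofSeconds(20)))
            .count()
            // Emit one final result per window after it closes.
            .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
            .toStream()
            // Keep only windows with fewer than 4 records.
            .filter((windowedUserId, count) -> count != null && count < 4)
            .map((windowedUserId, count) -> KeyValue.pair(
                windowedUserId.toString(),
                windowedUserId + " c:" + count))
            .to("demo-count-output", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }
}
```

Note that `suppress` only emits when stream time advances past the window end plus the 20-second grace period, so output appears only after later records arrive on the input topic.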

Regarding "java - How do I use KTable#suppress in Kafka Streams to send the final aggregation?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/54036328/
