- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
是否有可能获得window final result在 Kafka Streams 中通过抑制中间结果。
我无法实现这个目标。我的代码有什么问题?
val builder = StreamsBuilder()
builder.stream<String,Double>(inputTopic)
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
.count()
.suppress(Suppressed.untilWindowCloses(unbounded())) // not working)
.toStream()
.print(Printed.toSysOut())
Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000001:
java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String
最佳答案
问题是 Streams 在窗口期间自动包装显式 serde,但不自动包装默认 serde 的方式存在令人困惑的不对称性。恕我直言,这是一个应该纠正的疏忽,所以我提交了:https://issues.apache.org/jira/browse/KAFKA-7806
正如其他人所指出的,解决方案是在上游显式设置 key serde,而不依赖于默认 key serde。您可以:
使用 Materialized
在窗口聚合上设置 serdes
val builder = StreamsBuilder()
builder.stream<String,Double>(inputTopic)
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
.count(Materialized.with(Serdes.String(), Serdes.Long()))
.suppress(Suppressed.untilWindowCloses(unbounded())))
.toStream()
.print(Printed.toSysOut())
count
操作,它具有使其可查询的副作用)
val builder = StreamsBuilder()
builder.stream<String,Double>(inputTopic, Consumed.with(Serdes.String(), Serdes.Double()))
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofSeconds(15)))
.count()
.suppress(Suppressed.untilWindowCloses(unbounded())))
.toStream()
.print(Printed.toSysOut())
count
不同的聚合,您可能会通过
Materialized
设置 serde 值无论如何,所以也许前者会是一种更统一的风格。
window end + grace period
,默认值为 24 小时,因此在应用程序运行 24 小时的数据之前,您不会看到抑制发出的任何内容。
.windowedBy(TimeWindows.of(Duration.ofSeconds(15)).grace(Duration.ZERO))
count
在将更新传递给抑制之前,运算符(operator)本身将在默认 30 秒内缓冲更新。这是一个很好的生产配置,因此您不会对本地磁盘或 Kafka 代理造成瓶颈。但是在您进行测试时,它可能会让您感到惊讶。
properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
关于kotlin - KafkaStreams : Getting Window Final Results,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54110206/
我正在开发 Kafka 流应用程序,但在弄清楚如何使聚合工作时遇到了一些麻烦。 我有一个 KStream bankTransactions其中键的类型为 String和类型 JsonNode 的值所以
我一直在研究 Kafka Streams 应用程序上的磁盘写入,并将拓扑减少到最低限度,即: KStream stream = builder.stream("input-topic"); 然而,在
我正在研究将现有 Flink 应用程序/拓扑转换为使用 KafkaStreams 的 POC。我的问题是关于部署。 具体来说 - 在 Flink 中,将“工作节点”添加到 flink 安装中,然后向拓
我正在玩 Kafka 和流技术;我已经为 KStream 创建了一个自定义序列化器和反序列化器,我将使用它来接收来自给定主题的消息。 现在,问题是我正在以这种方式创建一个 serde: JsonSer
我有一个 Kafka 消费者类,它监听事件并执行事件(订单、客户)之间的连接并将它们存储在物化 View 中。当收到 REST 调用时,我创建了另一个类来访问状态存储。但我越来越java.lang.I
我有一个 Kafka Streams 应用程序,它使用 Kafka Streams DSL 连接到我们的 Kafka 集群,如下所示: KStreamBuilder builder = new KSt
我是 kafkaStream 的新手,我正在开发一个 Stream,但是当我启动我的应用程序时,很多日志都在记录。 例如,如何将日志级别从 Debbug 更改为 Info。 谢谢你。 6:54:12.
下面是我的代码片段。我想并行kafka流处理。但我不想放入 Runnable,也不想多次启动这个应用程序。 有没有类似streams.parallel()的方法? final
我有一个 Kafka 主题,我希望其中的消息具有两种不同的键类型:旧的和新的。 即 "1-new" , "1-old" , "2-new" , "2-old" .键是唯一的,但有些可能会丢失。 现在使
所以我试图让交互式查询与 Kafka 流一起工作。我有 Zookeeper 和 Kafka 在本地运行(在 Windows 上)。我使用 C:\temp 作为存储文件夹,用于 Zookeeper 和
我已经设置了一个简单的聚合,将来自多个流的值平均在一起,我正在尝试对其进行测试。我已经花了很多时间,但似乎无法直接理解这些概念。我的流如下: // Combine multiple streams t
在 Kafka(0.11.0.1) Streams 中,演示应用程序 Play with a Streams Application // Serializers/deserializers (ser
我正在尝试删除下游变更日志中值为 null 的记录,我知道在状态存储中它们只是通过为 null(逻辑删除)而被删除,但是当您对 KTable 或 Stream 进行聚合时,它们会跳过null 并且不删
是否有可能获得window final result在 Kafka Streams 中通过抑制中间结果。 我无法实现这个目标。我的代码有什么问题? val builder = StreamsB
来自https://micronaut-projects.github.io/micronaut-kafka/latest/guide/#kafkaStream ,我可以看到有一种简单的方法来运行运行
我正在为大容量高速分布式应用程序编写 Kafka Consumer。我只有一个主题,但收到的消息率非常高。为更多消费者提供服务的多个分区将适合此用例。最好的消费方式是拥有多个流阅读器。根据文档或可用示
我有四个使用相同应用程序 ID 运行的 Kafka 流应用程序实例。所有输入主题都属于单个分区。为了实现可扩展性,我通过了一个具有多个分区的中间虚拟主题。我已经设置了request.timeout.m
我尝试在 Kafka (0.11) 的聚合函数中使用 SessionWindows,但无法理解为什么会出现错误。 这是我的代码片段: // defining some values: public s
我用来创建 GlobalKTable 的话题非常活跃。在 KStream-GlobalKTable join 的文档中我读了 The GlobalKTable is fully bootstrappe
我有一个由 KafkaStreams Java api 编写的 Kafka 应用程序。它从 Mysql binlog 读取数据并执行一些与我的问题无关的操作。问题是某一特定行在 avro 反序列化过程
我是一名优秀的程序员,十分优秀!