- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个 Kafka 主题,我希望其中的消息具有两种不同的键类型:旧的和新的。
即 "1-new"
, "1-old"
, "2-new"
, "2-old"
.键是唯一的,但有些可能会丢失。
现在使用 Kotlin 和 KafkaStreams API,我可以记录那些具有相同 key ID 的新旧消息。
val windows = JoinWindows.of(Duration.of(2, MINUTES).toMillis())
val newStream = stream.filter({ key, _ -> isNew(key) })
.map({key, value -> KeyValue(key.replace(NEW_PREFIX, ""), value) })
val oldStream = stream.filter({ key, _ -> isOld(key) })
.map({key, value -> KeyValue(key.replace(OLD_PREFIX, ""), value) })
val joined = newStream.join(oldStream,
{ value1, value2 -> "$value1&$value2" }, windows)
joined.foreach({ key, value ->
log.info { "JOINED $key : $value" }
})
"1-old"
收到和
"1-new"
不是只有在这种情况下 2 分钟内我想报告 ID
1
因为可疑。
最佳答案
DSL 可能不会给你你想要的。但是,您可以使用处理器 API。话虽如此,leftJoin
实际上可以用来做“举重”。因此,在 leftJoin
之后您可以使用 .transform(...)
带有附加状态以进一步“清理”数据。
每个old&null
记录您收到,放入商店。如果您稍后收到 old&new
您可以将其从商店中删除。此外,您注册一个标点符号,并且在每次标点符号调用时,您都会扫描商店中“足够旧”的条目,以便您确定以后不会old&new
将产生连接结果。对于这些条目,您发出 old&null
并从商店中取出。
作为替代方案,您也可以省略连接,并在单个 transform()
中完成所有操作。与状态。为此,您需要KStream#merge()
新旧流与呼transform()
在合并的流上。
注意:除了注册标点符号之外,您还可以将“扫描逻辑”放入转换并在每次处理记录时执行它。
关于apache-kafka - 与 KafkaStreams 的窗口结束外连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48196450/
我正在开发 Kafka 流应用程序,但在弄清楚如何使聚合工作时遇到了一些麻烦。 我有一个 KStream bankTransactions其中键的类型为 String和类型 JsonNode 的值所以
我一直在研究 Kafka Streams 应用程序上的磁盘写入,并将拓扑减少到最低限度,即: KStream stream = builder.stream("input-topic"); 然而,在
我正在研究将现有 Flink 应用程序/拓扑转换为使用 KafkaStreams 的 POC。我的问题是关于部署。 具体来说 - 在 Flink 中,将“工作节点”添加到 flink 安装中,然后向拓
我正在玩 Kafka 和流技术;我已经为 KStream 创建了一个自定义序列化器和反序列化器,我将使用它来接收来自给定主题的消息。 现在,问题是我正在以这种方式创建一个 serde: JsonSer
我有一个 Kafka 消费者类,它监听事件并执行事件(订单、客户)之间的连接并将它们存储在物化 View 中。当收到 REST 调用时,我创建了另一个类来访问状态存储。但我越来越java.lang.I
我有一个 Kafka Streams 应用程序,它使用 Kafka Streams DSL 连接到我们的 Kafka 集群,如下所示: KStreamBuilder builder = new KSt
我是 kafkaStream 的新手,我正在开发一个 Stream,但是当我启动我的应用程序时,很多日志都在记录。 例如,如何将日志级别从 Debbug 更改为 Info。 谢谢你。 6:54:12.
下面是我的代码片段。我想并行kafka流处理。但我不想放入 Runnable,也不想多次启动这个应用程序。 有没有类似streams.parallel()的方法? final
我有一个 Kafka 主题,我希望其中的消息具有两种不同的键类型:旧的和新的。 即 "1-new" , "1-old" , "2-new" , "2-old" .键是唯一的,但有些可能会丢失。 现在使
所以我试图让交互式查询与 Kafka 流一起工作。我有 Zookeeper 和 Kafka 在本地运行(在 Windows 上)。我使用 C:\temp 作为存储文件夹,用于 Zookeeper 和
我已经设置了一个简单的聚合,将来自多个流的值平均在一起,我正在尝试对其进行测试。我已经花了很多时间,但似乎无法直接理解这些概念。我的流如下: // Combine multiple streams t
在 Kafka(0.11.0.1) Streams 中,演示应用程序 Play with a Streams Application // Serializers/deserializers (ser
我正在尝试删除下游变更日志中值为 null 的记录,我知道在状态存储中它们只是通过为 null(逻辑删除)而被删除,但是当您对 KTable 或 Stream 进行聚合时,它们会跳过null 并且不删
是否有可能获得window final result在 Kafka Streams 中通过抑制中间结果。 我无法实现这个目标。我的代码有什么问题? val builder = StreamsB
来自https://micronaut-projects.github.io/micronaut-kafka/latest/guide/#kafkaStream ,我可以看到有一种简单的方法来运行运行
我正在为大容量高速分布式应用程序编写 Kafka Consumer。我只有一个主题,但收到的消息率非常高。为更多消费者提供服务的多个分区将适合此用例。最好的消费方式是拥有多个流阅读器。根据文档或可用示
我有四个使用相同应用程序 ID 运行的 Kafka 流应用程序实例。所有输入主题都属于单个分区。为了实现可扩展性,我通过了一个具有多个分区的中间虚拟主题。我已经设置了request.timeout.m
我尝试在 Kafka (0.11) 的聚合函数中使用 SessionWindows,但无法理解为什么会出现错误。 这是我的代码片段: // defining some values: public s
我用来创建 GlobalKTable 的话题非常活跃。在 KStream-GlobalKTable join 的文档中我读了 The GlobalKTable is fully bootstrappe
我有一个由 KafkaStreams Java api 编写的 Kafka 应用程序。它从 Mysql binlog 读取数据并执行一些与我的问题无关的操作。问题是某一特定行在 avro 反序列化过程
我是一名优秀的程序员,十分优秀!