- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我最近开始试验 kafka 流。我有一个场景需要加入 KStream
与 KTable
. KTable
可能是这种情况不包含某些键。在那种情况下,我得到一个 NullPointerException
.
特别是我得到
stream-thread [StreamThread-1] Streams application error during processing: java.lang.NullPointerException
ReadOnlyKeyValueStore
查询底层存储以查找键是否存在。界面。
Global State Store
在版本中引入
10.2
在下一阶段,我是否应该期望我也能够以相同的方式查询
Global State Store
?
valueJoiner
只返回结果,而不是对连接的值执行操作,并在连接后添加额外的过滤步骤以过滤掉空值。
最佳答案
我的问题的解决方案来自于了解 join
语义好一点。
就像在数据库连接中一样(虽然我并不是说 Kstream
连接精确地遵循 db 连接概念),左连接操作会在缺少右侧键的地方产生具有空值的行。
所以最终我唯一要做的就是解耦我的 valueJoiner
从随后的计算/操作(我需要对连接记录的字段执行一些计算并返回一个新构造的对象)并让它只返回连接值的数组。然后我可以过滤掉导致 null
的记录。通过检查这些数组来获取值。
根据 Matthias 的 J. Sax 建议,我使用了 0.10.2
版本而不是 0.10.1
兼容代理版本 0.10.1
并更换整个leftJoin
带有内连接的逻辑,无需过滤 null
值。
关于apache-kafka - 当 KTable 中缺少 key 时,处理 KStream 与 KTable 的连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43142081/
我有两个流来自两个主题 orders 和 fsource。主要订单是静态的,很少更新,而 fsource 的更新速度为每秒 1000 个。这里我使用了 KTable-KTabke join 因为它们具
我对以下拓扑的行为方式有一些问题: String topic = config.topic(); KTable myTable = topology.builder().table(UUIDSerde
documentation来自 Confluent 网站提到以下内容: The left KTable can have multiple records which map to the same
KTable table1 KTable table2 我正在尝试加入两个 KTables (无窗口) 键并将结果写为 到输出主题。 谁能帮我提供一些如何执行此操作的示例。 最佳答案 因为在 KTab
请参阅下面的更新以显示潜在的解决方法 我们的应用程序使用 2 个主题作为 KTables,执行左连接,并输出到一个主题。在测试期间,我们发现当我们的输出主题只有 1 个分区时,这可以正常工作。当我们增
我最近开始试验 kafka 流。我有一个场景需要加入 KStream与 KTable . KTable 可能是这种情况不包含某些键。在那种情况下,我得到一个 NullPointerException
我正在尝试测试一个拓扑,该拓扑作为最后一个节点,具有 KTable。我的测试使用的是成熟的 Kafka 集群(通过 confluence 的 Docker 镜像),因此我不使用TopologyTest
我正在尝试测试一个拓扑,该拓扑作为最后一个节点,具有 KTable。我的测试使用的是成熟的 Kafka 集群(通过 confluence 的 Docker 镜像),因此我不使用TopologyTest
我有一个 kafka 主题和一个听它的 KTable。 我想写一个 http POST 请求,它将遍历 ktable 中的当前项目,对它们执行一些操作并写回主题 所以基本上我有: private va
我们有以下高级 DSL 处理拓扑: TimeWindows timeWindow = TimeWindows.of(windowDurationMs).advanceBy(windowAdvanceM
用例 - 有一个包含消息的主题(空,元数据)。我需要从主题创建一个 Ktable,其中键 (metadata.entity_id) 和值作为元数据。该表稍后将用于与具有相同键的流进行连接。 p
我在 Kafka 中创建带有时间窗口的 KTable 时遇到了一些问题。 我想创建一个表来计算流中 ID 的数量,就像这样。 ID (String) | Count (Long) X
假设我有一个包含多个分区的主题。我在其中写入 K/V 数据,并希望通过按键在 Tumbling Windows 中聚合所述数据。 假设我已经启动了与我有分区一样多的工作实例,并且每个工作实例都在单独的
我正在尝试在 Kafka Streams 中加入一个类似于许多文章的简单外键(例如: https://www.confluent.io/blog/data-enrichment-with-kafka-
如何确定主题的 KTable 物化何时完成? 例如假设 KTable 有几百万行。伪代码如下: KTable kt = kgroupedStream.groupByKey(..).reduce(..)
我有一个窗口KTable按预期工作,但每次收到新值时它都会输出。我找到了 .suppress 运算符,它完全符合我的要求:仅在时间窗口结束时输出结果。我已向 TimeWindow 添加了 grace
我是 Kafka Streams API 的新手,我正在尝试创建一个 KTable。我有一个输入主题:s-order-topic,它是一个json格式的消息,如下所示。 { "current_ts":
我想编写一个小型 Kafka Streams 应用程序,它会减少输入流的时间窗口,对值进行一些映射,然后将生成的 toStream() 更改日志发送到另一个主题。使用我的代码,我在 toStream(
有没有办法计算 Java/scala 应用程序中 KTable/KStream 随着时间的推移大约将使用多少堆(或任何其他)内存? 我有一些具体的假设,我想知道它们是否正确: Kafka 流仅使用内部
如何确定主题的 KTable 物化何时完成? 例如假设 KTable 有几百万行。伪代码如下: KTable kt = kgroupedStream.groupByKey(..).reduce(..)
我是一名优秀的程序员,十分优秀!