- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
如何确定主题的 KTable 物化何时完成?
例如假设 KTable 有几百万行。伪代码如下:
KTable<String, String> kt = kgroupedStream.groupByKey(..).reduce(..); //Assume this produces few million rows
在某个时间点,我想安排一个线程来调用以下写入主题的线程:kt.toStream().to("output_topic_name");
我想确保所有数据都作为上述调用的一部分写入。另外,一旦调用了上述“to”方法,是否可以在下一个调度中调用它,或者第一个调用是否始终保持事件状态?
后续问题:
约束
1) 好的,我看到一旦 kafkastream 启动,kstream 和 ktable 是无界/无限的。但是,ktable 实体化(到压缩主题)不会在指定时间段内为同一键发送多个条目。
因此,除非压缩过程尝试清除这些并仅保留最新的,否则下游应用程序将消耗所有可用条目以从主题查询相同的键,从而导致重复。即使压缩过程进行了某种程度的清理,也总是不可能在给定的时间点,随着压缩过程的进行,有些键具有多个条目。
我假设 KTable 对于 RocksDB 中的给定键只有一条记录。如果我们有办法安排物化,那将有助于避免重复。此外,减少主题中持久化的数据量(增加存储),增加网络流量,压缩过程的额外开销以清理它。
2) 也许 ReadOnlyKeyValueStore 允许从存储中进行受控检索,但它仍然缺乏安排检索键、值和写入主题的方法,这需要额外的编码。
是否可以改进 API 以允许受控实现?
最佳答案
KTable 实体化永远不会完成,您也不能“调用”to()
。
当您使用 Streams API 时,您将运算符的 DAG“连接在一起”。实际的方法调用,不触发任何计算,只是修改算子的DAG。
只有在通过 KafkaStreams#start()
开始计算后,数据才会被处理。请注意,您指定的所有运算符将在计算开始后连续并发运行。
没有“计算结束”,因为输入应该是无界/无限的,因为上游应用程序可以随时将新数据写入输入主题。因此,您的程序永远不会自行终止。如果需要,您可以通过 KafkaStreams#close()
停止计算。
在执行期间,您不能更改 DAG。如果要更改它,则需要停止计算并创建一个新的 KafkaStreams
实例,将修改后的 DAG 作为输入
跟进:
是的。您必须将 KTable 视为一个“版本化的表”,当条目更新时,它会随着时间的推移而演变。因此,所有更新都写入变更日志主题并作为变更记录发送到下游(请注意,KTables 也会进行一些缓存,以“删除重复”对同一键的连续更新:参见 https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html)。
will consume all available entries for the same key querying from the topic, causing duplicates.
我不会将它们视为“重复”,而是更新。是的,应用程序需要能够正确处理这些更新。
if we have a way to schedule the materialization, that will help to avoid the duplicates.
具体化是一个连续的过程,每当输入主题中有新的输入记录可用并被处理时,KTable 就会更新。因此,在任何时间点都可能有特定 key 的更新。因此,即使您可以完全控制何时向更新日志主题和/或下游发送更新,稍后也可能会有新的更新。这就是流处理的本质。
Also, reduce the amount of data being persisted in topic (increasing the storage), increase in the network traffic, additional overhead to the compaction process to clean it up.
前面说了缓存是为了节省资源。
Can the API be improved to allow a controlled materialization?
如果提供的 KTable 语义不符合您的要求,您可以随时编写一个自定义运算符作为 Processor
或 Transformer
,为其附加一个键值存储,并实现您需要的任何内容。
关于apache-kafka - 卡夫卡流 : KTable materialization,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50440550/
我有两个流来自两个主题 orders 和 fsource。主要订单是静态的,很少更新,而 fsource 的更新速度为每秒 1000 个。这里我使用了 KTable-KTabke join 因为它们具
我对以下拓扑的行为方式有一些问题: String topic = config.topic(); KTable myTable = topology.builder().table(UUIDSerde
documentation来自 Confluent 网站提到以下内容: The left KTable can have multiple records which map to the same
KTable table1 KTable table2 我正在尝试加入两个 KTables (无窗口) 键并将结果写为 到输出主题。 谁能帮我提供一些如何执行此操作的示例。 最佳答案 因为在 KTab
请参阅下面的更新以显示潜在的解决方法 我们的应用程序使用 2 个主题作为 KTables,执行左连接,并输出到一个主题。在测试期间,我们发现当我们的输出主题只有 1 个分区时,这可以正常工作。当我们增
我最近开始试验 kafka 流。我有一个场景需要加入 KStream与 KTable . KTable 可能是这种情况不包含某些键。在那种情况下,我得到一个 NullPointerException
我正在尝试测试一个拓扑,该拓扑作为最后一个节点,具有 KTable。我的测试使用的是成熟的 Kafka 集群(通过 confluence 的 Docker 镜像),因此我不使用TopologyTest
我正在尝试测试一个拓扑,该拓扑作为最后一个节点,具有 KTable。我的测试使用的是成熟的 Kafka 集群(通过 confluence 的 Docker 镜像),因此我不使用TopologyTest
我有一个 kafka 主题和一个听它的 KTable。 我想写一个 http POST 请求,它将遍历 ktable 中的当前项目,对它们执行一些操作并写回主题 所以基本上我有: private va
我们有以下高级 DSL 处理拓扑: TimeWindows timeWindow = TimeWindows.of(windowDurationMs).advanceBy(windowAdvanceM
用例 - 有一个包含消息的主题(空,元数据)。我需要从主题创建一个 Ktable,其中键 (metadata.entity_id) 和值作为元数据。该表稍后将用于与具有相同键的流进行连接。 p
我在 Kafka 中创建带有时间窗口的 KTable 时遇到了一些问题。 我想创建一个表来计算流中 ID 的数量,就像这样。 ID (String) | Count (Long) X
假设我有一个包含多个分区的主题。我在其中写入 K/V 数据,并希望通过按键在 Tumbling Windows 中聚合所述数据。 假设我已经启动了与我有分区一样多的工作实例,并且每个工作实例都在单独的
我正在尝试在 Kafka Streams 中加入一个类似于许多文章的简单外键(例如: https://www.confluent.io/blog/data-enrichment-with-kafka-
如何确定主题的 KTable 物化何时完成? 例如假设 KTable 有几百万行。伪代码如下: KTable kt = kgroupedStream.groupByKey(..).reduce(..)
我有一个窗口KTable按预期工作,但每次收到新值时它都会输出。我找到了 .suppress 运算符,它完全符合我的要求:仅在时间窗口结束时输出结果。我已向 TimeWindow 添加了 grace
我是 Kafka Streams API 的新手,我正在尝试创建一个 KTable。我有一个输入主题:s-order-topic,它是一个json格式的消息,如下所示。 { "current_ts":
我想编写一个小型 Kafka Streams 应用程序,它会减少输入流的时间窗口,对值进行一些映射,然后将生成的 toStream() 更改日志发送到另一个主题。使用我的代码,我在 toStream(
有没有办法计算 Java/scala 应用程序中 KTable/KStream 随着时间的推移大约将使用多少堆(或任何其他)内存? 我有一些具体的假设,我想知道它们是否正确: Kafka 流仅使用内部
如何确定主题的 KTable 物化何时完成? 例如假设 KTable 有几百万行。伪代码如下: KTable kt = kgroupedStream.groupByKey(..).reduce(..)
我是一名优秀的程序员,十分优秀!