gpt4 book ai didi

apache-kafka - Kafka Streams - 解释 KTable 及其关联的 Store 仅每 30 秒更新一次的原因

转载 作者:行者123 更新时间:2023-12-05 01:01:09 29 4
gpt4 key购买 nike

我有这个生成 Store 的简单 KTable 定义:

KTable<String, JsonNode> table = kStreamBuilder.<String, JsonNode>table(ORDERS_TOPIC, ORDERS_STORE);
table.print();

我将消息发布到 ORDERS_TOPIC 但商店直到每 30 秒才真正更新一次。这是因为 30000 毫秒时间已经过去而有关于提交的消息的日志:

2017-07-25 23:53:15.465 DEBUG 17540 --- [ StreamThread-1] o.a.k.c.consumer.internals.Fetcher       : Sending fetch for partitions [orders-0] to broker EXPRF026.SUMINISTRADOR:9092 (id: 0 rack: null)
2017-07-25 23:53:15.567 INFO 17540 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing all tasks because the commit interval 30000ms has elapsed
2017-07-25 23:53:15.567 INFO 17540 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread : stream-thread [StreamThread-1] Committing task StreamTask 0_0
2017-07-25 23:53:15.567 DEBUG 17540 --- [ StreamThread-1] o.a.k.s.processor.internals.StreamTask : task [0_0] Committing its state
2017-07-25 23:53:15.567 DEBUG 17540 --- [ StreamThread-1] o.a.k.s.p.i.ProcessorStateManager : task [0_0] Flushing all stores registered in the state manager
f2b9ff2b-62c3-470e-8df1-066cd1e3d5ec
{"uid":"string","productId":0,"orderId":"f2b9ff2b-62c3-470e-8df1-066cd1e3d5ec","name":"OrderPlaced","state":"PENDING_PRODUCT_RESERVATION"}
[KTABLE-SOURCE-0000000001]: f2b9ff2b-62c3-470e-8df1-066cd1e3d5ec , ({"uid":"string","productId":0,"orderId":"f2b9ff2b-62c3-470e-8df1-066cd1e3d5ec","name":"OrderPlaced","state":"PENDING_PRODUCT_RESERVATION"}<-null)
2017-07-25 23:53:15.569 DEBUG 17540 --- [ StreamThread-1] o.a.k.s.state.internals.ThreadCache : Thread order-service-streams-16941f70-87b3-45f4-88de-309e4fd22748-StreamThread-1 cache stats on flush: #puts=1, #gets=1, #evicts=0, #flushes=1
2017-07-25 23:53:15.576 DEBUG 17540 --- [ StreamThread-1] o.a.k.s.p.internals.RecordCollectorImpl : task [0_0] Flushing producer

我发现控制这个的属性是commit.interval.ms:

props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);

为什么默认设置为 30000 毫秒(听起来很长),将其更改为 10 毫秒有何含义?

如果我使用 KStream 而不是 KTable...

KStream<String, JsonNode> kStream = kStreamBuilder.stream(ORDERS_TOPIC);
kStream.print();

...我可以立即看到消息,而无需等待 30000 毫秒,为什么会有差异?

最佳答案

特别是与内存管理有关,KTable 缓存:http://docs.confluent.io/current/streams/developer-guide.html#memory-management

KTable 实际上一直在更新,如果您使用 "Interactive Queries"要访问底层状态存储,您可以立即获取每个更新。但是,KTable 缓存会缓冲更新以减少下游负载,并且每次触发提交时,都需要向下游刷新该缓存以避免失败时数据丢失。如果您的缓存大小很小,如果从缓存中逐出某个键,您可能还会看到下游记录。

关于commit interval:一般来说commit interval设置一个比较大的值,以减少brokers的commit负担。

关于apache-kafka - Kafka Streams - 解释 KTable 及其关联的 Store 仅每 30 秒更新一次的原因,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45314215/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com