- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
澄清一下,我是 Kafka 的新手,如果我的问题似乎没有记录,我很抱歉,我正在阅读教程、文档和我能理解的一切内容。
我正在尝试从 GlobalStore 读取所有值以更新其值,然后使用已存在的 StateStore 来放置这些新的更新值。
我正在尝试这样做,因为当我这样做时:
this.stateStore.all();
我只有1/10的数据,如果我理解正确的话,这是因为我有10个分区,而ss,只读取一个(虽然我不太明白为什么)
这是我的全局表:
public StreamsBuilder declareTopology(StreamsBuilder builder) {
logger.debug("Building topology : input topic ~ {} ; output topics ~ {}, {}",
getInputTopic(),
getDataTopic(),
getToEsTopic());
builder.globalTable(
getDataTopic(),
Consumed.with(Serdes.String(), fooSerdes)
.withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST),
Materialized.<String, Foo, KeyValueStore<Bytes, byte[]>>as(
"foosktable")
.withKeySerde(Serdes.String())
.withValueSerde(fooSerdes)
.withLoggingEnabled(new HashMap<>()));
...
这是 addStateStore,我无法删除它,因为它在代码的其他地方使用:
...
builder.addStateStore(
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("foosktable"),
Serdes.String(),
fooSerdes));
...
return builder;
}
所以,从理论上讲,我想做的是删除也使用相同主题的 StateStore,并将我的数据使用我的 data.process 主题之一,问题是该处理器用它做其他事情StateStore,所以我不能用核武器攻击它。
我在这里迷路了,任何光都会有很大帮助。谢谢!
最佳答案
有点不清楚您实际想要实现的目标。但是,一些高级解释:
GlobalKTable
只有一个用途:从主题读取数据而不进行修改,以允许执行 KStream-GlobalKTable
-join 或通过“查询存储”交互式查询”。
因此,您无法真正执行您想要的操作,因为无法按照您的意图将数据从全局存储复制到另一个存储。您需要复制输入主题并读取两次:(1) 作为 GlobalKTable
和 (2) 作为常规 KStream
来修改数据,然后再将其放入存储中。对于 (2),您可以使用 transform()
。
希望这有帮助。
关于java - 如何在同一主题上使用globalKtable和StateStore?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58687018/
我正在构建拓扑并想使用 KStream.process()将一些中间值写入数据库。此步骤不会改变数据的性质,并且是完全无状态的。 添加 Processor需要创建一个 ProcessorSupplie
我有两个 Java 应用程序(App1、App2)来测试如何访问 KTable来自 docker 中单实例环境中的不同应用程序。 第一个应用程序 (App1) 写入 KTable使用以下代码。 pub
我有一个紧凑的主题,大约有 30 个 Mio 键。 我的 App将此主题具体化为 KeyValueStore . 我如何检查 KeyValueStore是完全人口?如果我通过 InteractiveQ
在 Kafka 流中 WordCount例如,它使用 StateStore存储字数。如果同一个消费者组中有多个实例,StateStore对组来说是全局的,还是只对消费者实例来说是局部的? 纳克斯 最佳
我在尝试通过 Kafka Streams 实现以下目标时遇到了一些麻烦: 在应用程序启动时,(压缩的)主题 alpha 被加载到键值 StateStore 映射中 Kafka Stream 从另一个主
我们的目标是实现以下架构。最重要的是能够读取主题 T1 的所有数据(来自所有分区)。 我们面临的问题是我们无法在从不同构建器创建的两个节点之间进行连接(每个实例中有两个不同的构建器)。在每个实例中,我
我们目前正在实现一个流程(使用 Kafka Processor API),我们需要将来自 2 个相关事件(消息)的信息组合到一个主题上,然后转发这些组合信息。事件源自 IoT 设备,并且由于我们希望将
尝试访问定义的状态存储中的所有键值,但是在 .transform() 方法中我只能使用一个键(即源键)访问 KeyValueStore SS=context.getStateStore("macs")
我正在尝试实现一个 Transformer 类 public class StreamSorterByTimeStampWithDelayTransformer implements Tra
我的代码中有一个具体化的内存状态存储。我有另一个单独的流,应该根据某些条件查找和删除记录。 我需要允许我的流访问和删除先前构建的状态存储中的记录。我下面有以下代码 @bean public Strea
在 impala 的官方文档中 here , statestore 组件有一条语句: If you issue a DDL statement while the statestore is down
我是一名优秀的程序员,十分优秀!