- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我在尝试通过 Kafka Streams 实现以下目标时遇到了一些麻烦:
alpha
被加载到键值 StateStore
映射中alpha
我的方法如下:
val builder = new StreamsBuilderS()
val store = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("store"), kSerde, vSerde)
)
builder.addStateStore(store)
val loaderStreamer = new LoaderStreamer(store).startStream()
[...] // I wait a few seconds until the loading is complete and the stream os running
val map = instance.store("store", QueryableStoreTypes.keyValueStore[K, V]()) // !!!!!!!! ERROR HERE !!!!!!!!
builder
.stream("another-topic")(Consumed.`with`(kSerde, vSerde))
.doMyAggregationsAndgetFromTheMapAbove
.transform(() => new StoreTransformer[K, V]("store"), "store")
.to("alpha")(Produced.`with`(kSerde, vSerde))
LoaderStreamer(商店)
:
[...]
val builders = new StreamsBuilderS()
builder.addStateStore(store)
builder
.table("alpha")(Consumed.`with`(kSerde, vSerde))
builder.build
[...]
StoreTransformer
:
[...]
override def init(context: ProcessorContext): Unit = {
this.context = context
this.store =
context.getStateStore(store).asInstanceOf[KeyValueStore[K, V]]
}
override def transform(key: K, value: V): (K, V) = {
store.put(key, value)
(key, value)
}
[...]
...但我得到的是:
Caused by: org.apache.kafka.streams.errors.InvalidStateStoreException:
The state store, store, may have migrated to another instance.
在尝试获取商店处理程序时。
关于如何实现这一点有什么想法吗?
谢谢!
最佳答案
您不能在两个 Kafka Streams 应用程序之间共享状态存储。
根据文档:https://docs.confluent.io/current/streams/faq.html#interactive-queries上述异常可能有两个原因:
本地 KafkaStreams 实例尚未就绪,因此无法查询其本地状态存储。
本地 KafkaStreams 实例已准备就绪,但特定状态存储刚刚在幕后迁移到另一个实例。
处理它的最简单方法是等到状态存储可查询:
public static <T> T waitUntilStoreIsQueryable(final String storeName,
final QueryableStoreType<T> queryableStoreType,
final KafkaStreams streams) throws InterruptedException {
while (true) {
try {
return streams.store(storeName, queryableStoreType);
} catch (InvalidStateStoreException ignored) {
// store not yet ready for querying
Thread.sleep(100);
}
}
}
整个示例可以在 confluent github 找到.
关于apache-kafka - 如何在两个 Kafka 流之间使用持久化的 StateStore,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55656508/
我正在构建拓扑并想使用 KStream.process()将一些中间值写入数据库。此步骤不会改变数据的性质,并且是完全无状态的。 添加 Processor需要创建一个 ProcessorSupplie
我有两个 Java 应用程序(App1、App2)来测试如何访问 KTable来自 docker 中单实例环境中的不同应用程序。 第一个应用程序 (App1) 写入 KTable使用以下代码。 pub
我有一个紧凑的主题,大约有 30 个 Mio 键。 我的 App将此主题具体化为 KeyValueStore . 我如何检查 KeyValueStore是完全人口?如果我通过 InteractiveQ
在 Kafka 流中 WordCount例如,它使用 StateStore存储字数。如果同一个消费者组中有多个实例,StateStore对组来说是全局的,还是只对消费者实例来说是局部的? 纳克斯 最佳
我在尝试通过 Kafka Streams 实现以下目标时遇到了一些麻烦: 在应用程序启动时,(压缩的)主题 alpha 被加载到键值 StateStore 映射中 Kafka Stream 从另一个主
我们的目标是实现以下架构。最重要的是能够读取主题 T1 的所有数据(来自所有分区)。 我们面临的问题是我们无法在从不同构建器创建的两个节点之间进行连接(每个实例中有两个不同的构建器)。在每个实例中,我
我们目前正在实现一个流程(使用 Kafka Processor API),我们需要将来自 2 个相关事件(消息)的信息组合到一个主题上,然后转发这些组合信息。事件源自 IoT 设备,并且由于我们希望将
尝试访问定义的状态存储中的所有键值,但是在 .transform() 方法中我只能使用一个键(即源键)访问 KeyValueStore SS=context.getStateStore("macs")
我正在尝试实现一个 Transformer 类 public class StreamSorterByTimeStampWithDelayTransformer implements Tra
我的代码中有一个具体化的内存状态存储。我有另一个单独的流,应该根据某些条件查找和删除记录。 我需要允许我的流访问和删除先前构建的状态存储中的记录。我下面有以下代码 @bean public Strea
在 impala 的官方文档中 here , statestore 组件有一条语句: If you issue a DDL statement while the statestore is down
我是一名优秀的程序员,十分优秀!