
java - How to register a stateless processor (that seems to require a StateStore as well)?

Repost. Author: 搜寻专家. Updated: 2023-10-31 08:22:07

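I am building a topology and want to use KStream.process() to write some intermediate values to a database. This step does not change the nature of the data and is completely stateless.

Adding a Processor requires creating a ProcessorSupplier and passing this instance to the KStream.process() function along with the name of a state store. This is what I don't understand.

How do I add a StateStore object to a topology, given that it requires a StateStoreSupplier?

Failing to add said StateStore produces this error when the application starts: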

Exception in thread "main" org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: StateStore my-state-store is not added yet.

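Why is it necessary for a processor to have a state store? It seems that this could well be optional for processors that are stateless and don't maintain state.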

Process all elements in this stream, one element at a time, by applying a Processor.

Best answer

This is a simple example of how to use state stores, taken from the Confluent Platform documentation on Kafka Streams.

Step 1: Define the StateStore/StateStoreSupplier:

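import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.state.Stores;

// A persistent key-value store named "Counts" with String keys and Long values
StateStoreSupplier countStore = Stores.create("Counts")
    .withKeys(Serdes.String())
    .withValues(Serdes.Long())
    .persistent()
    .build();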
"I don't see a way to add a StateStore object to my topology. It requires a StateStoreSupplier as well though."

Step 2: Add the state store to your topology.

Option A - When using the Processor API:

TopologyBuilder builder = new TopologyBuilder();

// add the source processor node that takes Kafka topic "source-topic" as input
builder.addSource("Source", "source-topic")
    .addProcessor("Process", () -> new WordCountProcessor(), "Source")
    // Add the countStore associated with the WordCountProcessor processor
    .addStateStore(countStore, "Process")
    .addSink("Sink", "sink-topic", "Process");

Option B - When using the Kafka Streams DSL:

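Here you need to call KStreamBuilder#addStateStore() with the store's supplier to add the state store to your processor topology. Then, when calling methods such as KStream#process() or KStream#transform(), you must also pass in the name of the state store; otherwise your application will fail at runtime.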

For example, with KStream#transform():

KStreamBuilder builder = new KStreamBuilder();

// Add the countStore that will be used within the Transformer[Supplier]
// that we pass into `transform()` below.
builder.addStateStore(countStore);

KStream<byte[], String> input = builder.stream("source-topic");

KStream<String, Long> transformed =
    input.transform(/* your TransformerSupplier */, countStore.name());

"Why is it necessary for a processor to have a state store? It seems that this could well be optional for processors that are stateless and don't maintain state."

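You're right: you don't need a state store if your processor does not maintain state. The state store names in KStream#process() are a varargs parameter (String... stateStoreNames), so a stateless processor can be passed on its own, with no store name at all.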

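When using the DSL and you do need state, you simply call KStreamBuilder#addStateStore() with your StateStoreSupplier to add the state store to your processor topology, and then reference it later by name.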
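
To make that rule concrete, here is a toy model in plain Java with no Kafka dependency. ToyTopology and its methods are hypothetical stand-ins, not Kafka's classes or implementation. They only mimic the behaviour described above, under the assumption that the check works roughly like this: every store name handed to process() must already be registered, and passing no names at all is perfectly valid.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.BiConsumer;

/** Toy stand-in for a topology builder (hypothetical; NOT Kafka's implementation). */
class ToyTopology {
    private final Set<String> registeredStores = new HashSet<>();
    private final List<BiConsumer<String, String>> processors = new ArrayList<>();

    /** Registers a store name, loosely mirroring what addStateStore() achieves. */
    ToyTopology addStateStore(String name) {
        registeredStores.add(name);
        return this;
    }

    /** Attaches a processor. Store names are varargs, so zero names is valid. */
    ToyTopology process(BiConsumer<String, String> processor, String... stateStoreNames) {
        for (String name : stateStoreNames) {
            // Referencing a store that was never registered fails at build time.
            if (!registeredStores.contains(name)) {
                throw new IllegalStateException("StateStore " + name + " is not added yet.");
            }
        }
        processors.add(processor);
        return this;
    }

    /** Feeds one record through every attached processor. */
    void send(String key, String value) {
        for (BiConsumer<String, String> processor : processors) {
            processor.accept(key, value);
        }
    }

    public static void main(String[] args) {
        ToyTopology topology = new ToyTopology();
        // Stateless step: no store registered and no store name passed.
        topology.process((key, value) ->
            System.out.println("write to DB: " + key + "=" + value));
        topology.send("k1", "v1");
    }
}
```

In this sketch the stateless call succeeds because the varargs list is empty, whereas process(p, "my-state-store") on a fresh ToyTopology throws until addStateStore("my-state-store") has been called first. That is the same ordering the real builder expects.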

Regarding "java - How to register a stateless processor (that seems to require a StateStore as well)?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/39078860/
