gpt4 book ai didi

apache-kafka-streams - 如何使用Kafka Stream DSL使用处理器过滤键和值

转载 作者:行者123 更新时间:2023-12-04 13:37:12 27 4
gpt4 key购买 nike

我有一个与StateStore进行交互以对消息进行过滤和执行复杂逻辑的处理器。在process(key,value)方法中,我使用context.forward(key,value)发送所需的键和值。出于调试目的,我还打印了这些内容。

我有一个KStream mergedStream,它是来自其他两个流的联接而产生的。我想将处理器应用于该流的记录。我用mergedStream.process(myprocessor,"stateStoreName")实现

启动该程序时,我可以看到要打印到控制台的正确值。但是,如果我使用mergedStream.to("topic")将mergedStream发送到某个主题,则该主题上的值不是我在处理器中转发的那个值,而是原始值。

我使用kafka-streams 0.10.1.0。

将我在处理器中转发的值传递到另一个流的最佳方法是什么?

是否可以将Processor APIKStream DSL创建的流混合?

最佳答案

简称:

为了解决您的问题,您可以使用transform(...)代替process(...),这也使您可以访问DSL中的Processor API。

:

如果使用process(...),则将处理器应用于流-但是,这是一个“终止”(或接收器)操作(其返回类型为void),即,它不返回任何结果(此处的“接收器”仅表示运算符(operator)没有后继者-这并不意味着任何结果都写在某个地方!)

此外,如果您调用mergedStream.process(...)mergedStream.to(...),则基本上是分支并复制流,并将一个副本发送给每个下游运算符(即,一个副本发送给process和一个副本发送给to)。

混合使用DSL和Processor API是绝对可能的(您已经完成了;)。但是,使用process(...)不能使用DSL中的forward(...)消费数据-如果要使用Processor API结果,则可以使用transform(...)而不是process(...)

关于apache-kafka-streams - 如何使用Kafka Stream DSL使用处理器过滤键和值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40814437/

27 4 0