gpt4 book ai didi

java - 使用Kafka Stream中的状态存储(RocksDB)将一条记录转换为多条记录

转载 作者:行者123 更新时间:2023-12-01 17:42:49 25 4
gpt4 key购买 nike

我想使用状态存储(RocksDB)将一条记录转换为多条记录。我知道有一种方法,例如 Stream.transform(final TransformerSupplier> TransformerSupplier,final String... stateStoreNames) 但如何返回多个 KeyValue 对,以便我稍后可以使用分支发布到受尊重的主题?

有一种方法可以将数据转发到下游,但我如何再次使用该数据?

卡夫卡版本 - 1.1.0

最佳答案

如果我理解正确,您希望根据状态存储中的数据发出多条记录。与transform()在 Kafka Streams 2.2 之前,您可以通过调用 context.forward() 来做到这一点多次在您的Transformer中。例如:

stream
.transform(() -> new Transformer<Integer, Integer, KeyValue<Integer, Integer>>() {
private ProcessorContext context;

@Override
public void init(final ProcessorContext context) {
this.context = context;
}

@Override
public KeyValue<Integer, Integer> transform(final Integer key, final Integer value) {
context.forward(key, value);
context.forward(key + 1, value + 1;)
return null;
}

@Override
public void close() {
}
}, stateStoreName);

请注意,通过使用 context.forward() ,您没有编译时类型安全性。如果转发的记录类型与输出的类型不符 KStream (上例中的 <Integer, Integer> ),代码可以编译,但在运行时抛出异常。

从 Kafka Streams 2.2 开始,您可以使用 flatTransform() 。与flatTransform()您可以返回记录列表,而不是使用 context.forward()多次返回null如上例所示。这么用flatTransform()保证编译时类型安全。

关于java - 使用Kafka Stream中的状态存储(RocksDB)将一条记录转换为多条记录,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58930728/

25 4 0