gpt4 book ai didi

java - 如何在 apache beam 中的 ParDo 函数中处理 PCollection> 中的元素时将元素发布到 kafka 主题?

转载 作者:行者123 更新时间:2023-12-02 04:02:15 25 4
gpt4 key购买 nike

我有一个PCollection<KV<String,String>> Pcol ,我正在处理 ParDo 中的每个元素方法。我想根据 pardo 本身的某些条件将记录发布到 kafka 主题。

我该怎么做?

PCollection<KV<String, String>> Pcol =pipeline.apply("Process Data", 
ParDo.of(new DoFn<KV<String, String>, KV<String, String>>() {
@ProcessElement
public void processElement(ProcessContext c) throws IOException {
String key = element.getKey();
if(key==null)
{//publish to a kafka topic}
}
})
);

最佳答案

在Beam中,Sinks通常由ParDo实现。因此,您可以引用Beam中Kafka接收器的实现方式,了解如何在ParDo中写入Kafka。更具体地说,这两个类可能会有所帮助:12 .

关于java - 如何在 apache beam 中的 ParDo 函数中处理 PCollection<KV<String,String>> 中的元素时将元素发布到 kafka 主题?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56727717/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com