- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个PCollection<KV<String,String>> Pcol
,我正在处理 ParDo
中的每个元素方法。我想根据 pardo 本身的某些条件将记录发布到 kafka 主题。
我该怎么做?
PCollection<KV<String, String>> Pcol =pipeline.apply("Process Data",
ParDo.of(new DoFn<KV<String, String>, KV<String, String>>() {
@ProcessElement
public void processElement(ProcessContext c) throws IOException {
String key = element.getKey();
if(key==null)
{//publish to a kafka topic}
}
})
);
最佳答案
关于java - 如何在 apache beam 中的 ParDo 函数中处理 PCollection<KV<String,String>> 中的元素时将元素发布到 kafka 主题?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56727717/
如何将变量从管道外部传递到 Dataflow 作业的 ParDo 函数。下面是一个示例,我尝试在创建管道之前派生 fileDate 并将其传递给 ParDo 函数。我在接口(interface)中声明
*我用良好的数据填充了 map (没有空值),但我无法进入下一个 ParDo 函数。我尝试调试,但不明白为什么会发生这种情况。如果有人知道我做错了什么,请告诉我。我正在放置三个 ParDo 函数。谢谢
我有一个 ParDo 转换,我在其中进行阻塞 Web 服务调用以获取一些数据。调用需要一段时间才能返回(比如大约 1 分钟)。我观察到即使在相当大的 PCollection 上调用,此 ParDo 转
我想调用 beam.io.Write(beam.io.BigQuerySink(..))从 ParDo 函数内进行操作,为 PCollection 中的每个键生成单独的 BigQuery 表(我正在使
采用 ndjson 格式的文本文件,以下代码生成我所期望的结果。一个 ndjson 文件,其中包含未嵌套的 quotes.USD 字典,并且删除了原始的 quotes 元素。 def unnest
我目前不熟悉在 Python 中将 Apache Beam 与数据流运行器结合使用。我对创建发布到 Google Cloud PubSub 的批处理管道很感兴趣,我修改了 Beam Python AP
虽然我正在破解一个快速的 CSV 到 Firebase 上传,但我只是这样做而不是编写自定义接收器。这是对代码的过度简化: public static void main(String[] args)
我是 apache_beam 的新手,我正在尝试开发一个管道。我有 2 个具有相同格式的 pCollection,还有另一个具有其他格式的 pCollection。我尝试为 pCollection 3
由于文档仅适用于 JAVA,我无法真正理解它的含义。 它指出 - “虽然 ParDo 总是产生一个主输出 PCollection(作为 apply 的返回值),你也可以让你的 ParDo 产生任意数量
我们正在使用 Beam 的 Java SDK 2.0.0 在 ParDo 中生成顺序索引。就像 Beam 的 introduction to stateful processing 中的简单状态索引示
Dataflow/Apache Beam 中的 ParDo 和 FlatMap 之间有区别吗? 我认为两者都对传入的PCollection的每个元素应用一个函数,并返回可迭代的;但我想一定有什么区别?
我正在尝试在 Beam/Java 中编写一个数据流作业来处理来自 Pub/Sub 并写入 Parquet 的一系列事件。 Pub/Sub 中的事件采用 JSON 格式,每个事件可以生成一行或多行。我能
我是 Apache Beam 的新手。根据我们的要求,我需要传递一个包含 5 到 10 个 JSON 记录的 JSON 文件作为输入,并从文件中逐行读取此 JSON 数据并将其存储到 BigQuery
我有一个 PCollection,我想使用 ParDo 从中过滤掉一些元素。 有什么地方可以找到这个例子吗? 最佳答案 在 Apache Beam Python SDK 中,有一个过滤器转换,它接收一
我有一个PCollection> Pcol ,我正在处理 ParDo 中的每个元素方法。我想根据 pardo 本身的某些条件将记录发布到 kafka 主题。 我该怎么做? PCollection> P
我是一名优秀的程序员,十分优秀!