gpt4 book ai didi

java - 如何有条件地将 Transform 应用于 PCollection?

转载 作者:行者123 更新时间:2023-12-02 09:35:21 27 4
gpt4 key购买 nike

我有一个 PCollection,并且希望在验证条件时应用自定义 PTransform(该条件不依赖于Pcollection内容)
示例:我有日志,如果 PipelineOptions 中提供了日期,我想根据该日期进行过滤。

现在,我最好的解决方案是:

// Read File
PCollection<LogRow> logs = p.apply("LoadData", TextIO.read().from(options.getInput()))
if(!date.equals("")){
logs = logs.apply("FilterOnDate", ParDo.of(new DateFilterFn(date)));
}
logs = logs.apply(...

它有效,但我不喜欢重新分配日志。更重要的是,我不喜欢破坏 apply 的链条。这看起来不像是一个优雅的方法。

是否有某种条件PTransform?或者如果没有,将条件检查放在 PTransform 中并在未验证的情况下输出所有内容是否会更有效?

梦想示例:

PCollection<LogRow> logs = p.apply("LoadData", TextIO.read().from(options.getInput()))
.applyIf("FilterOnDate", ParDo.of(new DateFilterFn(date)), !date.equals(""))
.apply(...

最佳答案

不幸的是,Beam 没有任何类似 applyIf 的功能,您当前的方法是进行此类条件过滤的一般方法。

PTransform 中的条件检查为每个元素添加了额外的操作,这将根据检查类型对性能产生影响。

如果可能,最好避免从管道进行转换,而不是使 PTransform 变得更复杂。

从代码美观的角度来看,您可以使用包装器变换来有条件地应用相关的过滤器 pardo。示例:

public static class ConditionallyFilter
extends PTransform<PCollection<LogRow>, PCollection<LogRow>> {
private final String date;
public ConditionallyFilter(String date){
this.date = date;
}
@Override
public PCollection<LogRow> expand(PCollection<LogRow> logs) {
if(!date.equals("")){
logs = logs.apply("FilterOnDate", ParDo.of(new DateFilterFn(date)));
}
return logs;
}
}


// Read File
PCollection<LogRow> logs = p.apply("LoadData", TextIO.read().from(options.getInput())).apply(new ConditionallyFilter(date)).apply(...

关于java - 如何有条件地将 Transform 应用于 PCollection?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57575156/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com