gpt4 book ai didi

java - 如何将附加字段添加到光束 FileIO.matchAll() 结果中?

转载 作者:行者123 更新时间:2023-11-30 10:06:06 30 4
gpt4 key购买 nike

我有一个 KV 的 PCollection,其中键是 gcs file_patterns,值是文件的一些附加信息(例如,生成文件的“源”系统)。例如,

KV("gs://bucket1/dir1/*", "SourceX"),
KV("gs://bucket1/dir2/*", "SourceY")

我需要一个 PTransferm 来将 file_patterns 扩展到 GCS 文件夹中的所有匹配文件,并保留“源”字段。例如,如果 dir1 下有两个文件 X1.dat,X2.dat,dir2 下有一个文件(Y1.dat),则输出将是:

KV("gs://bucket1/dir1/X1.dat", "SourceX"),
KV("gs://bucket1/dir1/X2.dat", "SourceX")
KV("gs://bucket1/dir2/Y1.dat", "SourceY")

我可以使用 FileIO.matchAll() 来实现吗?我坚持如何将“源”字段组合/加入匹配文件。这是我正在尝试的,但还没有完全成功:

public PCollection<KV<String, String> expand(PCollection<KV<String, String>> filesAndSources) {
return filesAndSources
.apply("Get file names", Keys.create())
.apply(FileIO.matchAll())
.apply(FileIO.readMatches())
.apply(ParDo.of(
new DoFn<ReadableFile, KV<String, String>>() {

@ProcessElement
public void processElement(ProcessContext c) {
ReadableFile file = c.element();
String fileName = file.getMetadata().resourceId().toString();
c.output(KV.of(fileName, XXXXX)); // How to get the value field ("Source") from the input KV?

我的难点在于最后一行,对于XXXXX,我如何从输入的KV中获取值字段(“Source”)?将输入 KV 的值“加入”或“组合”回“扩展”键的任何方式,因为一个键 (file_pattern) 被扩展为多个值。

谢谢!

最佳答案

MatchResult.Medata包含您已经在使用的 resourceId 但不包含它匹配的 GCS 路径(带通配符)。

您可以使用侧输入实现您想要的。为了演示这一点,我创建了以下 filesAndSources(根据您的评论,这可能是一个输入参数,因此不能在下游进行硬编码):

PCollection<KV<String, String>> filesAndSources = p.apply("Create file pattern and source pairs",
Create.of(KV.of("gs://" + Bucket + "/sales/*", "Sales"),
KV.of("gs://" + Bucket + "/events/*", "Events")));

我将其具体化为辅助输入(在本例中为 Map)。键将是转换为正则表达式的 glob 模式(感谢 this answer ),值将是源字符串:

final PCollectionView<Map<String, String>> regexAndSources =
filesAndSources.apply("Glob pattern to RegEx", ParDo.of(new DoFn<KV<String, String>, KV<String, String>>() {
@ProcessElement
public void processElement(ProcessContext c) {
String regex = c.element().getKey();

StringBuilder out = new StringBuilder("^");
for(int i = 0; i < regex.length(); ++i) {
final char ch = regex.charAt(i);
switch(ch) {
case '*': out.append(".*"); break;
case '?': out.append('.'); break;
case '.': out.append("\\."); break;
case '\\': out.append("\\\\"); break;
default: out.append(ch);
}
}
out.append('$');
c.output(KV.of(out.toString(), c.element().getValue()));
}})).apply("Save as Map", View.asMap());

然后,在读取文件名后,我们可以使用侧输入来解析每个路径以查看哪个是匹配的模式/源对:

filesAndSources
.apply("Get file names", Keys.create())
.apply(FileIO.matchAll())
.apply(FileIO.readMatches())
.apply(ParDo.of(new DoFn<ReadableFile, KV<String, String>>() {
@ProcessElement
public void processElement(ProcessContext c) {
ReadableFile file = c.element();
String fileName = file.getMetadata().resourceId().toString();

Set<Map.Entry<String,String>> patternSet = c.sideInput(regexAndSources).entrySet();

for (Map.Entry< String,String> pattern:patternSet)
{
if (fileName.matches(pattern.getKey())) {
String source = pattern.getValue();
c.output(KV.of(fileName, source));
}
}
}}).withSideInputs(regexAndSources))

请注意,正则表达式转换是在实现侧输入之前而不是此处完成的,以避免重复工作。

输出,正如我所预期的那样:

Feb 24, 2019 10:44:05 PM org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn process
INFO: Matched 2 files for pattern gs://REDACTED/events/*
Feb 24, 2019 10:44:05 PM org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn process
INFO: Matched 2 files for pattern gs://REDACTED/sales/*
Feb 24, 2019 10:44:05 PM com.dataflow.samples.RegexFileIO$3 processElement
INFO: key=gs://REDACTED/sales/sales1.csv, value=Sales
Feb 24, 2019 10:44:05 PM com.dataflow.samples.RegexFileIO$3 processElement
INFO: key=gs://REDACTED/sales/sales2.csv, value=Sales
Feb 24, 2019 10:44:05 PM com.dataflow.samples.RegexFileIO$3 processElement
INFO: key=gs://REDACTED/events/events1.csv, value=Events
Feb 24, 2019 10:44:05 PM com.dataflow.samples.RegexFileIO$3 processElement
INFO: key=gs://REDACTED/events/events2.csv, value=Events

Full code .

关于java - 如何将附加字段添加到光束 FileIO.matchAll() 结果中?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54838908/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com