
google-cloud-dataflow - How to use 'flatten' correctly in Dataflow

Reposted. Author: 行者123. Updated: 2023-12-01 02:09:56

Our pipeline looks like this:

GCS (gzip-compressed files) -> ParDo -> BigQuery

I want to use Flatten to take multiple files from GCS as input to the pipeline, but it keeps failing with this error:

Workflow failed. Causes: (5001e5764f46ac2c): BigQuery creation of import job for table "Impressions_05_2015_denormalized_test" in dataset "CPT_XXXX" in project "gdfp-XXXX" failed. Causes: (5001e5764f46a1cf): Error:
Message: Load configuration must specify at least one source URI
HTTP Code: 400

The code:
PCollection<String> file1 = pipeline.apply(
    TextIO.Read.from("gs://<bucket_name_removed>/NetworkActiveViews_232503_20140918_21.gz")
        .withCompressionType(TextIO.CompressionType.GZIP));
PCollection<String> file2 = pipeline.apply(
    TextIO.Read.from("gs://<bucket_name_removed>/NetworkActiveViews_232503_20140918_22.gz")
        .withCompressionType(TextIO.CompressionType.GZIP));
PCollectionList<String> allFiles = PCollectionList.of(file1).and(file2);
PCollection<String> inputRead = allFiles.apply(Flatten.<String>pCollections());
inputRead
    .apply(ParDo.of(transformation)
        .named(String.format("%s-CPT-transform", type))
        .withSideInputs(views))
    .apply(Write.to(getOutputTable(type))
        .withCreateDisposition(CREATE_IF_NEEDED)
        .withWriteDisposition(WRITE_APPEND)
        .withSchema(schema)
        .named(String.format("%s-BQ-write", type)));

Example job ID: 2015-05-12_19_54_06-10158770219525037626

What am I doing wrong?

Best answer

Instead of the proposed hack, which was really crude, I write an empty row in the finishBundle() method instead. This writes one empty row per bundle, but we can live with that until the fix rolls out. Setting an "id" on the row makes it much easier to filter those rows out later.

Moreover, this workaround/hack is much easier to implement:

@Override
public void finishBundle(Context c) throws Exception {
    TableRow workaroundRow = new TableRow();
    workaroundRow.set("id", "workaround_row");
    c.output(workaroundRow); // Workaround for http://goo.gl/CpBxEf
}
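The answer relies on the sentinel rows being easy to strip out downstream because they carry a fixed "id". As a plain-Java sketch of that cleanup step (no Dataflow or BigQuery dependencies; the `dropWorkaroundRows` helper and the use of `Map` as a stand-in for `TableRow` are illustrative assumptions, not the author's code):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class WorkaroundFilter {

    // Drops the sentinel rows written by the finishBundle() workaround,
    // keeping only real data rows. Each row is modeled as a simple Map
    // (a stand-in for BigQuery's TableRow).
    public static List<Map<String, Object>> dropWorkaroundRows(List<Map<String, Object>> rows) {
        return rows.stream()
                .filter(r -> !"workaround_row".equals(r.get("id")))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Object> real = new HashMap<>();
        real.put("id", "impression_1");

        Map<String, Object> sentinel = new HashMap<>();
        sentinel.put("id", "workaround_row");

        List<Map<String, Object>> rows = new ArrayList<>();
        rows.add(real);
        rows.add(sentinel);

        List<Map<String, Object>> cleaned = dropWorkaroundRows(rows);
        System.out.println(cleaned.size()); // prints 1 (only the real row survives)
    }
}
```

In practice the same filtering can be done in BigQuery itself, e.g. with a `WHERE id != 'workaround_row'` clause, which is presumably why the answer sets an "id" rather than writing a fully empty row.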

Regarding "google-cloud-dataflow - How to use 'flatten' correctly in Dataflow", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/30204862/
