
java - Overwriting some partitions of a partitioned BigQuery table


I am currently trying to develop a Dataflow pipeline that replaces specific partitions of a partitioned table. I use a custom partition field, which is a date. The input of my pipeline is a file whose rows may carry different dates.

I developed the following pipeline:

import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.api.services.bigquery.model.TimePartitioning;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.ValueInSingleWindow;

// BigQueryOptions, StringToRowConverter, buildTableSchemaFromOptions and LOG
// are defined elsewhere in my project.
public static void main(String[] args) {

    PipelineOptionsFactory.register(BigQueryOptions.class);
    BigQueryOptions options = PipelineOptionsFactory.fromArgs(args)
            .withValidation().as(BigQueryOptions.class);

    Pipeline p = Pipeline.create(options);

    // Read the input file and convert each line into a BigQuery TableRow.
    PCollection<TableRow> rows = p
            .apply("ReadLines", TextIO.read().from(options.getFileLocation()))
            .apply("Convert To BQ Row", ParDo.of(new StringToRowConverter(options)));

    ValueProvider<String> projectId = options.getProjectId();
    ValueProvider<String> datasetId = options.getDatasetId();
    ValueProvider<String> tableId = options.getTableId();
    ValueProvider<String> partitionField = options.getPartitionField();
    ValueProvider<String> columnNames = options.getColumnNames();
    ValueProvider<String> types = options.getTypes();

    rows.apply("Write to BQ", BigQueryIO.writeTableRows()
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withCustomGcsTempLocation(options.getGCSTempLocation())
            .to(new DynamicDestinations<TableRow, String>() {

                @Override
                public String getDestination(ValueInSingleWindow<TableRow> element) {
                    TableRow row = element.getValue();
                    String partitionDestination = (String) row.get(partitionField.get());

                    SimpleDateFormat from = new SimpleDateFormat("yyyy-MM-dd");
                    SimpleDateFormat to = new SimpleDateFormat("yyyyMMdd");

                    try {
                        // Route each row to the partition decorator for its date,
                        // e.g. project:dataset.table$20180612.
                        partitionDestination = to.format(from.parse(partitionDestination));
                        LOG.info("Table destination " + partitionDestination);
                        return projectId.get() + ":" + datasetId.get() + "." + tableId.get()
                                + "$" + partitionDestination;
                    } catch (ParseException e) {
                        // Rows with an unparseable date go to a side table.
                        e.printStackTrace();
                        return projectId.get() + ":" + datasetId.get() + "." + tableId.get()
                                + "_rowsWithErrors";
                    }
                }

                @Override
                public TableDestination getTable(String destination) {
                    TimePartitioning timePartitioning = new TimePartitioning();
                    timePartitioning.setField(partitionField.get());
                    timePartitioning.setType("DAY");
                    timePartitioning.setRequirePartitionFilter(true);

                    TableDestination tableDestination =
                            new TableDestination(destination, null, timePartitioning);
                    LOG.info(tableDestination.toString());
                    return tableDestination;
                }

                @Override
                public TableSchema getSchema(String destination) {
                    return new TableSchema().setFields(buildTableSchemaFromOptions(columnNames, types));
                }
            })
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

    p.run();
}
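For reference, the runtime parameters above (getFileLocation, getPartitionField, and so on) come from a custom options interface that is not shown in the question. A minimal sketch of what it could look like, assuming only the getters used in the snippet; the ValueProvider wrappers are what make these usable as template runtime parameters:

import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;

// Hypothetical reconstruction: only the getter names are taken from the
// snippet above, everything else is assumed.
public interface BigQueryOptions extends PipelineOptions {
    ValueProvider<String> getFileLocation();
    void setFileLocation(ValueProvider<String> value);

    ValueProvider<String> getProjectId();
    void setProjectId(ValueProvider<String> value);

    ValueProvider<String> getDatasetId();
    void setDatasetId(ValueProvider<String> value);

    ValueProvider<String> getTableId();
    void setTableId(ValueProvider<String> value);

    ValueProvider<String> getPartitionField();
    void setPartitionField(ValueProvider<String> value);

    ValueProvider<String> getColumnNames();
    void setColumnNames(ValueProvider<String> value);

    ValueProvider<String> getTypes();
    void setTypes(ValueProvider<String> value);

    ValueProvider<String> getGCSTempLocation();
    void setGCSTempLocation(ValueProvider<String> value);
}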

When I trigger the pipeline locally, it correctly replaces only the partitions whose dates appear in the input file. However, when I deploy it on Google Cloud Dataflow and run the template with exactly the same parameters, it truncates all the data, and in the end my table contains only the file I wanted to upload.

Do you know why there is such a difference?

Thanks!

Best Answer

You specify BigQueryIO.Write.CreateDisposition as CREATE_IF_NEEDED, and this is paired with BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE, so even when the table already exists it may be re-created. This is why you see your table being replaced.

See this documentation [1] for more details.

[1] https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/io/BigQueryIO.Write.CreateDisposition#CREATE_IF_NEEDED
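One way to act on this, as a hedged sketch rather than a verified fix: if the partitioned table is created ahead of time, switching to CREATE_NEVER prevents any re-creation, so WRITE_TRUNCATE combined with a partition decorator overwrites only the targeted partition. The table spec below is a placeholder:

// Sketch, assuming the table already exists: CREATE_NEVER stops BigQueryIO
// from re-creating it, and WRITE_TRUNCATE against a partition decorator
// ("table$yyyyMMdd") replaces only that day's partition.
rows.apply("Write to BQ", BigQueryIO.writeTableRows()
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
        .to("my-project:my_dataset.my_table$20180612")); // placeholder table spec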

Regarding "java - Overwriting some partitions of a partitioned BigQuery table", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/50820824/
