gpt4 book ai didi

java - 如何将 CSV 文件导入没有任何列名或架构的 BigQuery 表?

转载 作者:搜寻专家 更新时间:2023-10-30 21:10:59 24 4
gpt4 key购买 nike

我目前正在编写一个 Java 实用程序,用于将几个 CSV 文件从 GCS 导入 BigQuery。我可以通过 bq load 轻松实现这一点,但我想使用 Dataflow 作业来完成。因此,我正在使用数据流的管道和 ParDo 转换器(返回 TableRow 以将其应用于 BigQueryIO),并且我已经为转换创建了 StringToRowConverter()。实际问题从这里开始——我被迫为目标表指定模式,尽管我不想创建一个不存在的新表——只是试图加载数据。所以我不想手动设置 TableRow 的列名,因为我有大约 600 列。

public class StringToRowConverter extends DoFn<String, TableRow> {

private static Logger logger = LoggerFactory.getLogger(StringToRowConverter.class);

public void processElement(ProcessContext c) {
TableRow row = new TableRow();
row.set("DO NOT KNOW THE COLUMN NAME", c.element());
c.output(row);
}
}

此外,假设该表已存在于 BigQuery 数据集中,我不需要创建它,而且 CSV 文件包含按正确顺序排列的列。

如果这种情况没有解决方法,并且数据加载需要列名,那么我可以将它放在 CSV 文件的第一行。

我们将不胜感激。

最佳答案

为避免创建表,您应该在管道配置期间使用 BigQueryIO.Write 的 BigQueryIO.Write.CreateDisposition.CREATE_NEVER。来源:https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/io/BigQueryIO.Write

您不需要预先知道 BigQuery 表架构,您可以动态地发现它。例如,您可以使用 BigQuery API ( https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/get ) 查询表架构并将其作为 StringToRowConverter 类的参数传递。假设第一行是标题,另一种选择是跳过第一行并使用它正确映射文件的其余部分。

下面的代码实现了第二种方法,并将输出配置为附加到现有的 BigQuery 表。

public class DFJob {

public static class StringToRowConverter extends DoFn<String, TableRow> {

private String[] columnNames;

private boolean isFirstRow = true;

public void processElement(ProcessContext c) {
TableRow row = new TableRow();

String[] parts = c.element().split(",");

if (isFirstRow) {
columnNames = Arrays.copyOf(parts, parts.length);
isFirstRow = false;
} else {
for (int i = 0; i < parts.length; i++) {
row.set(columnNames[i], parts[i]);
}
c.output(row);
}
}
}

public static void main(String[] args) {
DataflowPipelineOptions options = PipelineOptionsFactory.create()
.as(DataflowPipelineOptions.class);
options.setRunner(BlockingDataflowPipelineRunner.class);

Pipeline p = Pipeline.create(options);

p.apply(TextIO.Read.from("gs://dataflow-samples/myfile.csv"))
.apply(ParDo.of(new StringToRowConverter()))
.apply(BigQueryIO.Write.to("myTable")
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

PipelineResult result = p.run();
}
}

关于java - 如何将 CSV 文件导入没有任何列名或架构的 BigQuery 表?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45747360/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com