gpt4 book ai didi

java - 谷歌数据流: PCollection to PCollection for BigQuery insertion

转载 作者:行者123 更新时间:2023-12-01 16:53:45 26 4
gpt4 key购买 nike

我对 Google Cloud Platform 还很陌生,这是我第一次在研究生类(class)的项目中尝试使用 Google Dataflow。我想要做的是编写一个自动加载作业,从我的云存储上的某个存储桶加载文件,并将其中的数据插入 BigQuery 表中。

我得到的数据为 PCollection<String>类型,但为了在 BigQuery 中插入,我显然需要将其转换为 PCollection<TableRow>类型。到目前为止我还没有找到可靠的答案来做到这一点。

这是我的代码:

public static void main(String[] args) {
//Defining the schema of the BigQuery table
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("Datetime").setType("TIMESTAMP"));
fields.add(new TableFieldSchema().setName("Consumption").setType("FLOAT"));
fields.add(new TableFieldSchema().setName("MeterID").setType("STRING"));
TableSchema schema = new TableSchema().setFields(fields);

//Creating the pipeline
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
Pipeline p = Pipeline.create(options);

//Getting the data from cloud storage
PCollection<String> lines = p.apply(TextIO.Read.named("ReadCSVFromCloudStorage").from("gs://mybucket/myfolder/certainCSVfile.csv"));

//Probably need to do some transform here ...

//Inserting data into BigQuery
lines.apply(BigQueryIO.Write
.named("WriteToBigQuery")
.to("projectID:datasetID:tableID")
.withSchema(schema)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
}

我可能只是忘记了一些基本的东西,所以我希望你们能帮助我......

最佳答案

BigQueryIO.Write运行于 PCollection<TableRow> ,如 Writing to BigQuery 中所述。您需要应用转换来转换 PCollection<TableRow>进入PCollection<String> 。例如,请查看 StringToRowConverter :

  static class StringToRowConverter extends DoFn<String, TableRow> {
/**
* In this example, put the whole string into single BigQuery field.
*/
@Override
public void processElement(ProcessContext c) {
c.output(new TableRow().set("string_field", c.element()));
}
...
}

关于java - 谷歌数据流: PCollection<String> to PCollection<TableRow> for BigQuery insertion,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35649497/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com