
java - Apache Beam: writing the values of key-value pairs to per-key files in GCS

Reposted. Author: 行者123. Updated: 2023-12-01 21:58:07

I want to use FileIO's writeDynamic() in Apache Beam (Java) to write the values of key-value pairs to text files in GCS, grouped by key.

So far, I am reading the data from BigQuery, transforming it into key-value pairs, and then attempting to use FileIO with writeDynamic() to write the values into one file per key.

PCollection<TableRow> inputRows = p.apply(BigQueryIO.readTableRows()
.from(tableSpec)
.withMethod(Method.DIRECT_READ)
.withSelectedFields(Lists.newArrayList("id", "string1", "string2", "string3", "int1")));

inputRows.apply(MapElements.into(TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptors.strings()))
.via(tableRow -> KV.of((Integer) tableRow.get("id"),(String) tableRow.get("string1"))))
.apply(FileIO.<String, KV<String, String>>writeDynamic()
.by(KV::getKey)
.withDestinationCoder(StringUtf8Coder.of())
.via(Contextful.fn(KV::getValue), TextIO.sink())
.to("gs://bucket/output")
.withNaming(key -> FileIO.Write.defaultNaming("file-" + key, ".txt")));

I get the error:

The method apply
(PTransform<? super PCollection<KV<Integer,String>>,OutputT>)
in the type PCollection<KV<Integer,String>>
is not applicable for the arguments
(FileIO.Write<String,KV<String,String>>)

Best answer

There is a type mismatch. Note that the TableRow elements are mapped to KV<Integer, String> by MapElements (i.e. the key is an Integer), while the write step expects a String key, as declared in .apply(FileIO.<String, KV<String, String>>writeDynamic()):

inputRows.apply(MapElements.into(TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptors.strings()))
.via(tableRow -> KV.of((Integer) tableRow.get("id"),(String) tableRow.get("string1"))))
.apply(FileIO.<String, KV<String, String>>writeDynamic()
.by(KV::getKey)
...
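Alternatively, the destination type can stay an Integer by making the type parameters of writeDynamic() match what .by() returns. This is a minimal sketch under that assumption, reusing the question's inputRows collection; the only changes are the type parameters and a VarIntCoder (org.apache.beam.sdk.coders.VarIntCoder) as the destination coder:

```java
// Sketch: keep the Integer key as the destination type instead of
// converting it to String. The first type parameter of writeDynamic()
// must match the key type produced by .by().
inputRows
    .apply(MapElements.into(TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptors.strings()))
        .via(tableRow -> KV.of((Integer) tableRow.get("id"), (String) tableRow.get("string1"))))
    .apply(FileIO.<Integer, KV<Integer, String>>writeDynamic()
        .by(KV::getKey)
        .withDestinationCoder(VarIntCoder.of())           // coder for the Integer destinations
        .via(Contextful.fn(KV::getValue), TextIO.sink())
        .to("gs://bucket/output")
        .withNaming(key -> FileIO.Write.defaultNaming("file-" + key, ".txt")));
```

Here "file-" + key still produces names like file-746-....txt, since the Integer key is concatenated into the string.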

To avoid having to cast the key again inside .by(KV::getKey), I'd suggest converting it to a String beforehand:

inputRows
.apply(MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
.via(tableRow -> KV.of((String) tableRow.get("id"),(String) tableRow.get("name"))))
.apply(FileIO.<String, KV<String, String>>writeDynamic()
.by(KV::getKey)

As an example, I tested this with the public table bigquery-public-data:london_bicycles.cycle_stations, writing each bike station to a different file:

$ cat output/file-746-00000-of-00004.txt 
Lots Road, West Chelsea

$ bq query --use_legacy_sql=false "SELECT name FROM \`bigquery-public-data.london_bicycles.cycle_stations\` WHERE id = 746"
Waiting on bqjob_<ID> ... (0s) Current status: DONE
+-------------------------+
| name |
+-------------------------+
| Lots Road, West Chelsea |
+-------------------------+

Full code:

package com.dataflow.samples;

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Contextful;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;


public abstract class DynamicGCSWrites {

    public interface Options extends PipelineOptions {
        @Validation.Required
        @Description("Output Path i.e. gs://BUCKET/path/to/output/folder")
        String getOutput();
        void setOutput(String s);
    }

    public static void main(String[] args) {

        DynamicGCSWrites.Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(DynamicGCSWrites.Options.class);

        Pipeline p = Pipeline.create(options);

        String output = options.getOutput();

        // Read only the columns we need, via the BigQuery Storage API
        PCollection<TableRow> inputRows = p
            .apply(BigQueryIO.readTableRows()
                .from("bigquery-public-data:london_bicycles.cycle_stations")
                .withMethod(Method.DIRECT_READ)
                .withSelectedFields(Lists.newArrayList("id", "name")));

        // Map each row to KV<String, String> so the key type matches writeDynamic()
        inputRows
            .apply(MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
                .via(tableRow -> KV.of((String) tableRow.get("id"), (String) tableRow.get("name"))))
            .apply(FileIO.<String, KV<String, String>>writeDynamic()
                .by(KV::getKey)
                .withDestinationCoder(StringUtf8Coder.of())
                .via(Contextful.fn(KV::getValue), TextIO.sink())
                .to(output)
                .withNaming(key -> FileIO.Write.defaultNaming("file-" + key, ".txt")));

        p.run().waitUntilFinish();
    }
}
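For reference, a pipeline like this is typically launched with the Maven exec plugin; the following is a sketch in which the project, bucket, region, and runner values are placeholders to be replaced with your own:

```shell
# Hypothetical launch command; substitute your own project, bucket and region.
mvn compile exec:java \
  -Dexec.mainClass=com.dataflow.samples.DynamicGCSWrites \
  -Dexec.args="--project=YOUR_PROJECT \
               --output=gs://YOUR_BUCKET/path/to/output/folder \
               --runner=DataflowRunner \
               --region=us-central1 \
               --tempLocation=gs://YOUR_BUCKET/temp"
```

With --runner=DirectRunner instead, the same code runs locally, which is convenient for checking the per-key files before deploying to Dataflow.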

Regarding "java - Apache Beam: writing the values of key-value pairs to per-key files in GCS", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/58725111/
