gpt4 book ai didi

google-cloud-dataflow - 如何在 Apache Beam 中写入多个文件?

转载 作者:行者123 更新时间:2023-12-04 13:57:13 24 4
gpt4 key购买 nike

让我简化一下我的情况。我正在使用 Apache Beam 0.6.0。我的最终处理结果是 PCollection<KV<String, String>> .我想将值写入与其键对应的不同文件。

例如,假设结果包括

(key1, value1)
(key2, value2)
(key1, value3)
(key1, value4)

那我要写 value1 , value3value4key1.txt ,然后写 value4key2.txt .

就我而言:
  • key 集是在管道运行时确定的,而不是在构建管道时确定。
  • 键集可能很小,但是每个键对应的值的数量可能非常非常大。

  • 有任何想法吗?

    最佳答案

    前几天,我很方便地写了这个案例的样本。

    此示例是数据流 1.x 样式

    基本上,您按每个键分组,然后您可以使用连接到云存储的自定义转换来完成此操作。需要注意的是,每个文件的行列表不应很大(它必须适合单个实例的内存,但考虑到您可以运行高内存实例,该限制非常高)。

        ...
    PCollection<KV<String, List<String>>> readyToWrite = groupedByFirstLetter
    .apply(Combine.perKey(AccumulatorOfWords.getCombineFn()));
    readyToWrite.apply(
    new PTransformWriteToGCS("dataflow-experiment", TonyWordGrouper::derivePath));
    ...

    然后完成大部分工作的转换是:

    public class PTransformWriteToGCS
    extends PTransform<PCollection<KV<String, List<String>>>, PCollection<Void>> {

    private static final Logger LOG = Logging.getLogger(PTransformWriteToGCS.class);

    private static final Storage STORAGE = StorageOptions.getDefaultInstance().getService();

    private final String bucketName;

    private final SerializableFunction<String, String> pathCreator;

    public PTransformWriteToGCS(final String bucketName,
    final SerializableFunction<String, String> pathCreator) {
    this.bucketName = bucketName;
    this.pathCreator = pathCreator;
    }

    @Override
    public PCollection<Void> apply(final PCollection<KV<String, List<String>>> input) {

    return input
    .apply(ParDo.of(new DoFn<KV<String, List<String>>, Void>() {

    @Override
    public void processElement(
    final DoFn<KV<String, List<String>>, Void>.ProcessContext arg0)
    throws Exception {
    final String key = arg0.element().getKey();
    final List<String> values = arg0.element().getValue();
    final String toWrite = values.stream().collect(Collectors.joining("\n"));
    final String path = pathCreator.apply(key);
    BlobInfo blobInfo = BlobInfo.newBuilder(bucketName, path)
    .setContentType(MimeTypes.TEXT)
    .build();
    LOG.info("blob writing to: {}", blobInfo);
    Blob result = STORAGE.create(blobInfo,
    toWrite.getBytes(StandardCharsets.UTF_8));
    }
    }));
    }
    }

    关于google-cloud-dataflow - 如何在 Apache Beam 中写入多个文件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43291058/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com