
google-cloud-storage - Writing to Google Cloud Storage from PubSub using Cloud Dataflow with a DoFn

Reposted · Author: 行者123 · Updated: 2023-12-03 14:59:20

I'm trying to write Google PubSub messages to Google Cloud Storage using Google Cloud Dataflow. I know that TextIO/AvroIO do not support streaming pipelines. However, I read in a comment from the author of [1] that it is possible to write to GCS from a ParDo/DoFn in a streaming pipeline. I constructed a pipeline by following their article as closely as I could.

I was aiming for this behavior:

  • Messages written out in batches of up to 100 to objects in GCS (one per window pane), under a path that corresponds to the time the message was published, in dataflow-requests/[isodate-time]/[paneIndex].

I get different results:

  • There is only a single pane in every hourly window. I therefore only get one file in every hourly 'bucket' (it's really an object path in GCS). Reducing MAX_EVENTS_IN_FILE to 10 made no difference: still only one pane/file.
  • Only a single message is written out to every GCS object.
  • The pipeline occasionally raises a CRC error when writing to GCS.

How do I fix these problems and get the behavior I'm expecting?
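For concreteness, the object-path scheme in the first bullet can be sketched standalone. This is a hypothetical helper (the pipeline itself uses joda-time; `java.time` is used here so the sketch is self-contained, and the timestamp and pane index are example values):

```java
import java.time.Instant;
import java.time.format.DateTimeFormatter;

public class BlobPathSketch {
    // Builds the intended object path: dataflow-requests/[isodate-time]/[paneIndex],
    // where the timestamp is the end-of-window instant.
    static String buildBlobName(Instant windowMaxTimestamp, long paneIndex) {
        String isoDate = DateTimeFormatter.ISO_INSTANT.format(windowMaxTimestamp);
        return String.format("dataflow-requests/%s/%d", isoDate, paneIndex);
    }

    public static void main(String[] args) {
        // Example: end of the 20:00-21:00 UTC window, first pane.
        System.out.println(buildBlobName(Instant.parse("2016-04-08T20:59:59.999Z"), 0));
    }
}
```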

Sample log output:
21:30:06.977 writing pane 0 to blob dataflow-requests/2016-04-08T20:59:59.999Z/0
21:30:06.977 writing pane 0 to blob dataflow-requests/2016-04-08T20:59:59.999Z/0
21:30:07.773 successfully wrote pane 0 to blob dataflow-requests/2016-04-08T20:59:59.999Z/0
21:30:07.846 successfully wrote pane 0 to blob dataflow-requests/2016-04-08T20:59:59.999Z/0
21:30:07.847 writing pane 0 to blob dataflow-requests/2016-04-08T20:59:59.999Z/0

Here is my code:

package com.example.dataflow;

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.windowing.*;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.gcloud.storage.BlobId;
import com.google.gcloud.storage.BlobInfo;
import com.google.gcloud.storage.Storage;
import com.google.gcloud.storage.StorageOptions;
import org.joda.time.Duration;
import org.joda.time.format.ISODateTimeFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

public class PubSubGcsSSCCEPipepline {

    private static final Logger LOG = LoggerFactory.getLogger(PubSubGcsSSCCEPipepline.class);

    public static final String BUCKET_PATH = "dataflow-requests";
    public static final String BUCKET_NAME = "myBucketName";

    public static final Duration ONE_DAY = Duration.standardDays(1);
    public static final Duration ONE_HOUR = Duration.standardHours(1);
    public static final Duration TEN_SECONDS = Duration.standardSeconds(10);

    public static final int MAX_EVENTS_IN_FILE = 100;

    public static final String PUBSUB_SUBSCRIPTION = "projects/myProjectId/subscriptions/requests-dataflow";

    private static class DoGCSWrite extends DoFn<String, Void>
            implements DoFn.RequiresWindowAccess {

        // The GCS client is not serializable, so it is rebuilt on each worker.
        public transient Storage storage;

        { init(); }

        public void init() { storage = StorageOptions.defaultInstance().service(); }

        private void readObject(java.io.ObjectInputStream in)
                throws IOException, ClassNotFoundException {
            init();
        }

        @Override
        public void processElement(ProcessContext c) throws Exception {
            // Name the object after the end-of-window timestamp and the pane index.
            String isoDate = ISODateTimeFormat.dateTime().print(c.window().maxTimestamp());
            String blobName = String.format("%s/%s/%s", BUCKET_PATH, isoDate, c.pane().getIndex());

            BlobId blobId = BlobId.of(BUCKET_NAME, blobName);
            LOG.info("writing pane {} to blob {}", c.pane().getIndex(), blobName);
            storage.create(BlobInfo.builder(blobId).contentType("text/plain").build(), c.element().getBytes());
            LOG.info("successfully wrote pane {} to blob {}", c.pane().getIndex(), blobName);
        }
    }

    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
        options.as(DataflowPipelineOptions.class).setStreaming(true);
        Pipeline p = Pipeline.create(options);

        PubsubIO.Read.Bound<String> readFromPubsub = PubsubIO.Read.named("ReadFromPubsub")
                .subscription(PUBSUB_SUBSCRIPTION);

        PCollection<String> streamData = p.apply(readFromPubsub);

        // Hourly windows, firing early every MAX_EVENTS_IN_FILE elements.
        PCollection<String> windows = streamData.apply(Window.<String>into(FixedWindows.of(ONE_HOUR))
                .withAllowedLateness(ONE_DAY)
                .triggering(AfterWatermark.pastEndOfWindow()
                        .withEarlyFirings(AfterPane.elementCountAtLeast(MAX_EVENTS_IN_FILE))
                        .withLateFirings(AfterFirst.of(AfterPane.elementCountAtLeast(MAX_EVENTS_IN_FILE),
                                AfterProcessingTime.pastFirstElementInPane()
                                        .plusDelayOf(TEN_SECONDS))))
                .discardingFiredPanes());

        windows.apply(ParDo.of(new DoGCSWrite()));

        p.run();
    }
}

    [1] https://labs.spotify.com/2016/03/10/spotifys-event-delivery-the-road-to-the-cloud-part-iii/

Thanks to Sam McVeety for the solution. Here is the corrected code for anyone reading this:

package com.example.dataflow;

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.*;
import com.google.cloud.dataflow.sdk.transforms.windowing.*;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.gcloud.WriteChannel;
import com.google.gcloud.storage.BlobId;
import com.google.gcloud.storage.BlobInfo;
import com.google.gcloud.storage.Storage;
import com.google.gcloud.storage.StorageOptions;
import org.joda.time.Duration;
import org.joda.time.format.ISODateTimeFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;

public class PubSubGcsSSCCEPipepline {

    private static final Logger LOG = LoggerFactory.getLogger(PubSubGcsSSCCEPipepline.class);

    public static final String BUCKET_PATH = "dataflow-requests";
    public static final String BUCKET_NAME = "myBucketName";

    public static final Duration ONE_DAY = Duration.standardDays(1);
    public static final Duration ONE_HOUR = Duration.standardHours(1);
    public static final Duration TEN_SECONDS = Duration.standardSeconds(10);

    public static final int MAX_EVENTS_IN_FILE = 100;

    public static final String PUBSUB_SUBSCRIPTION = "projects/myProjectId/subscriptions/requests-dataflow";

    // Now receives the entire pane's batch (Iterable<String>) as one element.
    private static class DoGCSWrite extends DoFn<Iterable<String>, Void>
            implements DoFn.RequiresWindowAccess {

        public transient Storage storage;

        { init(); }

        public void init() { storage = StorageOptions.defaultInstance().service(); }

        private void readObject(java.io.ObjectInputStream in)
                throws IOException, ClassNotFoundException {
            init();
        }

        @Override
        public void processElement(ProcessContext c) throws Exception {
            String isoDate = ISODateTimeFormat.dateTime().print(c.window().maxTimestamp());
            long paneIndex = c.pane().getIndex();
            String blobName = String.format("%s/%s/%s", BUCKET_PATH, isoDate, paneIndex);

            BlobId blobId = BlobId.of(BUCKET_NAME, blobName);

            LOG.info("writing pane {} to blob {}", paneIndex, blobName);
            // Stream the whole batch into a single GCS object through a write channel.
            WriteChannel writer = storage.writer(BlobInfo.builder(blobId).contentType("text/plain").build());
            LOG.info("blob stream opened for pane {} to blob {} ", paneIndex, blobName);
            int i = 0;
            for (Iterator<String> it = c.element().iterator(); it.hasNext();) {
                i++;
                writer.write(ByteBuffer.wrap(it.next().getBytes()));
                LOG.info("wrote {} elements to blob {}", i, blobName);
            }
            writer.close();
            LOG.info("successfully wrote pane {} to blob {}", paneIndex, blobName);
        }
    }

    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
        options.as(DataflowPipelineOptions.class).setStreaming(true);
        Pipeline p = Pipeline.create(options);

        PubsubIO.Read.Bound<String> readFromPubsub = PubsubIO.Read.named("ReadFromPubsub")
                .subscription(PUBSUB_SUBSCRIPTION);

        PCollection<String> streamData = p.apply(readFromPubsub);

        // Key every element with a constant so GroupByKey can materialize the panes.
        PCollection<KV<String, String>> keyedStream =
                streamData.apply(WithKeys.of(new SerializableFunction<String, String>() {
                    public String apply(String s) { return "constant"; } }));

        PCollection<KV<String, Iterable<String>>> keyedWindows = keyedStream
                .apply(Window.<KV<String, String>>into(FixedWindows.of(ONE_HOUR))
                        .withAllowedLateness(ONE_DAY)
                        .triggering(AfterWatermark.pastEndOfWindow()
                                .withEarlyFirings(AfterPane.elementCountAtLeast(MAX_EVENTS_IN_FILE))
                                .withLateFirings(AfterFirst.of(AfterPane.elementCountAtLeast(MAX_EVENTS_IN_FILE),
                                        AfterProcessingTime.pastFirstElementInPane()
                                                .plusDelayOf(TEN_SECONDS))))
                        .discardingFiredPanes())
                .apply(GroupByKey.create());

        PCollection<Iterable<String>> windows = keyedWindows
                .apply(Values.<Iterable<String>>create());

        windows.apply(ParDo.of(new DoGCSWrite()));

        p.run();
    }
}

Best answer

There's a gotcha here, which is that you'll need a GroupByKey in order for the panes to be aggregated appropriately. The Spotify example references this as "materialization of panes is done in the 'Aggregate Events' transform, which is nothing else than a GroupByKey transform", but it's a subtle point. You'll need to supply a key in order to do this, and in your case, it appears a constant value will work.

PCollection<String> streamData = p.apply(readFromPubsub);
PCollection<KV<String, String>> keyedStream =
        streamData.apply(WithKeys.of(new SerializableFunction<String, String>() {
            public String apply(String s) { return "constant"; } }));

At this point, you can apply your windowing function, and then a final GroupByKey to get the desired behavior:
PCollection<KV<String, Iterable<String>>> keyedWindows = keyedStream.apply(...)
        .apply(GroupByKey.create());
PCollection<Iterable<String>> windows = keyedWindows
        .apply(Values.<Iterable<String>>create());

Now the elements in processElement will be Iterable<String>, with size 100 or more.
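The write loop in the corrected DoFn effectively concatenates each Iterable<String> batch into a single object payload. A minimal standalone sketch of just that batching step (a hypothetical helper, not part of the pipeline; note that, as in the DoFn above, messages are concatenated with no delimiter, so add one if you need to split the file later):

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

public class BatchConcatSketch {
    // Concatenates one pane's messages into the byte payload for a single GCS object,
    // mirroring the WriteChannel loop in the corrected DoFn (no delimiter between messages).
    static byte[] concatBatch(Iterable<String> batch) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (String msg : batch) {
            byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);
            out.write(bytes, 0, bytes.length);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        List<String> pane = Arrays.asList("{\"a\":1}", "{\"b\":2}");
        System.out.println(new String(concatBatch(pane), StandardCharsets.UTF_8));
    }
}
```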

We've filed https://issues.apache.org/jira/browse/BEAM-184 to make this behavior clearer.

Regarding "google-cloud-storage - Writing to Google Cloud Storage from PubSub using Cloud Dataflow with a DoFn", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/36509116/
