gpt4 book ai didi

google-cloud-storage - Cloud Pub/Sub 到 GCS,按元素写入(数据流管道)

转载 作者:行者123 更新时间:2023-12-01 17:19:11 24 4
gpt4 key购买 nike

每次从 Pubsub 收到消息时,如何写入 GCS,它会进行窗口写入,但不会按元素写入。非常感谢有关此事的任何提示。

示例链接 ( https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/master/src/main/java/com/google/cloud/teleport/templates/PubsubToText.java )

运行此示例代码后,它会写入发送到 GCS 的发布-订阅消息。但是,当持续时间设置为 1 分钟时,它会保存所有消息,然后在一分钟后写入 1 个文件,但我希望它将每条消息写入不同的文件。

最佳答案

如果您需要每条消息一个文件,一个选择是创建简单的转换,如下所示:

package com.myapp.dataflow.transform;

import org.apache.beam.sdk.transforms.DoFn;
import com.google.cloud.storage.*;
import static java.nio.charset.StandardCharsets.UTF_8;

public class StringToGcsFile extends DoFn<String, Blob> {
private Storage storage;
private String bucketName = "my-bucket";

@Setup
public void setup() {
storage = StorageOptions.getDefaultInstance().getService();
}

@ProcessElement
public void processElement(ProcessContext c) {
// consider some strategy for object names, UUID or something
String blobName = "my_blob_name";

// Upload a blob to the bucket
BlobId blobId = BlobId.of(bucketName, blobName);
BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/plain").build();
Blob blob = storage.create(blobInfo, c.element().getBytes(UTF_8));

c.output(blob);
}
}

Maven 依赖:

<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-storage</artifactId>
<version>1.35.0</version>
</dependency>

关于google-cloud-storage - Cloud Pub/Sub 到 GCS,按元素写入(数据流管道),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50991930/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com