gpt4 book ai didi

google-cloud-dataproc - Blob 的 Google Cloud Storage 原子创建

转载 作者:行者123 更新时间:2023-12-05 05:41:08 25 4
gpt4 key购买 nike

我正在使用 haddop-connectors将 BLOB 写入 Google Cloud Storage 的项目。

我想确保在并发上下文中写入的具有特定目标名称 的 BLOB 要么以 FULL 写入,要么根本不显示,以防发生异常写入时发生。

在下面的代码中,如果发生 I/O 异常,写入的 BLOB 将出现在 GCS 上,因为流将在 finally 中关闭:

        val stream = fs.create(path, overwrite)
try {
actions.map(_ + "\n").map(_.getBytes(UTF_8)).foreach(stream.write)
} finally {
stream.close()
}

另一种可能性是不关闭流并让它“泄漏”,这样就不会创建 BLOB。然而,这并不是一个真正有效的选项。

        val stream = fs.create(path, overwrite)
actions.map(_ + "\n").map(_.getBytes(UTF_8)).foreach(stream.write)
stream.close()

任何人都可以与我分享如何使用 hadoop-connectors 将 BLOB 写入 GCS 的方法吗?或 cloud storage client 以原子方式?

最佳答案

我在 hadoop-connectors 中使用了反射从 GoogleHadoopFileSystem 实例中检索 com.google.api.services.storage.Storage 实例

    GoogleCloudStorage googleCloudStorage =  ghfs.getGcsFs().getGcs();
Field gcsField = googleCloudStorage.getClass().getDeclaredField("gcs");
gcsField.setAccessible(true);
Storage gcs = (Storage) gcsField.get(googleCloudStorage);

为了能够根据内存中数据对应的输入流进行调用。

private static StorageObject createBlob(URI blobPath, byte[] content, GoogleHadoopFileSystem ghfs, Storage gcs)
throws IOException
{
CreateFileOptions createFileOptions = new CreateFileOptions(false);
CreateObjectOptions createObjectOptions = objectOptionsFromFileOptions(createFileOptions);
PathCodec pathCodec = ghfs.getGcsFs().getOptions().getPathCodec();
StorageResourceId storageResourceId = pathCodec.validatePathAndGetId(blobPath, false);

StorageObject object =
new StorageObject()
.setContentEncoding(createObjectOptions.getContentEncoding())
.setMetadata(encodeMetadata(createObjectOptions.getMetadata()))
.setName(storageResourceId.getObjectName());

InputStream inputStream = new ByteArrayInputStream(content, 0, content.length);
Storage.Objects.Insert insert = gcs.objects().insert(
storageResourceId.getBucketName(),
object,
new InputStreamContent(createObjectOptions.getContentType(), inputStream));
// The operation succeeds only if there are no live versions of the blob.
insert.setIfGenerationMatch(0L);
insert.getMediaHttpUploader().setDirectUploadEnabled(true);
insert.setName(storageResourceId.getObjectName());
return insert.execute();
}

/**
* Helper for converting from a Map<String, byte[]> metadata map that may be in a
* StorageObject into a Map<String, String> suitable for placement inside a
* GoogleCloudStorageItemInfo.
*/
@VisibleForTesting
static Map<String, String> encodeMetadata(Map<String, byte[]> metadata) {
return Maps.transformValues(metadata, QuickstartParallelApiWriteExample::encodeMetadataValues);
}

// A function to encode metadata map values
private static String encodeMetadataValues(byte[] bytes) {
return bytes == null ? Data.NULL_STRING : BaseEncoding.base64().encode(bytes);
}

请注意,在上面的示例中,即使有多个调用者试图并行创建同名的 blob,只有一个调用者会成功创建 blob。其他调用者将收到 412 Precondition Failed

关于google-cloud-dataproc - Blob 的 Google Cloud Storage 原子创建,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/72317618/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com