gpt4 book ai didi

scala - 如何在java中使用数据流文本io动态目标

转载 作者:行者123 更新时间:2023-12-03 02:33:23 24 4
gpt4 key购买 nike

您好,我对动态文件目标 api 感到非常困惑,并且没有文档,所以我在这里。

情况是我有一个 PCollection,它包含属于不同分区的事件。我想将它们拆分并写入gcs中的不同文件夹。

这就是我所拥有的。

动态目标对象:

  class GCSDestinationString(prefix: String) extends DynamicDestinations[Event, String, String] {

override def getDestination(element: Event): String = {
element.partition //this returns a string which is a gcs folder path
}

override def getFilenamePolicy(destination: String): FileBasedSink.FilenamePolicy = {
println(destination)
val overallPrefix = s"$prefix/$destination/part-"
DefaultFilenamePolicy.fromStandardParameters(
ValueProvider.StaticValueProvider.of(
FileSystems.matchNewResource(overallPrefix, false)),
null, ".jsonl", true)
}

override def formatRecord(record: Event): String = {
implicit val f = DefaultFormats
write(record.toDataLakeFormat())
}

override def getDefaultDestination: String = "default"
}

我相信这是正确的逻辑,我询问每个元素其目标分区是什么,然后将其传递到 getFileNamePolicy 并从那里构建文件名。为了格式化记录,我只需将其转换为 json。

问题是将其与 TextIO 集成,我尝试过

TextIO.
write()
.withWindowedWrites()
.withTempDirectory(tempDir)
.to(new GCSDestinationString("gcs://bucket"))

但它要求源类型为字符串,从技术上讲这可以工作,但我必须多次反序列化。我在文档中找到了文本 io 动态目的地

Often this is used in conjunction with {@link TextIO#writeCustomType}, which allows your {@link DynamicDestinations} object to examine the input type and takes a format function to convert that type to a string for writing.

所以让我们尝试一下

  TextIO
.writeCustomType[Event]()
.withWindowedWrites()
.withTempDirectory(tempDir)
.to(new GCSDestinationString("gcs://bucket"))

这仍然无法编译,因为 writeCustomType 内部返回 TypedWrite<UserT, Void>这会产生链式 react ,要求我的动态目标对象的第二个类型参数为 Void。显然我要求它是一个字符串或至少是除 Void 之外的其他东西

我显然错过了一些东西

最佳答案

天哪,这太尴尬了。结果 writeCustomType().to(DynamicDestinations) 没有经过测试,我们也没有注意到它,但它的类型签名中有一个拼写错误。公关https://github.com/apache/beam/pull/4319正在审核中。不过,您仍然需要 2.3.0-SNAPSHOT 来获取它,在这种情况下,我仍然建议仅使用 FileIO.write()

关于scala - 如何在java中使用数据流文本io动态目标,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47926327/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com