gpt4 book ai didi

google-cloud-dataflow - 如何尽快限制 Apache Beam 中的 PCollection?

转载 作者:行者123 更新时间:2023-12-04 14:56:28 24 4
gpt4 key购买 nike

我在 Google Cloud DataFlow(带有 Scio SDK)上使用 Apache Beam 2.28.0。我有一个很大的输入 PCollection(有界),我想将它限制/采样到固定数量的元素,但我想尽快开始下游处理。

目前,当我的输入 PCollection 有例如20M 个元素,我想使用 https://beam.apache.org/releases/javadoc/2.28.0/org/apache/beam/sdk/transforms/Sample.html#any-long- 将其限制为 1M

input.apply(Sample.<String>any(1000000))

它会一直等到读取完所有 20M 元素,这需要很长时间。

如何有效地将元素数量限制为固定大小,并在达到限制后立即开始下游处理,丢弃其余的输入处理?

最佳答案

好的,所以我最初的解决方案是像这样使用 Stateful DoFn(我正在使用问题中提到的 Scio 的 Scala SDK):

import java.lang.{Long => JLong}

class MyLimitFn[T](limit: Long) extends DoFn[KV[String, T], KV[String, T]] {
@StateId("count") private val count = StateSpecs.value[JLong]()

@ProcessElement
def processElement(context: DoFn[KV[String, T], KV[String, T]]#ProcessContext, @StateId("count") count: ValueState[JLong]): Unit = {
val current = count.read()
if(current < limit) {
count.write(current + 1L)
context.output(context.element())
}
}
}

此解决方案的缺点是我需要在使用之前向所有元素综合添加相同的键(例如空字符串)。到目前为止,它比 Sample.<>any() 快得多.

我仍然期待看到更好/更高效的解决方案。

关于google-cloud-dataflow - 如何尽快限制 Apache Beam 中的 PCollection?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/67885943/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com