gpt4 book ai didi

google-cloud-dataflow - 如何根据处理的元素数量动态触发窗口?

转载 作者:行者123 更新时间:2023-12-02 08:03:34 25 4
gpt4 key购买 nike

我有一个在 Google Cloud Dataflow 上运行的 Apache Beam 管道。这是一个流管道,它从 Google Cloud PubSub 接收输入消息,这些消息基本上是要处理的元素的 JSON 数组。

粗略地说,管道有以下步骤:

  1. 将消息反序列化为 PCollecttion<List<T>> .
  2. 将数组拆分(或分解)为 PCollection<T> .
  3. 处理步骤很少:某些元素将在其他元素之前完成,并且某些元素会被缓存,因此它们只是跳到末尾,根本不需要太多处理。
  4. 展平所有输出并应用 GroupByKey (这是问题步骤):它转换 PCollection回到Pcollection<List<T>>但它不会等待所有元素。
  5. 序列化以发布 PubSub 消息。

我无法获取最后一个GroupByKey将收到的所有元素分组在一起。发布的消息不包含必须处理的元素,并且比跳到末尾的元素花费的时间更长。

我认为如果我可以编写自定义数据驱动触发器,这将很容易解决。或者即使我可以动态设置触发器 AfterPane.elementCountAtLeast()来自定制WindowFn

我似乎无法制作自定义触发器。但是是否可以以某种方式动态地为每个窗口设置触发器?

--

这是我正在开发的管道的简化版本。

我简化了对象数组 T 的输入转换成一个简单的整数数组。我已经模拟了这些整数的键(或 ID)。通常它们是对象的一部分。

我还将缓慢的处理步骤(实际上是几个步骤)简化为具有人为延迟的单个步骤。

(完整示例要点 https://gist.github.com/naringas/bfc25bcf8e7aca69f74de719d75525f2 )

PCollection<String> queue = pipeline
.apply("ReadQueue", PubsubIO.readStrings().fromTopic(topic))
.apply(Window
.<String>into(FixedWindows.of(Duration.standardSeconds(1)))
.withAllowedLateness(Duration.standardSeconds(3))
.triggering(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(2)))
.discardingFiredPanes());

TupleTag<List<KV<Integer, Integer>>> tagDeserialized = new TupleTag<List<KV<Integer, Integer>>>() {};
TupleTag<Integer> tagDeserializeError = new TupleTag<Integer>() {};
PCollectionTuple imagesInputTuple = queue
.apply("DeserializeJSON", ParDo.of(new DeserializingFn()).withOutputTags(tagDeserialized, TupleTagList.of(tagDeserializeError)));

/*
This is where I think that I must adjust the custom window strategy, set the customized dynamic-trigger
*/
PCollection<KV<Integer, Integer>> images = imagesInputTuple.get(tagDeserialized)
/* I have tried many things
.apply(Window.<List<KV<Integer, Integer>>>into(new GlobalWindows()))
*/
.apply("Flatten into timestamp", ParDo.of(new DoFn<List<KV<Integer, Integer>>, KV<Integer, Integer>>() {
// Flatten and output into same ts
// like Flatten.Iterables() but I set the output window
@ProcessElement
public void processElement(@Element List<KV<Integer, Integer>> input, OutputReceiver<KV<Integer, Integer>> out, @Timestamp Instant ts, BoundedWindow w, PaneInfo p) {
Instant timestamp = w.maxTimestamp();
for (KV<Integer, Integer> el : input) {
out.outputWithTimestamp(el, timestamp);
}
}
}))
.apply(Window.<KV<Integer, Integer>>into(new GlobalWindows()));

TupleTag<KV<Integer, Integer>> tagProcess = new TupleTag<KV<Integer, Integer>>() {};
TupleTag<KV<Integer, Integer>> tagSkip = new TupleTag<KV<Integer, Integer>>() {};
PCollectionTuple preproc = images
.apply("PreProcessingStep", ParDo.of(new SkipOrNotDoFn()).withOutputTags(tagProcess, TupleTagList.of(tagSkip)));

TupleTag<KV<Integer, Integer>> tagProcessed = new TupleTag<KV<Integer, Integer>>() {};
TupleTag<KV<Integer, Integer>> tagError = new TupleTag<KV<Integer, Integer>>() {};
PCollectionTuple processed = preproc.get(tagProcess)
.apply("ProcessingStep", ParDo.of(new DummyDelasyDoFn).withOutputTags(tagProcessed, TupleTagList.of(tagError)));

/* Here, at the "end"
the elements get grouped back
first: join into a PcollectionList and flatten it
second: GroupByKey which should but doesn't way for all elements
lastly: serilize and publish (in this case just print out)
*/
PCollection end = PCollectionList.of(preproc.get(tagSkip)).and(processed.get(tagProcessed))
.apply("FlattenUpsert", Flatten.pCollections())
//
.apply("GroupByParentId", GroupByKey.create())
.apply("GroupedValues", Values.create())
.apply("PublishSerialize", ParDo.of(
new DoFn<Object, String>() {
@ProcessElement
public void processElement(ProcessContext pc) {
String output = GSON.toJson(pc.element());
LOG.info("DONE: {}", output);
pc.output(output);
}
}));
// "send the string to pubsub" goes here

最佳答案

我玩了一下有状态管道。当您想使用数据驱动触发器或 AfterPane.elementCountAtLeast()我假设您知道符合消息的元素数量(或者至少每个键不会改变),所以我定义了 NUM_ELEMENTS = 10就我而言。

我的方法的主要思想是跟踪到目前为止我所看到的特定键的元素数量。请注意,我必须合并 PreProcessingStepProcessingStep合并为一个以进行准确计数。我知道这只是一个简化的示例,所以我不知道如何将其转化为实际场景。

在有状态的 ParDo 中,我定义了两个状态变量,一个 BagState看到所有整数和 ValueState计算错误数量:

// A state bag holding all elements seen for that key
@StateId("elements_seen")
private final StateSpec<BagState<Integer>> elementSpec =
StateSpecs.bag();

// A state cell holding error count
@StateId("errors")
private final StateSpec<ValueState<Integer>> errorSpec =
StateSpecs.value(VarIntCoder.of());

然后我们像往常一样处理每个元素,但我们不会输出任何内容,除非它是一个错误。在这种情况下,我们在将元素发送到tagError之前更新错误计数器。侧面输出:

errors.write(firstNonNull(errors.read(), 0) + 1);
is_error = true;
output.get(tagError).output(input);

我们更新计数,对于成功处理或跳过的元素(即 !is_error ),将新观察到的元素写入 BagState :

int count = firstNonNull(Iterables.size(state.read()), 0) + firstNonNull(errors.read(), 0);

if (!is_error) {
state.add(input.getValue());
count += 1;
}

然后,如果成功处理的元素和错误的总和等于 NUM_ELEMENTS (我们在这里模拟数据驱动的触发器),我们刷新 BagState 中的所有项目。 :

if (count >= NUM_ELEMENTS) {
Iterable<Integer> all_elements = state.read();
Integer key = input.getKey();

for (Integer value : all_elements) {
output.get(tagProcessed).output(KV.of(key, value));
}
}

请注意,这里我们已经可以对值进行分组并仅发出一个 KV<Integer, Iterable<Integer>>反而。我刚刚做了一个for而是循环以避免更改下游的其他步骤。

这样,我发布了一条消息,例如:

gcloud pubsub topics publish streamdemo --message "[1,2,3,4,5,6,7,8,9,10]"

我之前得到的地方:

INFO: DONE: [4,8]

现在我得到:

INFO: DONE: [1,2,3,4,5,6,8,9,10]

元素7不存在,因为它是模拟错误的。

使用 DirectRunner 进行测试和2.16.0 SDK。完整代码here .

请告诉我这是否适合您的用例,请记住我只做了一些小测试。

关于google-cloud-dataflow - 如何根据处理的元素数量动态触发窗口?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59061351/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com