gpt4 book ai didi

java - 为什么可迭代操作会在 Apache Beam 函数中引发错误?

转载 作者:行者123 更新时间:2023-12-02 10:45:44 24 4
gpt4 key购买 nike

我在 Iterable Java 集合上调用此函数,该集合由 GroupByKey 函数生成:

static class FindCompleteOrder extends DoFn<KV<String, Iterable<Order>>, Order> {
String COMPLETE_EVENT_NAME = "COMPLETE";

@ProcessElement
public void processElement(ProcessContext c) {
Iterable<Order> orders = c.element().getValue();
boolean complete = false;

do {
try {
Order order = orders.iterator().next();

if (order.getEventName().equals(COMPLETE_EVENT_NAME)) {
complete = true;
order.setComplete(complete);
c.output(order);
}
} catch (Exception e) {
LOG.error(e.getMessage());
}
} while (complete == false && orders.iterator().hasNext());
}
}

该函数迭代Orders 列表并输出与指定eventName 属性匹配的第一个实例。如果找到 Order 或已迭代整个集合,则循环结束。

随机 Order 实例在上游生成,并以 2/秒的速率发布到 Pub/Sub 实例,由 消耗调用此函数的 DataFlow 实例。大约。操作 15 分钟后,开始出现警告:

Processing stuck in step Find Order for at least 15m00s without outputting or completing

该警告是由于 iterator().hasNext()iterator().next() 中偶尔出现故障而发出的。最终的结果是整个管道停滞。关联的管道阶段从不发出输出。

用标准 for 循环替换循环可以解决问题。然而,这样做意味着迭代整个集合;我希望在找到适当的元素时结束循环,因此使用 do-while 循环。

我很想知道为什么迭代器操作会导致管道停止。 FAIA Iterable 集合是不可变的,并且不会被其他进程修改。

我在 Windows 上运行 Java 8Apache Beam 2.6

最佳答案

每次调用 orders.iterator() 时,您都会从第一个订单开始创建一个新的迭代器。这意味着您在循环中一遍又一遍地处理相同的订单。如果有多个订单,则对 hasNext() 的调用将始终为 true。因此,如果您有多个订单,或者您的第一个订单未设置complete,则循环将永远运行,这就是您遇到超时的原因。

相反,您应该调用一次 iterator() 并存储迭代器而不是 iterable,并使用它来循环:

static class FindCompleteOrder extends DoFn<KV<String, Iterable<Order>>, Order> {
String COMPLETE_EVENT_NAME = "COMPLETE";

@ProcessElement
public void processElement(ProcessContext c) {
Iterator<Order> orders = c.element().getValue().iterator();
boolean complete = false;

do {
try {
Order order = orders.next();

if (order.getEventName().equals(COMPLETE_EVENT_NAME)) {
complete = true;
order.setComplete(complete);
c.output(order);
}
} catch (Exception e) {
LOG.error(e.getMessage());
}
} while (complete == false && orders.hasNext());
}
}

关于java - 为什么可迭代操作会在 Apache Beam 函数中引发错误?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52599122/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com