gpt4 book ai didi

google-cloud-dataflow - 创建 PCollectionView> 时如何解决重复值异常

转载 作者:行者123 更新时间:2023-12-01 13:16:26 26 4
gpt4 key购买 nike

我正在我的 Apache-Beam 管道中设置一个变化缓慢的查找 map 。它不断更新查找图。对于查找映射中的每个键,我以累积模式检索全局窗口中的最新值。
但它总是遇到异常:
org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalArgumentException: Duplicate values for mykey
这段代码有什么问题吗?

如果我使用 .discardingFiredPanes()相反,我会在最后一次发射中丢失信息。

pipeline
.apply(GenerateSequence.from(0).withRate(1, Duration.standardMinutes(1L)))
.apply(
Window.<Long>into(new GlobalWindows())
.triggering(Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()))
.accumulatingFiredPanes())
.apply(new ReadSlowChangingTable())
.apply(Latest.perKey())
.apply(View.asMap());

输入触发器示例:

t1 : KV<k1,v1> KV< k2,v2>
t2 : KV<k1,v1>
accumulatingFiredPanes => t2 的预期结果 => KV(k1,v1), KV(k2,v2) 但由于重复异常而失败
discardingFiredPanes => t2 时的预期结果 => KV(k1,v1) 成功

最佳答案

特别是关于 view.asMap 和评论中的累积 Pane 讨论:

如果您想使用 View.asMap 侧输入(例如,本地图元素的源本身是分布式的 - 通常是因为您正在从先前转换的输出创建侧输入),还有一些其他因素会影响需要考虑:View.asMap 本身就是一个聚合,它将继承触发并累积其输入。在此特定模式中,在此转换之前将管道设置为 accumulatingPanes 模式将导致重复键错误,即使在 Latest.perKey 转换之前使用了诸如 View.asMap 之类的转换。

鉴于读取更新了整个 map ,那么我认为使用 View.asSingleton 会是这个用例的更好方法。

围绕此模式的一些一般说明,希望对其他人也有用:

对于这种模式,我们可以使用 GenerateSequence 源转换定期发出一个值,例如每天一次。通过在每个元素上激活的数据驱动触发器将此值传递到全局窗口。在 DoFn 中,使用此过程作为触发器从有界源 Create 中提取数据,以便在下游转换中使用 SideInput。

需要注意的是,由于此模式使用全局窗口侧输入触发处理时间,因此与事件时间处理的元素的匹配将是不确定的。例如,如果我们有一个以事件时间为窗口的主管道,那么这些窗口将看到的 SideInput View 的版本将取决于在处理时间而不是事件时间中触发的最新触发器。

同样重要的是要注意,通常侧面输入应该适合内存。

Java(SDK 2.9.0):

在下面的示例中,侧输入以非常短的间隔更新,这样可以很容易地看到效果。期望侧输入更新缓慢,例如每隔几个小时或一天一次。

在下面的示例代码中,我们使用了我们在 Map 中创建的 DoFn,它成为 View.asSingleton,这是此模式的推荐方法。

下面的示例说明了该模式,请注意每次计数器更新都会重建 View.asSingleton

public static void main(String[] args) {

// Create pipeline
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
.as(PipelineOptions.class);

// Using View.asSingleton, this pipeline uses a dummy external service as illustration.
// Run in debug mode to see the output
Pipeline p = Pipeline.create(options);

// Create slowly updating sideinput

PCollectionView<Map<String, String>> map = p
.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5L)))

.apply(Window.<Long>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
.discardingFiredPanes())

.apply(ParDo.of(new DoFn<Long, Map<String, String>>() {
@ProcessElement public void process(@Element Long input,
OutputReceiver<Map<String, String>> o) {
// Do any external reads needed here...
// We will make use of our dummy external service.
// Every time this triggers, the complete map will be replaced with that read from
// the service.
o.output(DummyExternalService.readDummyData());
}

})).apply(View.asSingleton());

// ---- Consume slowly updating sideinput

// GenerateSequence is only used here to generate dummy data for this illustration.
// You would use your real source for example PubSubIO, KafkaIO etc...
p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1L)))
.apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
.apply(Sum.longsGlobally().withoutDefaults())
.apply(ParDo.of(new DoFn<Long, KV<Long, Long>>() {

@ProcessElement public void process(ProcessContext c) {
Map<String, String> keyMap = c.sideInput(map);
c.outputWithTimestamp(KV.of(1L, c.element()), Instant.now());

LOG.debug("Value is {} key A is {} and key B is {}"
, c.element(), keyMap.get("Key_A"),keyMap.get("Key_B"));

}
}).withSideInputs(map));

p.run();
}

public static class DummyExternalService {

public static Map<String, String> readDummyData() {

Map<String, String> map = new HashMap<>();
Instant now = Instant.now();

DateTimeFormatter dtf = DateTimeFormat.forPattern("HH:MM:SS");

map.put("Key_A", now.minus(Duration.standardSeconds(30)).toString(dtf));
map.put("Key_B", now.minus(Duration.standardSeconds(30)).toString());

return map;

}
}

关于google-cloud-dataflow - 创建 PCollectionView<Map<String,String>> 时如何解决重复值异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54422510/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com