gpt4 book ai didi

java - Beam java SDK 2.10.0 与 Kafka 源和数据流运行程序 : windowed Count. perElement 永远不会触发数据

转载 作者:太空宇宙 更新时间:2023-11-04 09:50:05 24 4
gpt4 key购买 nike

我在 Google DataFlow 上运行 Beam SDK 2.10.0 作业时遇到问题

流程很简单:我使用 Kafka 作为源,然后应用固定窗口,然后按键对元素进行计数。但看起来数据永远不会离开计数阶段,直到工作耗尽为止。 Count.PerElement/Combine.perKey(Count)/Combine.GroupedValues.out0的输出集合始终为零。仅在耗尽数据流作业后才发出元素。

这是代码:

public KafkaProcessingJob(BaseOptions options) {

PCollection<GenericRecord> genericRecordPCollection = Pipeline.create(options)
.apply("Read binary Kafka messages", KafkaIO.<String, byte[]>read()
.withBootstrapServers(options.getBootstrapServers())
.updateConsumerProperties(configureConsumerProperties())
.withCreateTime(Duration.standardMinutes(1L))
.withTopics(inputTopics)
.withReadCommitted()
.commitOffsetsInFinalize()
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(ByteArrayDeserializer.class))

.apply("Map binary message to Avro GenericRecord", new DecodeBinaryKafkaMessage());

.apply("Apply windowing to records", Window.into(FixedWindows.of(Duration.standardMinutes(5)))
.triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
.discardingFiredPanes()
.withAllowedLateness(Duration.standardMinutes(5)))

.apply("Write aggregated data to BigQuery", MapElements.into(TypeDescriptors.strings()).via(rec -> getKey(rec)))
.apply(Count.<String>perElement())
.apply(
new WriteWindowedToBigQuery<>(
project,
dataset,
table,
configureWindowedTableWrite()));
}

private Map<String, Object> configureConsumerProperties() {
Map<String, Object> configUpdates = Maps.newHashMap();
configUpdates.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

return configUpdates;
}

private static String getKey(GenericRecord record) {
//extract key
}

看起来流量永远不会离开.apply(Count.<String>perElement())的舞台

有人可以帮忙吗?

最佳答案

我已经找到原因了。

它与此处使用的 TimestampPolicy 有关(.withCreateTime(Duration.standardMinutes(1L)))。

由于我们的 Kafka 主题中存在空分区,主题水印从未使用默认的 TimestampPolicy 进行推进。我需要实现自定义策略来解决该问题。

关于java - Beam java SDK 2.10.0 与 Kafka 源和数据流运行程序 : windowed Count. perElement 永远不会触发数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54889350/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com