gpt4 book ai didi

java - 使用 Google Dataflow 在批处理模式下使用 KafkaIO 进行消费

转载 作者:行者123 更新时间:2023-11-30 05:42:52 24 4
gpt4 key购买 nike

Google Dataflow 作业使用 Apache Beam 的 KafkaIO 库以及 AvroIO 和 Windowed Writes 将输出写入 Google Cloud Storage 存储桶中的“.avro”文件。但是,它默认使用流式处理作为生产数据的处理作业类型。

是否可以使用数据流中的 KafkaIO 使用批处理处理来使用来自 Kafka 主题的数据。此数据流作业不需要近实时处理(流式处理)。有没有办法将传入记录插入 BigQuery 表中,而无需流式插入成本,从而实现批处理类型。

运行频率较低的批处理可以发挥作用,从而减少内存、vCPU 和计算成本。

根据:https://beam.apache.org/releases/javadoc/2.5.0/org/apache/beam/sdk/io/kafka/KafkaIO.html

KafkaIO 源返回 Kafka 记录的无界集合作为 PCollection>。

这是否意味着 Kafka 是无限源的,不能以批处理模式运行?

测试 .withMaxNumRecords(1000000) 条件以批处理模式运行作业。但是,要在实时传入数据中运行作业,我需要删除此条件。

我尝试使用窗口并将流模式选项标志设置为 false,但没有成功,如下面的代码所示。


// not streaming mode
options.setStreaming(false);

...
PCollection<String> collection = p.apply(KafkaIO.<String, String>read()
.withBootstrapServers("IPADDRESS:9092")
.withTopic(topic)
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.updateConsumerProperties(props)
.withConsumerFactoryFn(new ConsumerFactory())
// .withMaxNumRecords(1000000)
.withoutMetadata()
).apply(Values.<String>create())
.apply(Window.into(FixedWindows.of(Duration.standardDays(1))));


...
//convert to Avro GenericRecord

.apply("AvroToGCS", AvroIO.writeGenericRecords(AVRO_SCHEMA)
.withWindowedWrites()
.withNumShards(1)
.to("gs://BUCKET/FOLDER/")
.withSuffix(".avro"));

该代码导致使用 4 个 vCPU 和 1 个工作线程在 9 分钟内处理 180 万条记录的作业类型。此后,我不得不停止工作(流失)以节省成本。

在数据流中对传入数据执行批处理,是否可以收集将其写入 avro 文件的批量记录,并继续执行此操作,直到偏移量 catch 最新

非常感谢任何示例或示例代码。

最佳答案

无限源无法在批处理模式下运行。这是设计使然,因为批处理管道期望读取有限数量的数据,并在处理完成后终止。

但是,您可以通过限制读取的记录数量将无界源转换为有界源,您已经这样做了。注意:不保证会读取哪些记录。

流管道应该始终处于运行状态,以便它们可以读取实时数据。批处理管道旨在读取存储数据的积压。

批处理管道不会很好地响应读取实时数据,当您启动管道然后终止时,它会读取那里的任何数据。

关于java - 使用 Google Dataflow 在批处理模式下使用 KafkaIO 进行消费,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55351031/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com