gpt4 book ai didi

java - 如何从 PCollection 创建 PCollection 以执行波束 SQL 转换

转载 作者:行者123 更新时间:2023-11-30 05:34:26 26 4
gpt4 key购买 nike

我正在尝试实现一个数据管道,它连接来自 Kafka 主题的多个无限源。我能够连接到主题并获取数据为 PCollection<String>我需要将其转换为 PCollection<Row> 。我将逗号分隔的字符串拆分为数组,并使用架构将其转换为行。但是,如何实现/构建模式并动态地将值绑定(bind)到它?

即使我为架构构建创建一个单独的类,有没有办法将字符串数组直接绑定(bind)到架构?

下面是我当前的工作代码,它是静态的,每次构建管道时都需要重写,并且它也会根据字段的数量而延长。

final Schema sch1 =
Schema.builder().addStringField("name").addInt32Field("age").build();

PCollection<KafkaRecord<Long, String>> kafkaDataIn1 = pipeline
.apply(
KafkaIO.<Long, String>read()
.withBootstrapServers("localhost:9092")
.withTopic("testin1")
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.updateConsumerProperties(
ImmutableMap.of("group.id", (Object)"test1")));

PCollection<Row> Input1 = kafkaDataIn1.apply(
ParDo.of(new DoFn<KafkaRecord<Long, String>, Row>() {
@ProcessElement
public void processElement(
ProcessContext processContext,
final OutputReceiver<Row> emitter) {

KafkaRecord<Long, String> record = processContext.element();
final String input = record.getKV().getValue();

final String[] parts = input.split(",");

emitter.output(
Row.withSchema(sch1)
.addValues(
parts[0],
Integer.parseInt(parts[1])).build());
}}))
.apply("window",
Window.<Row>into(FixedWindows.of(Duration.standardSeconds(50)))
.triggering(AfterWatermark.pastEndOfWindow())
.withAllowedLateness(Duration.ZERO)
.accumulatingFiredPanes());

Input1.setRowSchema(sch1);

我的期望是以动态/可重用的方式实现与上述代码相同的效果。

最佳答案

架构是在 pcollection 上设置的,因此它不是动态的,如果您想延迟构建它,那么您需要使用支持它的格式/编码器。 Java 序列化或 json 是示例。

也就是说,为了受益于 sql 功能,您还可以使用带有查询字段和其他字段的静态模式,这样静态部分就可以执行 sql,并且不会丢失额外的数据。

罗曼

关于java - 如何从 PCollection<String> 创建 PCollection<Row> 以执行波束 SQL 转换,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56899078/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com