gpt4 book ai didi

java - 使用 Apache Beam 反序列化 Kafka AVRO 消息

转载 作者:行者123 更新时间:2023-12-05 05:12:17 25 4
gpt4 key购买 nike

主要目标是聚合两个 Kafka 主题,一个是压缩的慢速移动数据,另一个是每秒接收的快速移动数据。

我已经能够在简单的场景中使用消息,例如 KV (Long,String),使用类似的东西:

PCollection<KV<Long,String>> input = p.apply(KafkaIO.<Long, 
String>read()
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)

PCollection<String> output = input.apply(Values.<String>create());

但是当你需要从 AVRO 反序列化时,这似乎不是方法。我有一个需要消费的 KV(STRING, AVRO)。

我尝试从 AVRO 模式生成 Java 类,然后将它们包含在“应用”中,例如:

PCollection<MyClass> output = input.apply(Values.<MyClass>create());

但这似乎不是正确的做法。

是否有任何人可以向我指出任何文档/示例,以便我了解您将如何使用 Kafka AVRO 和 Beam?

我已经更新了我的代码:

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.LongDeserializer;

public class Main {

public static void main(String[] args) {

PipelineOptions options = PipelineOptionsFactory.create();

Pipeline p = Pipeline.create(options);

PCollection<KV<Long, Myclass>> input = p.apply(KafkaIO.<Long, String>read()
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializerAndCoder(KafkaAvroDeserializer.class, AvroCoder.of(Myclass.class))
);

p.run();

}
}

import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;

@DefaultCoder(AvroCoder.class)
public class Myclass{
String name;
String age;

Myclass(){}
Myclass(String n, String a) {
this.name= n;
this.age= a;
}
}

但是我现在得到以下错误

incompatible types: java.lang.Class < io.confluent.kafka.serializers.KafkaAvroDeserializer > cannot be converted to java.lang.Class < ? extends org.apache.kafka.common.serialization.Deserializer < java.lang.String > >

我一定是导入了不正确的序列化器?

最佳答案

我遇到过同样的问题。在此邮件存档中找到了解决方案。 http://mail-archives.apache.org/mod_mbox/beam-user/201710.mbox/%3CCAMsy_NiVrT_9_xfxOtK1inHxb=x_yAdBcBN+4aquu_hn0GJ0nA@mail.gmail.com%3E

在您的情况下,您需要定义自己的 Deserializer<MyClass> , 可以从 AbstractKafkaAvroDeserializer 延伸如下所示。

public class MyClassKafkaAvroDeserializer extends
AbstractKafkaAvroDeserializer implements Deserializer<MyClass> {

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
configure(new KafkaAvroDeserializerConfig(configs));
}

@Override
public MyClass deserialize(String s, byte[] bytes) {
return (MyClass) this.deserialize(bytes);
}

@Override
public void close() {} }

然后将您的 KafkaAvroDeserializer 指定为 ValueDeserializer。

p.apply(KafkaIO.<Long, MyClass>read()
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(MyClassKafkaAvroDeserializer.class) );

关于java - 使用 Apache Beam 反序列化 Kafka AVRO 消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54755668/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com