
java - How to deserialize Avro data using Apache Beam (KafkaIO)


I have found only one thread with information on the topic I mentioned: How to Deserialising Kafka AVRO messages using Apache Beam

However, after trying a few variations of the Kafka Avro deserializers, I still cannot deserialize the Kafka messages. Here is my code:

public class Readkafka {
    private static final Logger LOG = LoggerFactory.getLogger(Readkafka.class);

    public static void main(String[] args) throws IOException {
        // Create the Pipeline object with the options we defined above.
        Pipeline p = Pipeline.create(
                PipelineOptionsFactory.fromArgs(args).withValidation().create());

        // Read at most 5 records and drop the Kafka metadata, yielding KV pairs.
        PTransform<PBegin, PCollection<KV<action_states_pkey, String>>> kafka =
                KafkaIO.<action_states_pkey, String>read()
                        .withBootstrapServers("mybootstrapserver")
                        .withTopic("action_States")
                        .withKeyDeserializer(MyClassKafkaAvroDeserializer.class)
                        .withValueDeserializer(StringDeserializer.class)
                        .updateConsumerProperties(ImmutableMap.of("schema.registry.url", (Object) "schemaregistryurl"))
                        .withMaxNumRecords(5)
                        .withoutMetadata();

        p.apply(kafka)
                .apply(Keys.<action_states_pkey>create());
    }
}

where MyClassKafkaAvroDeserializer is:

public class MyClassKafkaAvroDeserializer extends
        AbstractKafkaAvroDeserializer implements Deserializer<action_states_pkey> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        configure(new KafkaAvroDeserializerConfig(configs));
    }

    @Override
    public action_states_pkey deserialize(String s, byte[] bytes) {
        return (action_states_pkey) this.deserialize(bytes);
    }

    @Override
    public void close() {}
}

The class action_states_pkey was generated with avro-tools:

java -jar pathtoavrotools/avro-tools-1.8.1.jar compile schema pathtoschema/action_states_pkey.avsc destination path

where action_states_pkey.avsc is literally:

{"type":"record","name":"action_states_pkey","namespace":"namespace","fields":[{"name":"ad_id","type":["null","int"]},{"name":"action_id","type":["null","int"]},{"name":"state_id","type":["null","int"]}]}

With this code I am getting the error:

Caused by: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to my.mudah.beam.test.action_states_pkey
at my.mudah.beam.test.MyClassKafkaAvroDeserializer.deserialize(MyClassKafkaAvroDeserializer.java:20)
at my.mudah.beam.test.MyClassKafkaAvroDeserializer.deserialize(MyClassKafkaAvroDeserializer.java:1)
at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:221)
at org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$UnboundedToBoundedSourceAdapter$Reader.advanceWithBackoff(BoundedReadFromUnboundedSource.java:279)
at org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$UnboundedToBoundedSourceAdapter$Reader.start(BoundedReadFromUnboundedSource.java:256)
at com.google.cloud.dataflow.worker.WorkerCustomSources$BoundedReaderIterator.start(WorkerCustomSources.java:592)
... 14 more

It seems there is an error when trying to map the Avro data to my custom class?

Alternatively, I tried the following code:

PTransform<PBegin, PCollection<KV<action_states_pkey, String>>> kafka =
        KafkaIO.<action_states_pkey, String>read()
                .withBootstrapServers("bootstrapserver")
                .withTopic("action_states")
                .withKeyDeserializerAndCoder((Class) KafkaAvroDeserializer.class, AvroCoder.of(action_states_pkey.class))
                .withValueDeserializer(StringDeserializer.class)
                .updateConsumerProperties(ImmutableMap.of("schema.registry.url", (Object) "schemaregistry"))
                .withMaxNumRecords(5)
                .withoutMetadata();

p.apply(kafka)
        .apply(Keys.<action_states_pkey>create());
//      .apply("ExtractWords", ParDo.of(new DoFn<action_states_pkey, String>() {
//          @ProcessElement
//          public void processElement(ProcessContext c) {
//              action_states_pkey key = c.element();
//              c.output(key.getAdId().toString());
//          }
//      }));

This does not give me any errors until I try to print the data. I need to verify somehow that I am actually reading the data, so my intent is to log the data to the console. When I uncomment the commented-out section, however, I get the same error again:

SEVERE: 2019-09-13T07:53:56.168Z: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to my.mudah.beam.test.action_states_pkey
at my.mudah.beam.test.Readkafka$1.processElement(Readkafka.java:151)

Another thing to note is that if I specify:

.updateConsumerProperties(ImmutableMap.of("specific.avro.reader", (Object)"true"))

it always gives me the error:

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 443
Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class NAMESPACE.action_states_pkey specified in writer's schema whilst finding reader's schema for a SpecificRecord.

It seems there is something wrong with my approach? If anyone has experience reading Avro data from Kafka streams with Apache Beam, please help me out. I greatly appreciate it.

Here is a snapshot of my package, which also contains the schema and the class: package/working path details

Thanks.

Best Answer

public class MyClassKafkaAvroDeserializer extends AbstractKafkaAvroDeserializer

Your class is extending AbstractKafkaAvroDeserializer, which returns a GenericRecord.

You need to convert the GenericRecord to your custom object.
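
One way to do the conversion, as a minimal sketch (untested, and assuming the full name in the writer's schema matches your generated action_states_pkey class so that SpecificData can resolve it), is to deep-copy the GenericRecord into the generated class inside your deserializer:

import java.util.Map;

import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificData;
import org.apache.kafka.common.serialization.Deserializer;

public class MyClassKafkaAvroDeserializer extends
        AbstractKafkaAvroDeserializer implements Deserializer<action_states_pkey> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        configure(new KafkaAvroDeserializerConfig(configs));
    }

    @Override
    public action_states_pkey deserialize(String topic, byte[] bytes) {
        // AbstractKafkaAvroDeserializer returns a GenericData.Record by default
        GenericRecord record = (GenericRecord) this.deserialize(bytes);
        if (record == null) {
            return null;
        }
        // Deep-copy the generic record into the generated SpecificRecord class
        return (action_states_pkey) SpecificData.get()
                .deepCopy(action_states_pkey.getClassSchema(), record);
    }

    @Override
    public void close() {}
}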

OR

Use SpecificRecord for this, as stated in one of the following answers:

/**
 * Extends deserializer to support ReflectData.
 *
 * @param <V>
 *     value type
 */
public abstract class ReflectKafkaAvroDeserializer<V> extends KafkaAvroDeserializer {

    private Schema readerSchema;
    private DecoderFactory decoderFactory = DecoderFactory.get();

    protected ReflectKafkaAvroDeserializer(Class<V> type) {
        readerSchema = ReflectData.get().getSchema(type);
    }

    @Override
    protected Object deserialize(
            boolean includeSchemaAndVersion,
            String topic,
            Boolean isKey,
            byte[] payload,
            Schema readerSchemaIgnored) throws SerializationException {

        if (payload == null) {
            return null;
        }

        int schemaId = -1;
        try {
            ByteBuffer buffer = ByteBuffer.wrap(payload);
            if (buffer.get() != MAGIC_BYTE) {
                throw new SerializationException("Unknown magic byte!");
            }

            schemaId = buffer.getInt();
            Schema writerSchema = schemaRegistry.getByID(schemaId);

            int start = buffer.position() + buffer.arrayOffset();
            int length = buffer.limit() - 1 - idSize;
            DatumReader<Object> reader = new ReflectDatumReader(writerSchema, readerSchema);
            BinaryDecoder decoder = decoderFactory.binaryDecoder(buffer.array(), start, length, null);
            return reader.read(null, decoder);
        } catch (IOException e) {
            throw new SerializationException("Error deserializing Avro message for id " + schemaId, e);
        } catch (RestClientException e) {
            throw new SerializationException("Error retrieving Avro schema for id " + schemaId, e);
        }
    }
}
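
To plug this into KafkaIO you still need a concrete subclass per type, since KafkaIO instantiates the deserializer by class. A minimal sketch follows; ActionStatesPkeyDeserializer is a hypothetical name, and the raw (Class) cast mirrors your second attempt, since KafkaAvroDeserializer implements Deserializer<Object> rather than Deserializer<action_states_pkey>:

public class ActionStatesPkeyDeserializer
        extends ReflectKafkaAvroDeserializer<action_states_pkey> {
    public ActionStatesPkeyDeserializer() {
        super(action_states_pkey.class);
    }
}

// Wiring it into the read transform:
KafkaIO.<action_states_pkey, String>read()
        .withBootstrapServers("mybootstrapserver")
        .withTopic("action_states")
        .withKeyDeserializerAndCoder(
                (Class) ActionStatesPkeyDeserializer.class,
                AvroCoder.of(action_states_pkey.class))
        .withValueDeserializer(StringDeserializer.class)
        .updateConsumerProperties(ImmutableMap.of("schema.registry.url", (Object) "schemaregistryurl"))
        .withMaxNumRecords(5)
        .withoutMetadata();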

The above is copied from https://stackoverflow.com/a/39617120/2534090

See also: https://stackoverflow.com/a/42514352/2534090

Regarding "java - How to deserialize Avro data using Apache Beam (KafkaIO)", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/57919740/
