- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我只看到一个线程包含有关我提到的主题的信息,即: How to Deserialising Kafka AVRO messages using Apache Beam
但是,在尝试了 kafkaserializers 的一些变体之后,我仍然无法反序列化 kafka 消息。这是我的代码:
public class Readkafka {
private static final Logger LOG = LoggerFactory.getLogger(Readkafka.class);
public static void main(String[] args) throws IOException {
// Create the Pipeline object with the options we defined above.
Pipeline p = Pipeline.create(
PipelineOptionsFactory.fromArgs(args).withValidation().create());
PTransform<PBegin, PCollection<KV<action_states_pkey, String>>> kafka =
KafkaIO.<action_states_pkey, String>read()
.withBootstrapServers("mybootstrapserver")
.withTopic("action_States")
.withKeyDeserializer(MyClassKafkaAvroDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.updateConsumerProperties(ImmutableMap.of("schema.registry.url", (Object)"schemaregistryurl"))
.withMaxNumRecords(5)
.withoutMetadata();
p.apply(kafka)
.apply(Keys.<action_states_pkey>create())
}
MyClassKafkaAvroDeserilizer 在哪里
public class MyClassKafkaAvroDeserializer extends
AbstractKafkaAvroDeserializer implements Deserializer<action_states_pkey> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
configure(new KafkaAvroDeserializerConfig(configs));
}
@Override
public action_states_pkey deserialize(String s, byte[] bytes) {
return (action_states_pkey) this.deserialize(bytes);
}
@Override
public void close() {} }
类action_states_pkey是使用avro工具生成的代码
java -jar pathtoavrotools/avro-tools-1.8.1.jar compile schema pathtoschema/action_states_pkey.avsc destination path
其中action_states_pkey.avsc字面上是
{"type":"record","name":"action_states_pkey","namespace":"namespace","fields":[{"name":"ad_id","type":["null","int"]},{"name":"action_id","type":["null","int"]},{"name":"state_id","type":["null","int"]}]}
使用此代码我收到错误:
Caused by: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to my.mudah.beam.test.action_states_pkey
at my.mudah.beam.test.MyClassKafkaAvroDeserializer.deserialize(MyClassKafkaAvroDeserializer.java:20)
at my.mudah.beam.test.MyClassKafkaAvroDeserializer.deserialize(MyClassKafkaAvroDeserializer.java:1)
at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:221)
at org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$UnboundedToBoundedSourceAdapter$Reader.advanceWithBackoff(BoundedReadFromUnboundedSource.java:279)
at org.apache.beam.sdk.io.BoundedReadFromUnboundedSource$UnboundedToBoundedSourceAdapter$Reader.start(BoundedReadFromUnboundedSource.java:256)
at com.google.cloud.dataflow.worker.WorkerCustomSources$BoundedReaderIterator.start(WorkerCustomSources.java:592)
... 14 more
尝试将 Avro 数据映射到我的自定义类时似乎出现错误?
或者,我尝试了以下代码:
PTransform<PBegin, PCollection<KV<action_states_pkey, String>>> kafka =
KafkaIO.<action_states_pkey, String>read()
.withBootstrapServers("bootstrapserver")
.withTopic("action_states")
.withKeyDeserializerAndCoder((Class)KafkaAvroDeserializer.class, AvroCoder.of(action_states_pkey.class))
.withValueDeserializer(StringDeserializer.class)
.updateConsumerProperties(ImmutableMap.of("schema.registry.url", (Object)"schemaregistry"))
.withMaxNumRecords(5)
.withoutMetadata();
p.apply(kafka);
.apply(Keys.<action_states_pkey>create())
// .apply("ExtractWords", ParDo.of(new DoFn<action_states_pkey, String>() {
// @ProcessElement
// public void processElement(ProcessContext c) {
// action_states_pkey key = c.element();
// c.output(key.getAdId().toString());
// }
// }));
在我尝试打印数据之前,它不会给我任何错误。我必须验证我是否以某种方式成功读取数据,因此我的目的是将数据记录在控制台中。如果我取消注释部分的注释,我会再次收到相同的错误:
SEVERE: 2019-09-13T07:53:56.168Z: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to my.mudah.beam.test.action_states_pkey
at my.mudah.beam.test.Readkafka$1.processElement(Readkafka.java:151)
另一件事要注意的是,如果我指定:
.updateConsumerProperties(ImmutableMap.of("specific.avro.reader", (Object)"true"))
总是给我一个错误
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id 443
Caused by: org.apache.kafka.common.errors.SerializationException: Could not find class NAMESPACE.action_states_pkey specified in writer's schema whilst finding reader's schema for a SpecificRecord.
看来我的做法有问题?如果有人有使用 Apache Beam 从 Kafka Streams 读取 AVRO 数据的经验,请帮助我。我非常感激。
这是我的包的快照,其中还包含架构和类: package/working path details
谢谢。
最佳答案
public class MyClassKafkaAvroDeserializer extends AbstractKafkaAvroDeserializer
您的类正在扩展 AbstractKafkaAvroDeserializer
,它返回 GenericRecord
。
您需要convert the GenericRecord
to your custom object .
或者
使用 SpecificRecord
来实现此目的,如以下答案之一所述:
/**
* Extends deserializer to support ReflectData.
*
* @param <V>
* value type
*/
public abstract class ReflectKafkaAvroDeserializer<V> extends KafkaAvroDeserializer {
private Schema readerSchema;
private DecoderFactory decoderFactory = DecoderFactory.get();
protected ReflectKafkaAvroDeserializer(Class<V> type) {
readerSchema = ReflectData.get().getSchema(type);
}
@Override
protected Object deserialize(
boolean includeSchemaAndVersion,
String topic,
Boolean isKey,
byte[] payload,
Schema readerSchemaIgnored) throws SerializationException {
if (payload == null) {
return null;
}
int schemaId = -1;
try {
ByteBuffer buffer = ByteBuffer.wrap(payload);
if (buffer.get() != MAGIC_BYTE) {
throw new SerializationException("Unknown magic byte!");
}
schemaId = buffer.getInt();
Schema writerSchema = schemaRegistry.getByID(schemaId);
int start = buffer.position() + buffer.arrayOffset();
int length = buffer.limit() - 1 - idSize;
DatumReader<Object> reader = new ReflectDatumReader(writerSchema, readerSchema);
BinaryDecoder decoder = decoderFactory.binaryDecoder(buffer.array(), start, length, null);
return reader.read(null, decoder);
} catch (IOException e) {
throw new SerializationException("Error deserializing Avro message for id " + schemaId, e);
} catch (RestClientException e) {
throw new SerializationException("Error retrieving Avro schema for id " + schemaId, e);
}
}
}
关于java - 如何使用 Apache Beam (KafkaIO) 反序列化 avro 数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57919740/
我在 Apchea Beam 中编写了一个非常简单的管道,如下所示,用于从 Confluence Cloud 上的 kafka 集群读取数据,如下所示: Pipeline pipeli
我一直在浏览 Beam KafkaIO 教程,并一直在尝试查找有关 kafka 客户端身份验证的文档,但到目前为止只找到了非常基本的示例。我需要为 Kafkaio 客户端提供以下配置才能成功进行身份验
我正在 Google Dataflow 中使用 Beam KafkaIO 源运行作业,但找不到一种简单的方法来在作业重新启动时保持偏移量(作业更新选项不够,我需要重新启 Action 业) 将 Bea
我目前正在使用 Google Cloud Dataflow 和 Apache Beam 来使用来自两个不同 Kafka 集群中存在的 Kafka 主题的消息,这两个集群包含相同的主题名称,但主题中的数
Google Dataflow 作业使用 Apache Beam 的 KafkaIO 库以及 AvroIO 和 Windowed Writes 将输出写入 Google Cloud Storage 存
我正在使用 apache beam DirectRunner 从 kafka 主题加载数据。我的代码如下: conf={'bootstrap.servers':'localhost:9092'} wi
我只看到一个线程包含有关我提到的主题的信息,即: How to Deserialising Kafka AVRO messages using Apache Beam 但是,在尝试了 kafkaser
从服务器上,我能够连接并从配置了 SSL 的远程 kafka 服务器主题中获取数据。 从 GCP,我如何使用传递 SSL 信任库、 keystore 证书位置和 Google 服务帐户 json 的
我正在构建一个 Apache Beam 管道,以从 Kafka 读取作为无界源。 我能够使用直接运行程序在本地运行它。 但是,当在云端使用 Google Cloud Dataflow 运行器运行时,管
我是一名优秀的程序员,十分优秀!