- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我真的很难让 Flink 与正在运行的 Kafka 实例正确通信,使用来自 Confluence 架构注册表的 Avro 架构(对于两者键和值)。
经过一段时间的思考和重组我的程序,我能够插入我的实现到目前为止:
生产者方法
public static FlinkKafkaProducer<Tuple2<GenericRecord,GenericRecord>> kafkaAvroGenericProducer() {
final Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "--.-.-.--:9092");
properties.put("schema.registry.url", "http://--.-.-.---:8081");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KVSerializationSchema.class.getName()); //wrong class should not matter
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KVSerializationSchema.class.getName()); //wrong class but should not matter
return new FlinkKafkaProducer<Tuple2<GenericRecord,GenericRecord>>("flink_output",
new GenericSerializer("flink_output", schemaK, schemaV, "http://--.-.-.---:8081"),
properties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
}
<小时/>
GenericSerializer.java
package com.reeeliance.flink;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
import flinkfix.ConfluentRegistryAvroSerializationSchema;
public class GenericSerializer implements KafkaSerializationSchema<Tuple2<GenericRecord,GenericRecord>>{
private String topic;
private Schema schemaKey;
private Schema schemaValue;
private String registryUrl;
public GenericSerializer(String topic, Schema schemaK, Schema schemaV, String url) {
super();
this.topic = topic;
this.schemaKey = schemaK;
this.schemaValue = schemaV;
this.registryUrl = url;
}
public GenericSerializer() {
super();
}
@Override
public ProducerRecord<byte[], byte[]> serialize(Tuple2<GenericRecord,GenericRecord> element, Long timestamp) {
byte[] key = ConfluentRegistryAvroSerializationSchema.forGeneric(topic + "-key", schemaKey, registryUrl).serialize(element.f0);
byte[] value = ConfluentRegistryAvroSerializationSchema.forGeneric(topic + "-value", schemaValue, registryUrl).serialize(element.f1);
return new ProducerRecord<byte[], byte[]>(topic, key, value);
}
}
<小时/>
但是,当我执行作业时,它在准备阶段失败,而作业实际上没有运行,并出现以下错误:
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: [H_EQUNR type:STRING pos:0] is not serializable. The object probably contains or references non serializable fields.
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:617)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:571)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:547)
at com.reeeliance.flink.StreamingJob.kafkaAvroGenericProducer(StreamingJob.java:257)
at com.reeeliance.flink.StreamingJob.main(StreamingJob.java:84)
Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
- custom writeObject data (class "java.util.ArrayList")
- root object (class "org.apache.avro.Schema$LockableArrayList", [H_EQUNR type:STRING pos:0])
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1182)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at java.util.ArrayList.writeObject(ArrayList.java:766)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:133)
... 8 more
<小时/>
我知道所有类都必须实现可序列化接口(interface)或使其成为 transient ,但我不使用自己的类,并且错误不会解决不可序列化的函数(如通常的线程处理),而是记录或字段。该字段来自键模式,这是一种仅包含这一字段的模式。我认为我的错误在于使用 GenericRecord,它没有实现 Serialized 接口(interface),但我看到 GenericRecord 经常用于这种序列化,所以它对我来说没有任何意义。
ConfluenceRegistryAvroSerializationSchema 类取自 GitHub ,因为它尚未包含在我们使用的当前 Flink 版本(1.9.1)中。我包括了必要的类(class)并更改了类(class),我认为这可能不是我的问题的原因。 (Issue solved)
有人可以帮我调试这个吗?如果您能向我展示一种不同的方法来实现相同的目标,我也将不胜感激,到目前为止,Flink Avro 和 Confluence Schema Registry 的不兼容性一直让我发疯。
最佳答案
异常消息告诉您哪个类不可序列化。
Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
问题出在您存储在 GenericSerializer
字段中的 Schema
类。
你可以试试这个:
public class GenericSerializer implements KafkaSerializationSchema<Tuple2<GenericRecord,GenericRecord>>{
private final SerializationSchema<GenericRecord> valueDeserializer;
private final SerializationSchema<GenericRecord> keyDeserializer;
public GenericSerializer(String topic, Schema schemaK, Schema schemaV, String url) {
this.keyDeserializer = ConfluentRegistryAvroSerializationSchema.forGeneric(topic + "-key", schemaKey, registryUrl);
this.valueDeserializer = ConfluentRegistryAvroSerializationSchema.forGeneric(topic + "-value", schemaValue, registryUrl);
}
@Override
public ProducerRecord<byte[], byte[]> serialize(Tuple2<GenericRecord,GenericRecord> element, Long timestamp) {
byte[] key = keySerializer.serialize(element.f0);
byte[] value = valueSerializer.serialize(element.f1);
return new ProducerRecord<byte[], byte[]>(topic, key, value);
}
}
ConfluenceRegistryAvroSerializationSchema
是可序列化的,因此您可以安全地将其存储在 GenericSerializer
中的字段中。
它的性能也会更高,因为不会为每个传入记录重新实例化底层结构。
关于java - 使用 GenericRecords 时,Flink Avro 序列化显示 "not serializable"错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59982631/
我有一个 GenericRecord,并且想要迭代整个键/值集合。记录是一个java数据结构,相当于一个普通的json字符串。例如: {"key1":"val1","key2":val2",.
我正在使用 Avro,我有一个 GenericRecord .我想提取 clientId , deviceName , holder从中。在 Avro 架构中,clientId是整数,deviceNa
给定一个 GenericRecord ,与对象相反,检索类型化值的推荐方法是什么?我们是否期望转换这些值,如果是,来自 Avro types 的映射是什么?到 Java 类型?例如,Avro Arra
我正在尝试发布 Avro(到 Kafka)并在尝试使用 BinaryEncoder 编写 Avro 对象时得到一个 NullPointerException。 这是简化的堆栈跟踪: java.lang
我正在尝试将 GenericRecord 转换为 json 字符串,以便我可以将其传递给 JSONObject 之类的东西。我正在考虑使用 JsonEncoder 来做到这一点。现在我有类似的东西:
我必须编写一些一次性的 Beam/Dataflow 管道,这些管道从 BigQuery 读取、提取两个字段,然后将它们写入其他地方。我计划只索引到 GenericRecord,而不是尝试根据 BigQ
我有以下架构: { "name": "AgentRecommendationList", "type": "record", "fields": [ {
我的 KafkaProducer 能够使用 KafkaAvroSerializer 将对象序列化到我的主题。但是,KafkaConsumer.poll() 返回反序列化的 GenericRecord
我想使用 Apache Avro 来序列化我的数据,我的客户端是用 C++ 编写的,而我的服务器是用 Java 编写的。 我的服务器 java 代码如下所示: Schema scm = new Sch
我有一段代码可以使用函数 avroToRowConverter() 将我的 avro 记录转换为 Row directKafkaStream.foreachRDD(rdd -> { J
如何将 Avro GenericRecord 转换为 Json,同时使用从毫秒到日期时间的时间戳字段? 目前使用 Avro 1.8.2 Timestamp tsp = new Timestam
我使用 Avro(序列化器和反序列化器)从 kafka 主题获取推文。然后我创建了一个 spark 消费者,它在 RDD [GenericRecord] 的 Dstream 中提取推文。现在我想将每个
我使用 Avro(序列化器和反序列化器)从 kafka 主题获取推文。然后我创建了一个 spark 消费者,它在 RDD [GenericRecord] 的 Dstream 中提取推文。现在我想将每个
我有一个通用记录,如下所示,其中 holder 是一个值为字符串的映射。 { "name" : "holder", "type" : { "type" : "map",
我有来自 avro 的自动生成的 Agr.java 模式文件。我在尝试转换为 GenericRecord 时看到此错误。 (FileStreamer.java:processFile(181)) -
我有一个带有嵌套字段的 GenericRecord。当我使用 genericRecord.get(1) 时,它返回一个包含嵌套 AVRO 数据的对象。 我希望能够像 genericRecord.get
在 apache beam 步骤中,我有一个 PCollection KV>>> 。我想将可迭代中的所有记录写入同一个 Parquet 文件中。我的代码片段如下 p.apply(ParDo.of(ne
Avro SpecificRecord(即生成的 java 类)是否与模式演变兼容? IE。如果我有 Avro 消息源(在我的例子中是 kafka)并且我想将这些消息反序列化为特定记录,是否可以安全地
我正在构建一个读取 Avro 通用记录的管道。要在阶段之间传递 GenericRecord,我需要注册 AvroCoder。文档说如果我使用通用记录,架构参数可以是任意的:https://beam.a
我正在尝试创建一个 Java 生产者,将 Avro 流式传输到 kafka 主题。我尝试重现 Confluent's official documentation 中提供的示例 但是无法找到 Gene
我是一名优秀的程序员,十分优秀!