- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我目前正在研究使用 Java 处理特定 AVRO 模式演变场景时意外行为的解决方案,并在使用者中进行深度复制以将 GenericRecord 类解析为从 AVRO 模式生成的特定类。
为了解释正在发生的事情,我将使用一个简化的架构示例:
{
"name":"SimpleEvent",
"type":"record",
"namespace":"com.simple.schemas",
"fields":[
{
"name":"firstfield",
"type":"string",
"default":""
},
{
"name":"secondfield",
"type":"string",
"default":""
},
{
"name":"thirdfield",
"type":"string",
"default":""
}
]
}
这只是一个包含三个字符串字段的简单模式,所有字段都是可选的,因为它们具有默认值。假设在某个时候我想添加另一个字符串字段,并删除一个字段,因为它不再需要,你最终会得到这样的结果:
{
"name":"SimpleEvent",
"type":"record",
"namespace":"com.simple.schemas",
"fields":[
{
"name":"firstfield",
"type":"string",
"default":""
},
{
"name":"secondfield",
"type":"string",
"default":""
},
{
"name":"newfield",
"type":"string",
"default":""
}
]
}
根据模式演化规则,这不应破坏更改。然而,当生产者开始使用较新的模式生成事件时,下游消费者中会发生一些奇怪的事情。
原来生成的Java类(我使用Gradle avro插件生成类,但是maven插件和avro工具命令行代码生成产生相同的输出)只看字段顺序,而不看字段顺序不根据名称映射字段。
这意味着字段“newfield”的值被使用旧版本架构读取数据的下游消费者映射到“thirdfield”。
我发现了一些工作,其中 manual mapping是根据名称执行的,但是,这不适用于嵌套对象。
通过一些本地实验,我还发现了另一种可以正确解决架构差异的方法:
Schema readerSchema = SimpleEvent.getClassSchema();
Schema writerSchema = request.getSchema();
if (readerSchema.equals(writerSchema)){
return (SimpleEvent)SpecificData.get().deepCopy(writerSchema, request);
}
DatumWriter<GenericRecord> writer = new SpecificDatumWriter<>(writerSchema);
BinaryEncoder encoder = null;
ByteArrayOutputStream stream = new ByteArrayOutputStream();
encoder = EncoderFactory.get().binaryEncoder(stream, encoder);
writer.write(request, encoder);
encoder.flush();
byte[] recordBytes = stream.toByteArray();
Decoder decoder = DecoderFactory.get().binaryDecoder(recordBytes, null);
SpecificDatumReader<SimpleEvent> specificDatumReader = new SpecificDatumReader(writerSchema, readerSchema);
SimpleEvent result = specificDatumReader.read(null, decoder);
return result;
但是,这似乎是一种相当浪费/不优雅的方法,因为您首先必须将 GenericRecord 转换为 byteArray,然后使用 SpecificDatumReader 再次读取它。
deepcopy 和 datumreader 类之间的区别在于 datumReader 类似乎适应写入器架构与读取器架构不同的场景。
我觉得应该/可以有更好、更优雅的方式来处理这个问题。我真的很感激任何帮助/提示来实现这一目标。
提前致谢:)
奥斯卡
最佳答案
经过更多挖掘和查看我们之前在监听器中使用的 KafkaAvroDeserializer,我注意到 AbstractKafkaAvroDeserializer 有一个反序列化功能,您可以在其中传递阅读器模式。它看起来好得令人难以置信,但它确实有效!
package com.oskar.generic.consumer.demo;
import com.simple.schemas;
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.common.serialization.Deserializer;
import java.util.Map;
public class SimpleEventDeserializer extends AbstractKafkaAvroDeserializer implements Deserializer<Object> {
private boolean isKey;
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
this.isKey = isKey;
configure(new KafkaAvroDeserializerConfig(configs));
}
@Override
public Object deserialize(String s, byte[] bytes) {
return super.deserialize(bytes, SimpleEvent.getClassSchema());
}
@Override
public void close() {
}
}
然后在消费者工厂中使用它,如下所示:
@Bean
public ConsumerFactory<String, GenericRecord> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:29095");
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "one");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SimpleEventDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
监听器代码本身如下所示:
@KafkaListener(topics = "my-topic")
public GenericRecord listen(@Payload GenericRecord request, @Headers MessageHeaders headers) throws IOException {
SimpleEvent event = (SimpleEvent) SpecificData.get().deepCopy(request.getSchema(), request);
return request;
}
关于java - Avro 架构 Java 深度复制问题与字段顺序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56206425/
我正在编写一个应用程序,允许用户创建一个“问卷”,然后向其中添加问题。我正在使用核心数据来存储信息。我创建了一个问卷实体,并与问题实体建立了“一对多”关系。我的问题是,如果要允许用户复制(复制)整个调
有没有办法复制或复制 SharedPreference?或者我需要从一个变量中获取每个变量,然后将它们放入另一个变量中吗? 最佳答案 尝试这样的事情: //sp1 is the shared pref
下面的(A)和(B)有区别吗? (假设 NON ARC,如果重要的话) // --- (A) --- @interface Zoo : NSObject{} @property (copy) Dog
我正在尝试将 mysql SELECT 查询保存到文件中,如下所示: $result = mysqli_query($db,$sql); $out = fopen('tmp/csv.csv', 'w'
我需要创建一个 CVPixelBufferRef 的副本,以便能够使用副本中的值以按位方式操作原始像素缓冲区。我似乎无法使用 CVPixelBufferCreate 或 CVPixelBufferCr
我在 Source 文件夹中有一个 Active wave 录音 wave-file.wav。我需要使用新名称 wave-file-copy.wav 将此文件复制到 Destination 文件夹。
在使用 GNU Autotools 构建的项目中,我有一个脚本需要通过 make 修改以包含安装路径。这是一个小例子: configure.ac: AC_INIT(foobar, 1.0) AC_PR
我想将 SQL 的行复制到同一个表中。但是在我的表中,我有一个“文本”列。 使用此 SQL: CREATE TEMPORARY TABLE produit2 ENGINE=MEMORY SELECT
谁能给我解释一下 df2 = df1 df2 = df1.copy() df3 = df1.copy(deep=False) 我已经尝试了所有选项并执行了以下操作: df1 = pd.DataFram
Hazelcast 是否具有类似于 Ehcache 的复制? http://www.ehcache.org/generated/2.9.0/pdf/Ehcache_Replication_Guide.
我有以下拓扑。一个 Ubuntu 16.04。运行我的全局 MySQL 服务器的 Amazon AWS 上的实例。我想将此服务器用作许多本地主服务器(Windows 机器 MySQL 服务器)的从服务
使用 SQLyog,我正在测试表中是否设置了正确的值。我尝试过 SELECT type_service FROM service WHERE email='test@gmail.com' 因此,只输出
有人可以提供一些关于如何配置 ElasticSearch 进行复制的说明。我在 Windows 中运行 ES,并且了解如果我在同一台服务器上多次运行 bat 文件,则会启动一个单独的 ES 实例,并且
一 点睛 ThreadGroup 复制线程的两个方法。 public int enumerate(Thread list[]) // 会将 ThreadGroup 中的 active 线程全部复制到
一 点睛 ThreadGroup 复制线程组的两个方法。 public int enumerate(ThreadGroup list[]) // 相对于 enumerate(list,true) pu
官方documentation Cassandra 说: Configure the keyspace and create the new datacenter: Use ALTER KEYSPAC
This question already has answers here: How to weight smoothing by arbitrary factor in ggplot2? (2个答
我们有一个表格来表明对各种俱乐部的兴趣。输出将数据记录在 Excel 电子表格中,其中列有他们的首选姓名、姓氏、电子邮件、代词,以及他们感兴趣的俱乐部的相应列中的“1”(下面的模型)。 我们希望为俱乐
This question already has answers here: Closed 8 years ago. Possible Duplicate: In vim, how do I get
如何复制形状及其所在的单元格?当我手动复制时,形状会跟随单元格,但是当我使用宏进行复制时,我会得到除形状之外的所有其他内容。 Cells(sourceRow, sourceColumn).Copy C
我是一名优秀的程序员,十分优秀!