
apache-flink - Serializing Kafka messages with the Confluent Registry under Flink 1.9.1


Is it possible to publish messages to Kafka that are serialized with Confluent's KafkaAvroSerializer? I'm using Flink 1.9.1. I've seen some work going on in a newer flink-avro release (1.11.0), but I'm stuck on this version.

I'd like to use the newly introduced KafkaSerializationSchema to serialize messages against the Confluent schema registry and write them to Kafka.

Here is the class I currently have; it converts objects of type T to Avro, but I'd like to switch to the Confluent serialization.

public class KafkaMessageSerialization<T extends SpecificRecordBase> implements KafkaSerializationSchema<T> {

    public static final Logger LOG = LoggerFactory.getLogger(KafkaMessageSerialization.class);

    private final String topic;

    public KafkaMessageSerialization(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(T event, Long timestamp) {
        final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        final Schema schema = event.getSchema();
        final DatumWriter<T> writer = new ReflectDatumWriter<>(schema);
        final BinaryEncoder binEncoder = EncoderFactory.get().binaryEncoder(outputStream, null);

        try {
            writer.write(event, binEncoder);
            binEncoder.flush();
        } catch (final Exception e) {
            LOG.error("serialization error", e);
            throw new RuntimeException(e);
        }

        return new ProducerRecord<>(topic, outputStream.toByteArray());
    }
}

It's quite convenient to use with .addSink(new FlinkKafkaProducer<>(SINK_TOPIC, new KafkaMessageSerialization<>(SINK_TOPIC), producerProps, Semantic.AT_LEAST_ONCE)).
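
For context, a minimal sketch of how that sink could be wired up; the broker address, the SINK_TOPIC constant and the events stream are assumptions here, not part of the original question:

// Sketch only: broker address, SINK_TOPIC and the `events` stream are assumed.
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "localhost:9092");

// `events` is assumed to be a DataStream<MyEvent> where MyEvent extends SpecificRecordBase
events.addSink(new FlinkKafkaProducer<>(
        SINK_TOPIC,
        new KafkaMessageSerialization<>(SINK_TOPIC),
        producerProps,
        FlinkKafkaProducer.Semantic.AT_LEAST_ONCE));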

Best Answer

I was in the same situation, and based on your solution I wrote the class below. I've tested it with Flink 1.10.1.

public class ConfluentAvroMessageSerialization<T extends SpecificRecordBase> implements KafkaSerializationSchema<T> {

    public static final org.slf4j.Logger LOG = LoggerFactory.getLogger(ConfluentAvroMessageSerialization.class);

    private final String topic;
    private final int schemaId;
    private final int magicByte;

    public ConfluentAvroMessageSerialization(String topic, String schemaRegistryUrl) throws IOException, RestClientException {
        magicByte = 0;
        this.topic = topic;

        SchemaRegistryClient schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryUrl, 1000);
        SchemaMetadata schemaMetadata = schemaRegistry.getLatestSchemaMetadata(topic + "-value");
        schemaId = schemaMetadata.getId();

        LOG.info("Confluent Schema ID {} for topic {} found", schemaId, topic);
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(T event, Long timestamp) {
        final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        final Schema schema = event.getSchema();
        final DatumWriter<T> writer = new ReflectDatumWriter<>(schema);
        final BinaryEncoder binEncoder = EncoderFactory.get().binaryEncoder(outputStream, null);

        try {
            byte[] schemaIdBytes = ByteBuffer.allocate(4).putInt(schemaId).array();
            outputStream.write(magicByte);     // Confluent magic byte
            outputStream.write(schemaIdBytes); // Confluent schema ID (4-byte format)
            writer.write(event, binEncoder);   // Avro data
            binEncoder.flush();
        } catch (final Exception e) {
            LOG.error("Schema Registry Serialization Error", e);
            throw new RuntimeException(e);
        }

        return new ProducerRecord<>(topic, outputStream.toByteArray());
    }
}
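
Wiring it into the pipeline looks the same as before, except the schema-registry URL is passed in and the constructor's checked exceptions have to be handled; the registry URL shown here is just an assumption:

// Sketch: the registry URL is an assumption; the constructor may throw
// IOException / RestClientException while looking up the latest schema.
events.addSink(new FlinkKafkaProducer<>(
        SINK_TOPIC,
        new ConfluentAvroMessageSerialization<>(SINK_TOPIC, "http://localhost:8081"),
        producerProps,
        FlinkKafkaProducer.Semantic.AT_LEAST_ONCE));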

Confluent uses a proprietary wire format consisting of a magic byte followed by the schema ID (4 bytes). For more details see https://docs.confluent.io/current/schema-registry/serdes-develop/index.html#wire-format
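
For illustration, a hedged sketch of how those framing bytes can be read back, e.g. on the consumer side (the helper name is made up for this example):

// Illustrative helper: extracts the schema ID from a Confluent-framed payload.
static int readSchemaId(byte[] confluentPayload) {
    ByteBuffer buffer = ByteBuffer.wrap(confluentPayload);
    byte magic = buffer.get();        // byte 0: magic byte, always 0
    if (magic != 0) {
        throw new IllegalArgumentException("Not a Confluent-framed Avro message");
    }
    int schemaId = buffer.getInt();   // bytes 1-4: schema ID, big-endian
    // everything after byte 4 is the plain Avro binary payload
    return schemaId;
}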

Regarding apache-flink - Serializing Kafka messages with the Confluent Registry under Flink 1.9.1, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/61708933/
