gpt4 book ai didi

apache-kafka - 如何在kafka中只发送一次avro模式

转载 作者:行者123 更新时间:2023-12-05 06:41:46 25 4
gpt4 key购买 nike

我正在使用以下代码(不是真的,但让我们假设它)创建一个模式并由生产者将其发送到 kafka。

public static final String USER_SCHEMA = "{"
+ "\"type\":\"record\","
+ "\"name\":\"myrecord\","
+ "\"fields\":["
+ " { \"name\":\"str1\", \"type\":\"string\" },"
+ " { \"name\":\"str2\", \"type\":\"string\" },"
+ " { \"name\":\"int1\", \"type\":\"int\" }"
+ "]}";

public static void main(String[] args) throws InterruptedException {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA);
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);

for (int i = 0; i < 1000; i++) {
GenericData.Record avroRecord = new GenericData.Record(schema);
avroRecord.put("str1", "Str 1-" + i);
avroRecord.put("str2", "Str 2-" + i);
avroRecord.put("int1", i);

byte[] bytes = recordInjection.apply(avroRecord);

ProducerRecord<String, byte[]> record = new ProducerRecord<>("mytopic", bytes);
producer.send(record);

Thread.sleep(250);

}

producer.close();
}

关键是代码允许我使用此模式仅发送 1 条消息。然后我需要更改架构名称以发送下一条消息...因此名称字符串现在是随机生成的,因此我可以发送更多消息。这是一个 hack,所以我想知道执行此操作的正确方法。

我还研究了如何在没有模式的情况下发送消息(即已经向 kafka 发送了 1 strip 有模式的消息,现在所有其他消息都不再需要模式了)——但是 new GenericData.Record(..) 需要一个架构参数。如果它为空,它将引发错误。

那么向kafka发送avro schema消息的正确方式是什么?

这是另一个代码示例 - 与我的完全相同:
https://github.com/confluentinc/examples/blob/kafka-0.10.0.1-cp-3.0.1/kafka-clients/producer/src/main/java/io/confluent/examples/producer/ProducerExample.java

它也没有显示如何在不设置模式的情况下发送。

最佳答案

我不明白这行:

The thing is the code allows me to send only 1 message with this schema. Then I need to change the schema name in order to send the next message.

在这两个示例中,您的示例和您提供的融合示例中,架构都没有发送到 Kafka。

在您提供的示例中,模式用于创建 GenericRecord 对象。您提供架构,因为您想要根据某些架构验证记录(例如,验证您只能将整数 int1 字段放入 GenericRecord 对象中)。

在您的代码中,唯一的区别是您决定将数据序列化为 byte[],这可能是不需要的,因为您可以将此责任委托(delegate)给 KafkaAvroSerializer,正如您在融合示例中看到的那样。

GenericRecord 是一个 Avro 对象,它不是 Kafka 的强制执行。如果你想将任何类型的对象发送到 Kafka(有或没有模式),你只需要创建(或使用现有的)序列化程序将你的对象转换为 byte[] 并在你为制作人。

通常,使用 Avro 消息本身发送指向架构的指针是一个很好的做法。您可以在以下链接中找到原因: http://www.confluent.io/blog/schema-registry-kafka-stream-processing-yes-virginia-you-really-need-one/

关于apache-kafka - 如何在kafka中只发送一次avro模式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39603444/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com