java - How do I use the camel-avro consumer and producer?

Reposted. Author: 行者123. Updated: 2023-11-30 01:53:42

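I have not found an example of how to use the camel-avro component to produce and consume Kafka Avro messages. My Camel route currently looks like the one below. What should I change so that the camel-kafka Avro consumer and producer use the schema registry and other properties like these?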

props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

public void configure() {
    PropertiesComponent pc = getContext().getComponent("properties", PropertiesComponent.class);
    pc.setLocation("classpath:application.properties");

    log.info("About to start route: Kafka Server -> Log ");

    from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}"
            + "&maxPollRecords={{consumer.maxPollRecords}}"
            + "&consumersCount={{consumer.consumersCount}}"
            + "&seekTo={{consumer.seekTo}}"
            + "&groupId={{consumer.group}}"
            + "&valueDeserializer=" + KafkaAvroDeserializer.class
            + "&keyDeserializer=" + StringDeserializer.class
    )
        .routeId("FromKafka")
        .log("${body}");
}

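Best answer

I am answering my own question because I was stuck on this for several days. I hope this answer helps others.

I tried to use the io.confluent.kafka.serializers.KafkaAvroDeserializer deserializer and got a Kafka exception. So I had to write my own deserializer that does the following:

  1. Sets the schema registry
  2. Uses the specific Avro reader (that is, not the default StringDeserializer)

To do that, we have to access the "schemaRegistry" and "useSpecificAvroReader" fields of AbstractKafkaAvroDeserializer (io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer) and set them.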
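
For context on why the deserializer needs a schema registry client at all: to my knowledge, Confluent's Avro serializers prefix each payload with a magic byte (0) followed by a 4-byte big-endian schema ID, and the deserializer looks that ID up in the registry. The following is a minimal JDK-only sketch of reading that header. The class and method names (ConfluentWireHeader, schemaId) are hypothetical and not part of the Confluent API.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, for illustration only: reads the header that
// Confluent serializers are assumed to put in front of the Avro payload.
class ConfluentWireHeader {

    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_LENGTH = 5; // 1 magic byte + 4-byte schema ID

    // Returns the schema ID stored in bytes 1..4 of the message.
    static int schemaId(byte[] message) {
        if (message == null || message.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("Message too short for a wire-format header");
        }
        ByteBuffer buffer = ByteBuffer.wrap(message); // ByteBuffer is big-endian by default
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buffer.getInt();
    }

    public static void main(String[] args) {
        // Header for schema ID 42, followed by three placeholder payload bytes.
        byte[] message = {0, 0, 0, 0, 42, 1, 2, 3};
        System.out.println("schema id = " + schemaId(message));
    }
}
```

A message that was not produced by a Confluent serializer would typically fail the magic-byte check, which is one possible source of deserialization exceptions.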

Here is the solution...

Camel-Kafka-Avro route builder

public static void main(String[] args) throws Exception {
    LOG.info("About to run Kafka-camel integration...");
    CamelContext camelContext = new DefaultCamelContext();
    // Add a route that consumes messages from Kafka and logs them
    camelContext.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            PropertiesComponent pc = getContext().getComponent("properties",
                    PropertiesComponent.class);
            pc.setLocation("classpath:application.properties");

            log.info("About to start route: Kafka Server -> Log ");

            // Deserializers are passed as fully qualified class names
            from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}"
                    + "&maxPollRecords={{consumer.maxPollRecords}}"
                    + "&consumersCount={{consumer.consumersCount}}"
                    + "&seekTo={{consumer.seekTo}}"
                    + "&groupId={{consumer.group}}"
                    + "&keyDeserializer=" + StringDeserializer.class.getName()
                    + "&valueDeserializer=" + CustomKafkaAvroDeserializer.class.getName()
            )
                .routeId("FromKafka")
                .log("${body}");
        }
    });
    camelContext.start();
    // let it run for 5 minutes before shutting down
    Thread.sleep(5 * 60 * 1000);
    camelContext.stop();
}
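
One detail worth noting: the route in the answer passes the deserializers with .class.getName(), while the route in the question concatenated the Class objects directly. In Java, concatenating a Class object calls Class.toString(), which prepends "class " to the name. The sketch below shows the difference using a JDK class as a stand-in, so it runs without Kafka on the classpath. The class DeserializerOptionDemo and its methods are hypothetical names used only for illustration.

```java
// Hypothetical demo class: shows what ends up in the endpoint URI
// when a Class object is concatenated versus its getName() value.
class DeserializerOptionDemo {

    // Concatenating the Class object implicitly calls Class.toString().
    static String optionFromClass(Class<?> type) {
        return "&valueDeserializer=" + type;
    }

    // getName() yields only the fully qualified class name.
    static String optionFromName(Class<?> type) {
        return "&valueDeserializer=" + type.getName();
    }

    public static void main(String[] args) {
        // java.util.ArrayList stands in for a deserializer class here.
        System.out.println(optionFromClass(java.util.ArrayList.class));
        // prints "&valueDeserializer=class java.util.ArrayList"
        System.out.println(optionFromName(java.util.ArrayList.class));
        // prints "&valueDeserializer=java.util.ArrayList"
    }
}
```

Whether the stray "class " prefix contributed to the exception described in the question is not stated in the source. It is simply a difference between the two routes.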

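Deserializer class: this sets the schema registry URL (schema.registry.url) and the specific Avro reader flag (specific.avro.reader) at the level of the abstract AbstractKafkaAvroDeserializer. If I do not set these, I get a Kafka ConfigException.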

package com.example.camel.kafka.avro;

import java.util.Collections;
import java.util.List;
import java.util.Map;

import io.confluent.common.config.ConfigException;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.common.serialization.Deserializer;

public class CustomKafkaAvroDeserializer extends AbstractKafkaAvroDeserializer
        implements Deserializer<Object> {

    private static final String SCHEMA_REGISTRY_URL = "http://localhost:8081";

    // Sets the inherited fields directly; the config argument is ignored.
    @Override
    public void configure(KafkaAvroDeserializerConfig config) {
        try {
            final List<String> schemas =
                    Collections.singletonList(SCHEMA_REGISTRY_URL);
            // Second argument is the capacity of the client's schema cache
            this.schemaRegistry = new CachedSchemaRegistryClient(schemas,
                    Integer.MAX_VALUE);
            this.useSpecificAvroReader = true;
        } catch (ConfigException e) {
            throw new org.apache.kafka.common.config.ConfigException(e.getMessage());
        }
    }

    // Called by Kafka; the supplied configs are not used, the values above are hardcoded.
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        configure(null);
    }

    @Override
    public Object deserialize(String s, byte[] bytes) {
        return deserialize(bytes);
    }

    @Override
    public void close() {
    }
}
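
The class above hardcodes the registry URL in SCHEMA_REGISTRY_URL. A possible variation, sketched here under assumptions, resolves the URL from the configs map that Kafka passes to configure(Map, boolean), then from a JVM system property, and only then falls back to the default. Whether Camel forwards schema.registry.url into that map depends on the Camel version and endpoint options, so the fallbacks matter. The class SchemaRegistryUrlResolver and its method are hypothetical names, not part of Camel, Kafka, or Confluent.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical helper: picks the schema registry URL from several sources.
class SchemaRegistryUrlResolver {

    static final String KEY = "schema.registry.url";
    static final String DEFAULT_URL = "http://localhost:8081";

    // Lookup order: configs map, then JVM system property, then the default.
    static String resolve(Map<String, ?> configs) {
        Object fromConfigs = (configs == null) ? null : configs.get(KEY);
        if (fromConfigs != null && !fromConfigs.toString().isEmpty()) {
            return fromConfigs.toString();
        }
        String fromSystem = System.getProperty(KEY);
        if (fromSystem != null && !fromSystem.isEmpty()) {
            return fromSystem;
        }
        return DEFAULT_URL;
    }

    public static void main(String[] args) {
        // The registry host below is a placeholder value for illustration.
        Map<String, String> configs =
                Collections.singletonMap(KEY, "http://registry.example:8081");
        System.out.println(resolve(configs));
        System.out.println(resolve(Collections.<String, Object>emptyMap()));
    }
}
```

In CustomKafkaAvroDeserializer, configure(Map<String, ?> configs, boolean isKey) could pass the result of such a lookup to the CachedSchemaRegistryClient constructor instead of the constant, which would let the URL change per environment without recompiling.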

Regarding "java - How do I use the camel-avro consumer and producer?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/55190052/
