gpt4 book ai didi

java - KafkaProducer 总是选择 localhost :8081 for schema registry in Java API

转载 作者:行者123 更新时间:2023-12-02 01:50:16 26 4
gpt4 key购买 nike

KafkaProducer 无法选择其属性中定义的 schema.registry.url

正如我们在下面的屏幕截图中看到的,架构注册表 url 是一个虚拟 url

// variable which is being debugged
private KafkaProducer<K, V> kafkaFullAckProducer;

enter image description here

但在我的日志中,使用 KafkaProducer 发布消息仍然失败,主机为 http://0:8081

{"@timestamp":"2018-10-31T18:57:37.906+05:30","message":"Failed to send HTTP request to endpoint: http://0:8081/subjects/

上述两个证明是在程序的一次运行中获得的。我们可以清楚地看到eclipse调试时提示的schmeregistry url是123.1.1.1,但是却是http://0如果我的日志失败。

因此,在我的其他环境中,我无法运行其他指定的 schema.registry.url,因为它始终使用 http://0

代码托管在一台计算机上,架构注册表/代理位于另一台计算机上。

开发环境启动registry./confluence start

我的制作人代码:

private KafkaProducer<K, V> kafkaFullAckProducer;
MonitoringConfig config;

public void produceWithFullAck(String brokerTopic, V genericRecord) throws Exception {
// key is null
ProducerRecord<K, V> record = new ProducerRecord<K, V>(brokerTopic, genericRecord);
try {
Future<RecordMetadata> futureHandle = this.kafkaFullAckProducer.send(record, (metadata, exception) -> {

if (metadata != null) {
log.info("Monitoring - Sent record(key=" + record.key() + " value=" + record.value()
+ " meta(partition=" + metadata.partition() + " offset=" + metadata.offset() + ")");
}
});
RecordMetadata recordMetadata = futureHandle.get();

} catch (Exception e) {
if (e.getCause() != null)
log.error("Monitoring - " + e.getCause().toString());
throw new RuntimeException(e.getMessage(), e.getCause());
} finally {
// initializer.getKafkaProducer().close();
}
}

@PostConstruct
private void initialize() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBrokerList());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
// kafkaProps.put("value.serializer",
// "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CustomAvroSerializer.class.getName());
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, Constants.COMPRESSION_TYPE_CONFIG);
props.put(ProducerConfig.RETRIES_CONFIG, config.getProducerRetryCount());
props.put("schema.registry.url", config.getSchemaRegistryUrl()); // the url is coming right here
props.put("acks", "all");
kafkaFullAckProducer = new KafkaProducer<K, V>(props);
}

监控配置: 公共(public)类监控配置{

    @Value("${kafka.zookeeper.connect}")
private String zookeeperConnect;

@Value("${kafka.broker.list}")
private String brokerList;

@Value("${consumer.timeout.ms:1000}")
private String consumerTimeout;

@Value("${producer.retry.count:1}")
private String producerRetryCount;

@Value("${schema.registry.url}")
private String schemaRegistryUrl;

@Value("${consumer.enable:false}")
private boolean isConsumerEnabled;

@Value("${consumer.count.thread}")
private int totalConsumerThread;
}

application.properties:

kafka.zookeeper.connect=http://localhost:2181
kafka.broker.list=http://localhost:9092
consumer.timeout.ms=1000
producer.retry.count=1
schema.registry.url=http://123.1.1.1:8082

自定义 avro 序列化器是我需要弃用的东西,并使用所讨论的方式 here但我确信这不是这个问题的原因。

以下是主机的详细信息:HOST 1:有这个 Java 服务和 Kafka Connect,错误日志来自这里。HOST 2:有 Kafka、Schema 注册表和 Zookeper。

最佳答案

您正在使用自定义序列化器,并且作为 Serializer 实现的一部分,您必须定义一个接受 Map 的 configure 方法。

在该方法中,我猜您定义了 CachedSchemaRegistryClient字段,但没有从配置映射中提取在生产者级别添加的 url 属性,因此它将默认使用其他本地主机地址

Confluence 代码需要单步执行四个类,但您将看到序列化器实现 here ,然后看单独的Config类,以及Abstract Serializer和SerDe父类。来自 your previous post ,我曾指出,我认为您不需要实际使用自定义 Avro 序列化器,因为您似乎正在重做 AbstractKafkaSerializer 类的操作

关于java - KafkaProducer 总是选择 localhost :8081 for schema registry in Java API,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53084762/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com