gpt4 book ai didi

apache-kafka - 卡夫卡流异常 : GroupAuthorizationException

转载 作者:行者123 更新时间:2023-12-01 06:14:42 24 4
gpt4 key购买 nike

我正在开发一个 Kafka-Stream 应用程序,它将从输入 Kafka 主题中读取消息并过滤不需要的数据并推送到输出 Kafka 主题。

卡夫卡流配置:

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {

Map<String, Object> streamsConfiguration = new HashMap<>();
streamsConfiguration.put(ConsumerConfig.GROUP_ID_CONFIG, "abcd");
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "QC-NormalizedEventProcessor-v1.0.0");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "test:9072");
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
streamsConfiguration.put(StreamsConfig.producerPrefix(ProducerConfig.ACKS_CONFIG), -1);
streamsConfiguration.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
streamsConfiguration.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkaConsumerProperties.getConsumerJKSFileLocation());
streamsConfiguration.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, kafkaConsumerProperties.getConsumerJKSPwd());
streamsConfiguration.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
streamsConfiguration.put(SASL_MECHANISM, "PLAIN");

return new KafkaStreamsConfiguration(streamsConfiguration);
}

KStream过滤逻辑:
@Bean
public KStream<String, String> kStreamJson(StreamsBuilder builder) {

KStream<String, String> stream = builder.stream(kafkaConsumerProperties.getConsumerTopic(), Consumed.with(Serdes.String(), Serdes.String()));
/** Printing the source message */
stream.foreach((key, value) -> LOGGER.info(THREAD_NO + Thread.currentThread().getId() + " *****Message From Input Topic: " + key + ": " + value));
KStream<String, String> filteredDocument = stream.filter((k, v) -> filterCondition.test(k, v));

filteredDocument.to(kafkaConsumerProperties.getProducerTopic(), Produced.with(Serdes.String(), Serdes.String()));
/** After filtering printing the same message */
filteredDocument.foreach((key, value) -> LOGGER.info(THREAD_NO + Thread.currentThread().getId() + " #####Filtered Document: " + key + ": " + value));
return stream;
}

从基于 spring 的 Kafka Stream 应用程序开始时,我遇到了异常。
2019-05-27T07:58:36.018-0500 ERROR stream-thread [QC-NormalizedEventProcessor-v1.0.0-e9cb1bed-3d90-41f1-957a-4fc7efc12a02-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors:
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: QC-NormalizedEventProcessor-v1.0.0

我们的 Kafka Infra 团队为“group.id”授予了必要的权限,使用相同的“组 ID”,我可以使用其他 Kafka Consumer 应用程序和 使用该消息。我在“application.id”中按照我的意愿使用名称 .我们不会在 Kafka 访问控制列表中添加/更新“application.id”。

我真的不确定我们是否需要为“application.id”授予任何权限,或者在 Kafka Stream 配置中缺少某些内容。请指教。

请注意:我曾尝试在 Kafka Stream 配置中使用“group.id”而不使用“group.id”,但我一直遇到相同的异常。

谢谢!
巴拉提拉杰·尚穆甘

最佳答案

我不在办公 table 前,但我认为 Streams 将 group.id 设置为 application.id。

关于apache-kafka - 卡夫卡流异常 : GroupAuthorizationException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56330114/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com