gpt4 book ai didi

scala - Spark 结构化流 未授权访问组

转载 作者:行者123 更新时间:2023-12-02 03:04:50 26 4
gpt4 key购买 nike

我正在尝试通过 Spark 结构化流从 Kafka 读取数据。但是,在 Spark 2.4.0 中,您无法为流设置组 ID(请参阅 How to set group.id for consumer group in kafka data source in Structured Streaming? )。

但是,由于未设置此值,spark 只是生成组 Id,而我陷入了 GroupAuthorizationException:

19/12/10 15:15:00 ERROR streaming.MicroBatchExecution: Query [id = 747090ff-120f-4a6d-b20e-634eb77ac7b8, runId = 63aa4cce-ad72-47f2-80f6-e87947b69685] terminated with error
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: spark-kafka-source-d2420426-13d5-4bda-ad21-7d8e43ebf518-1874352823-driver-2

有什么想法可以绕过这个吗?有趣的是,我可以通过 kafka-console-consumer.sh 读取这些数据,我可以在 .properties 文件中传递组 ID。

抛出异常的代码:

val df = spark
.readStream
.format("kafka")
.option("subscribe", "topic")
.option("startingOffsets", "earliest")
.option("kafka.group.id", "idThatShouldBeUsed")
.option("kafka.bootstrap.servers", "server")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.mechanism", "PLAIN")
.option("kafka.ssl.truststore.location", "/location)
.option("kafka.ssl.truststore.password", "pass")
.option("kafka.sasl.jaas.config", """jaasToUse""")
.load()
.writeStream
.outputMode("append")
.format("console")
.option("startingOffsets", "earliest")
.start().awaitTermination()

最佳答案

这似乎从消费者的角度来说是无法解决的。我们最终不得不使用 bin/kafka-acls.sh 和通配符来允许结构化流生成的所有组 ID。

kafka acl 示例:

bin/kafka-acls --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=zk:2181 --add --allow-principal User:'Bon' --operation READ --topic topicName --group='spark-kafka-source-' --resource-pattern-type prefixed

关于scala - Spark 结构化流 未授权访问组,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59270531/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com