gpt4 book ai didi

python - 从 python-kafka 转换为 confluent kafka - 如何使用 SASL_SSL、OAUTHBEARER 和 Tokens 创建奇偶校验

转载 作者:行者123 更新时间:2023-12-05 06:57:42 27 4
gpt4 key购买 nike

我有一个可以工作的 python kafka,它是代码:

class TokenProvider(object):

def __init__(self,client_id,client_secret):
self.client_id = client_id
self.client_secret = client_secret
def token(self):
token_url = 'https://test.com/protocol/openid-connect/token'
client = BackendApplicationClient(client_id=self.client_id)
oauth = OAuth2Session(client=client)
token_json = oauth.fetch_token(token_url=token_url, client_id=self.client_id, client_secret=self.client_secret)
token = token_json['access_token']
#print(token)
return token

consumer = KafkaConsumer(
group_id=None,
bootstrap_servers=['test.com:9094'],
security_protocol='SASL_SSL',
sasl_mechanism='OAUTHBEARER',
sasl_oauth_token_provider=TokenProvider(client_id,client_secret),
ssl_check_hostname=False,
ssl_context=create_ssl_context(),
auto_offset_reset=offset,
enable_auto_commit=False,
value_deserializer=lambda m: decode(m)
)
consumer.subscribe(topics=['test.stream'])

我的合流 python 是下面的,我得到这个错误

cimpl.KafkaException: KafkaError{code=_INVALID_ARG,val=-186,str="Property "oauthbearer_token_refresh_cb" must be set through dedicated .._set_..() function"}

c = Consumer({
'bootstrap.servers': 'test.com:9094',
'sasl.mechanism': 'OAUTHBEARER',
'security.protocol': 'SASL_SSL',
'oauthbearer_token_refresh_cb': TokenProvider(client_id,client_secret),
'group.id': str(uuid.uuid1()),
'auto.offset.reset': 'earliest'
})

c.subscribe(['test.stream'])

那么如何让融合的 kafka 工作呢?我似乎对使用 OAUTHBEARER 和 SASL_SSL 的 oauthbearer_token_refresh_cb 有问题。

本质上,我使用 jwt token 进行身份验证

最佳答案

根据 https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md 处的文档,必须使用 rd_kafka_conf_set_oauthbearer_token_refresh_cb() 设置 oauthbearer_token_refresh_cb 选项。但是请注意,您正在尝试将其设置为不可调用的 TokenProvider 实例,因此您可能希望传递 TokenProvider(...).token

SASL/OAUTHBEARER token refresh callback (set withrd_kafka_conf_set_oauthbearer_token_refresh_cb(), triggered byrd_kafka_poll(), et.al. This callback will be triggered when it istime to refresh the client's OAUTHBEARER token.

关于python - 从 python-kafka 转换为 confluent kafka - 如何使用 SASL_SSL、OAUTHBEARER 和 Tokens 创建奇偶校验,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64857884/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com