
python - Manually committing offsets in a Kafka Direct Stream in Python


I am porting a streaming application written in Scala to Python, and I want to manually commit the offsets of a DStream. In Scala this is done as follows:

val stream = KafkaUtils.createDirectStream(someConfigs)
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // some time later, after outputs have completed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

However, I cannot find an equivalent API in Python. Could you show me how to manually commit offsets from a Python client?

Best Answer

I solved this by going back to the pyspark 2.2 libraries, which provide an API for reading each batch's offsetRanges, and by storing the offsets in Redis myself. I also had to drop back to Python 2.7, because Python 3.6 has no "long" type.
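(If staying on Python 3 were a requirement, one common workaround, not part of the original answer, is to alias the missing "long" builtin to int, since Python 3 integers are already arbitrary-precision. A minimal sketch:)

try:
    long  # Python 2: the builtin exists, nothing to do
except NameError:
    long = int  # Python 3: "long" is gone; plain int covers the same range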

import redis
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition, KafkaRDD


def get_offset_ranges(topic):
    """Load the last committed offsets for `topic` from Redis, if any."""
    ranges = None

    rk = '{topic}:offsets'.format(topic=topic)
    cache = redis.Redis()
    if cache.exists(rk):
        mapping = cache.hgetall(rk)
        ranges = dict()
        for k, v in mapping.items():
            # Map each partition to the offset we stored for it.
            tp = TopicAndPartition(topic, int(k))
            ranges[tp] = long(v)

    return ranges


def update_offset_ranges(offset_ranges):
    """Persist the until-offset of every partition to a Redis hash."""
    cache = redis.Redis()
    for rng in offset_ranges:
        rk = '{rng.topic}:offsets'.format(rng=rng)
        print("updating redis_key: {}, partition: {}, lastOffset: {}".format(
            rk, rng.partition, rng.untilOffset))
        cache.hset(rk, rng.partition, rng.untilOffset)


def do_some_work(records):
    pass


def process_dstream(rdd):
    rdd.foreachPartition(lambda records: do_some_work(records))

    # Re-wrap the batch RDD as a KafkaRDD to get access to offsetRanges().
    krdd = KafkaRDD(rdd._jrdd, sc, rdd._jrdd_deserializer)
    off_ranges = krdd.offsetRanges()
    for o in off_ranges:
        print(str(o))
    update_offset_ranges(off_ranges)


sc = SparkContext(appName="mytstApp")
ssc = StreamingContext(sc, 1)

kafka_params = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "myUserGroup",
    "enable.auto.commit": "false",
    "auto.offset.reset": "smallest"
}

topic = "mytopic"
offset_ranges = get_offset_ranges(topic)
# createDirectStream expects a list of topics; resume from the stored offsets.
dstream = KafkaUtils.createDirectStream(ssc, [topic], kafka_params,
                                        fromOffsets=offset_ranges)
dstream.foreachRDD(process_dstream)

# Start our streaming context and wait for it to 'finish'
ssc.start()

# Wait for the job to finish
try:
    ssc.awaitTermination()
except Exception as e:
    ssc.stop()
    raise e  # to exit with error condition
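Note that the offsets are written to Redis only after foreachPartition has finished, so a crash mid-batch replays that batch on restart: the pipeline is at-least-once, not exactly-once. To sanity-check what has been committed, the hash can be read back directly (a minimal sketch against the same local Redis instance and the mytopic key used above):

import redis

# Print the partition -> last committed offset mapping stored by the job.
cache = redis.Redis()
for partition, offset in cache.hgetall('mytopic:offsets').items():
    print(partition, offset)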

Regarding python - manually committing offsets in a Kafka Direct Stream in Python, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/54068555/
