gpt4 book ai didi

python - Spark 流作业性能改进

转载 作者:可可西里 更新时间:2023-11-01 10:55:34 25 4
gpt4 key购买 nike

有一个 spark streaming 作业一直在运行,计算流中的单词,并且只应计算并返回给定词汇表中的单词。

但是,这个词汇表不是固定的,而是存储在 redis 中,并且可以随时间变化。这是这项工作的简单实现:

sc = SparkContext(appName="WordCount")
ssc = StreamingContext(sc, 10) # batch interval is 10s

def check_if_in_vocab(word):
vocab = redis_client.smembers() # get all vocabulary from redis
return word in vocab

lines = ssc.socketTextStream(host_ip, port) # read data stream from the socket
words = lines.flatMap(lambda line: line.split(" "))\
.filter(check_if_in_vocab)\ # ANY BETTER SOLUTION HERE???
.map(lambda word: (word, 1)) # create (word, count) pair
counts = words.reduceByKey(lambda x,y: x+y)
counts.pprint()

我认为我的实现性能很差,因为 filter(check_if_in_vocabulary) 转换从流中的每个元素的 redis 中提取词汇表,这太耗时了。

有更好的解决方案吗?

跟进

好的,在上面的问题中,由于词汇表随时可能发生变化,所以我需要经常检查redis,现在假设词汇表每60秒或1小时才变化一次,是否更简单改进上面的代码?

最佳答案

我从未在 Python 中使用过 Spark,但如果这是 Scala 实现,我会通过在 mapPartitions 调用中调用 Redis 来寻求改进。在 mapPartitions 中,我将与客户端建立连接,获取 vocab,使用内存中的 vocab 来过滤可迭代对象,然后关闭连接。

也许您可以使用 Python API 做一些类似的事情。

关于python - Spark 流作业性能改进,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43189122/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com