gpt4 book ai didi

python - 使用来自多个 RDD 的相同键提取和保存值的最佳方法

转载 作者:太空宇宙 更新时间:2023-11-03 14:04:21 32 4
gpt4 key购买 nike

我使用从 HBase 中提取的数据在 PySpark 中创建了两个 RDD。我想收集具有相同行键的项目,存储这些项目,然后搜索与每个项目关联的值。理想情况下,我会将结果存储在 pyspark.sql 对象中,因为我想将 Levenshtein 距离应用于它们的内容。

详细信息:

在 HBase 中,我有位置数据,其中行键是给定区域的 geohash,在列中,该区域中有多个地点,其中包含更多关于位置的详细信息(带有描述和其他文本数据的 json)。我有两个 HBase 表,它们的位置可以相同。我想搜索这两个 RDD 中的数据,检查相似的 geohashes 并将结果存储在新的数据结构中。

我不想重新发明轮子,而且我刚刚开始学习 Spark,因此我想知道:完成此类任务的最佳方法是什么?内置函数 rdd.intersection 是一个好的解决方案吗?

最佳答案

已编辑:实际上感谢@Aneel 的评论,我可以纠正我的一些错误。实际上有一个对 RDD 的 join 调用给出了相同的(连接是在 RDD 的第一列上完成的,值是两个 RDD 其余列的元组),作为一个调用 使用 Spark SQL 的 JOIN 给出了结果,而不是像之前指出的那样执行 cogroup ,因为正如@Aneel 指出的那样 cogroup 在一个单一的下压缩键值对键。

现在换个说法,我尝试了@Aneel 的方法和上面的要点,并尝试对其进行一些基准测试,这是结果,使用 Databricks 的社区版(非常小的集群,6GB 内存,1 个核心和 Spark 2.1),这是 link 。 (代码也在文末)

结果如下:

  • 对于 100000 大小的列表:
    • Spark SQL:1.32 秒
    • RDD 加入:0.89s
  • 对于 250000 大小的列表:
    • Spark SQL:2.2 秒
    • RDD 加入:2.0s
  • 对于 500000 大小的列表:
    • Spark SQL:3.6 秒
    • RDD 加入:4.6 秒
  • 对于 1000000 大小的列表:
    • Spark SQL:7.7 秒
    • RDD 加入:10.2 秒
  • 对于一个 10000000 大小的列表(这里我调用 timeit 只做 10 次测试,否则它会一直运行到圣诞节。当然精度会因此降低):
    • Spark SQL:57.6 秒
    • RDD 加入:89.9 秒

实际上,对于小型数据集,RDD 似乎比 Dataframes 更快,但是一旦达到阈值(大约 250k 条记录),Dataframes join 开始变得更快

现在正如@Aneel 所建议的那样,请记住我做了一个非常简单的示例,您可能想对自己的数据集和环境进行一些测试(我在我的 2 个列表中没有超过 1000 万行,因为初始化已经用了 2.6 分钟)。

初始化代码:

#Init code
NUM_TESTS=100
from random import randint
l1 = []
l2 = []

import timeit
for i in xrange(0, 10000000):
t = (randint(0,2000), randint(0,2000))
v = randint(0,2000)
l1.append((t,v))
if (randint(0,100) > 25): #at least 25% of the keys should be similar
t = (randint(0,2000), randint(0,2000))
v = randint(0,2000)
l2.append((t,v))

rdd1 = sc.parallelize(l1)
rdd2 = sc.parallelize(l2)

Spark SQL 测试:

#Test Spark SQL    
def callable_ssql_timeit():
df1 = spark.createDataFrame(rdd1).toDF("id", "val")
df1.createOrReplaceTempView("table1")
df2 = spark.createDataFrame(rdd2).toDF("id", "val")
df2.createOrReplaceTempView("table2")
query="SELECT * FROM table1 JOIN table2 ON table1.id=table2.id"
spark.sql(query).count()


print(str(timeit.timeit(callable_ssql_timeit, number=NUM_TESTS)/float(NUM_TESTS)) + "s")

RDD连接测试:

#Test RDD join
def callable_rdd_timeit():
rdd1.join(rdd2).count()
print(str(timeit.timeit(callable_rdd_timeit, number=NUM_TESTS)/float(NUM_TESTS)) + "s")

关于python - 使用来自多个 RDD 的相同键提取和保存值的最佳方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45239392/

32 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com