gpt4 book ai didi

python - 在带有 PySpark 的单个多核机器中使用大型查找表

转载 作者:行者123 更新时间:2023-12-01 04:26:52 24 4
gpt4 key购买 nike

我有一个很大的查找表,其中包含整数作为键和字符串列表作为值。我需要这个查找表对通过 Spark 加载的数据进行一些过滤和转换。

import numpy as np
import pandas as pd

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

conf = SparkConf()
conf.setMaster("local[20]")
conf.setAppName("analysis")
conf.set("spark.local.dir", "./tmp")
#conf.set("spark.executor.memory", "20g")
#conf.set("spark.driver.memory", "20g")
conf.set("spark.python.worker.reuse", "yes")

sc = SparkContext(conf=conf)
sqlc = SQLContext(sc)

在启动 pyspark 时,我什至使用 --driver-memory 20g 选项。

我的机器有 500 GB 内存和 27 个内核。我首先在内存中加载一个名为 lookup_tbl 的字典,它有 17457954 行。

当我尝试运行以下代码时,超过 10 分钟没有得到任何输出。等了这么久,我关闭了这个进程。我需要查找表功能。我什至尝试过使用广播功能。

sc.broadcast(lookup_tbl)
def clean_data(x, transform=lambda k: (int(k[0]), "\t".join(k[1:]))):
x = x.split('\t')
return transform(x)


def check_self(x):
from_id = x[0]
to_id = x[1]
self_ = 1
try:
common_items = set(lookup_tbl[from_id]).intersection(set(lookup_tbl[to_id]))
except KeyError:
common_items = set()
if len(common_items ) < 1:
common_items = set("-")
self_ = 0
return (((from_id, to_id, k, self_) for k in common_items ))

pair = sc.textFile("data/pair.tsv").map(lambda x: clean_data(x, transform=lambda k: (int(k[0]), int(k[1])))).flatMap(check_self)
csv_data = pair.map(lambda x: "\t".join("%s" for k in xrange(len(x))) % x)
csv_data.saveAsTextFile("out/pair_transformed")

这是 Spark 的问题还是我运行不正确?另外,我尝试为执行程序和驱动程序内存设置各种值(~20g),但没有得到任何改进。

据我了解,spark 首先尝试序列化该字典,然后再将其发送到所有本地进程。有没有办法可以从公共(public)位置使用这本字典?

最佳答案

首先要访问广播变量,您必须使用它的 value 属性:

# You can use get instead of catching KeyError
s1 = set(lookup_tbl.value.get(from_id, set()))
s2 = set(lookup_tbl.value.get(to_id, set()))
common_items = s1.intersection(s2)

为了避免广播,您可以在 mapPartitions 内部本地加载 lookup_tbl:

def check_partition(iter):
lookup_tbl = ...
for x in iter:
yield check_self

identity = lambda x: x
pair = (sc.textFile(...)
.map(lambda x: clean_data(...)
.mapPartitions(check_partition)
.flatMap(identity))

如果 lookup_tbl 相对较大,它仍然会相当昂贵。有很多方法可以处理这个问题:

  1. 使用 SQLite 连接而不是局部变量。

    import sqlite3
    conn = sqlite3.connect('path/to/lookup.db')

    c.execute("SELECT key FROM lookup WHERE id = '%s'" % from_id)
    s1 = {x[0] for x in c.fetchall()}
    c.execute("SELECT key FROM lookup WHERE id = '%s'" % to_id)
    s2 = {x[0] for x in c.fetchall()}
    common_items = s1.intersection(s2)

    它很容易设置,并且如果数据正确索引的话应该足够快

  2. 使用单个数据库服务器进行查找。 MongoDB 应该可以正常工作,并且通过正确的内存映射,您可以显着减少总体内存占用

  3. 使用加入而不是广播

    swap = lambda x: (x[1], x[0])

    def reshape1(record):
    (k1, (items, k2)) = record
    return (k2, (k1, items))

    def reshape2(record):
    (k1, (items1, (k2, items2))) = record
    return (k1, k2, set(items1) & set(items2))

    pairs = sc.textFile(...).map(lambda x: clean_data(...))

    n = ... # Number of partitions
    lookup_rdd = sc.parallelize(lookup_tbl.items()).partitionBy(n)

    lookup_rdd.join(lookup_rdd.join(pairs).map(reshape1)).map(reshape2)

关于python - 在带有 PySpark 的单个多核机器中使用大型查找表,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32980908/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com