gpt4 book ai didi

python - pyspark:根据另一个 RDD 的某些列过滤一个 RDD

转载 作者:行者123 更新时间:2023-11-28 17:20:14 28 4
gpt4 key购买 nike

我在 spark 集群中有两个文件,foo.csvbar.csv,它们都有 4 列和完全相同的字段:时间、用户、url、类别

我想通过 bar.csv 的某些列过滤掉 foo.csv。最后,我想要 (user, category) 的键/值对:[list, of, urls]。例如:

foo.csv:
11:50:00, 111, www.google.com, search
11:50:00, 222, www.espn.com, news
11:50:00, 333, www.reddit.com, news
11:50:00, 444, www.amazon.com, store
11:50:00, 111, www.bing.com, search
11:50:00, 222, www.cnn.com, news
11:50:00, 333, www.aol.com, news
11:50:00, 444, www.jet.com, store
11:50:00, 111, www.yahoo.com, search
11:50:00, 222, www.bbc.com, news
11:50:00, 333, www.nytimes.com, news
11:50:00, 444, www.macys.com, store

bar.csv:
11:50:00, 222, www.bbc.com, news
11:50:00, 444, www.yahoo.com, store

应该导致:

{
(111, search):[www.google.com, www.bing.com, www.yahoo.com],
(333, news): [www.reddit.com, www.aol.com, www.nytimes.com]
}

换句话说,如果bar.csv 中存在一对(用户,类别),我想过滤掉foo 中的所有 行。 csv 如果它们具有完全相同的(用户,类别)对。因此,在上面的示例中,我想删除 foo.csv 中包含 (222, news)(444, store) 的所有行>。最终,在删除我想要的行之后,我想要一个包含键/值对的字典,例如:(user, category): [list, of, urls]

这是我的代码:

fooRdd = sc.textFile("file:///foo.txt/")
barRdd = sc.textFile("file:///bar.txt/")


parseFooRdd= fooRdd.map(lambda line: line.split(", "))
parseBarRdd = barRdd.map(lambda line: line.split(", "))



# (n[1] = user_id, n[3] = category_id) --> [n[2] = url]
fooGroupRdd = parseFooRdd.map(lambda n: ((n[1], n[3]), n[2])).groupByKey().map(lambda x: {x[0]: list(x[1])})
barGroupRdd = parseBarRdd.map(lambda n: ((n[1], n[3]), n[2])).groupByKey().map(lambda x: {x[0]: list(x[1])})

上面的代码有效并以我想要的格式获取数据集:

(user_id, category): [all, urls, visited, by, user, in, that, category]

但是,有几个问题:1) 我认为它会返回一个只有一对 k/v 的字典列表,以及 2) 我不知道下一步该怎么做。我知道用英语做什么:获取 barGroupRdd(元组)中的键,并删除 fooGroupRdd 中具有相同键的所有行。但我是 pyspark 的新手,我觉得有些命令我没有利用。我认为我的代码可以优化。例如,我认为我不需要创建 barGroupRdd 行,因为我从 bar.csv 中需要的只是 (user_id, category) -- 我不需要需要创建一个字典。我还认为我应该先过滤掉,然后根据结果创建字典。感谢您提供任何帮助或建议,谢谢!

最佳答案

你真的很接近。

而不是每个 RDD:

fooGroupRdd = parseFooRdd.map(lambda n: ((n[1], n[3]),\
n[2])).groupByKey().map(lambda x: {x[0]: list(x[1])})

这样做:

fooGroupRdd = parseFooRdd.map(lambda n: ((n[1], n[3]),\
n[2])).groupByKey().map(lambda x: [(x[0]), list(x[1])])

这样您实际上可以使用 rdd.keys() 方法访问键并创建一个 bar_keys 列表。

bar_keys = barGroupRdd.keys().collect()

然后你就可以完全按照你说的去做了。过滤 fooGroupRdd 中具有 bar_keys 键的行。

dict(fooGroupRdd.filter(lambda x: x[0] not in bar_keys)\
.map(lambda x: [x[0], x[1]]).collect())

最终结果是这样的:

{('111', 'search'): ['www.google.com', 'www.bing.com', 'www.yahoo.com'],
('333', 'news'): ['www.reddit.com', 'www.aol.com', 'www.nytimes.com']}

希望对您有所帮助。

根据您的评论,我也想知道这是否是最有效的方法。查看 RDD 的类方法,您会发现 collectAsMap() 的工作方式类似于 collect,但返回的是字典而不是列表。但是,根据对源代码的调查,该方法与我所做的完全相同,因此这似乎是最佳选择。

关于python - pyspark:根据另一个 RDD 的某些列过滤一个 RDD,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41968372/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com