- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我在 spark 集群中有两个文件,foo.csv
和 bar.csv
,它们都有 4 列和完全相同的字段:时间、用户、url、类别
。
我想通过 bar.csv
的某些列过滤掉 foo.csv
。最后,我想要 (user, category) 的键/值对:[list, of, urls]。例如:
foo.csv:
11:50:00, 111, www.google.com, search
11:50:00, 222, www.espn.com, news
11:50:00, 333, www.reddit.com, news
11:50:00, 444, www.amazon.com, store
11:50:00, 111, www.bing.com, search
11:50:00, 222, www.cnn.com, news
11:50:00, 333, www.aol.com, news
11:50:00, 444, www.jet.com, store
11:50:00, 111, www.yahoo.com, search
11:50:00, 222, www.bbc.com, news
11:50:00, 333, www.nytimes.com, news
11:50:00, 444, www.macys.com, store
bar.csv:
11:50:00, 222, www.bbc.com, news
11:50:00, 444, www.yahoo.com, store
应该导致:
{
(111, search):[www.google.com, www.bing.com, www.yahoo.com],
(333, news): [www.reddit.com, www.aol.com, www.nytimes.com]
}
换句话说,如果bar.csv
中存在一对(用户,类别),我想过滤掉foo 中的所有 行。 csv
如果它们具有完全相同的(用户,类别)对。因此,在上面的示例中,我想删除 foo.csv
中包含 (222, news)
和 (444, store)
的所有行>。最终,在删除我想要的行之后,我想要一个包含键/值对的字典,例如:(user, category): [list, of, urls]
。
这是我的代码:
fooRdd = sc.textFile("file:///foo.txt/")
barRdd = sc.textFile("file:///bar.txt/")
parseFooRdd= fooRdd.map(lambda line: line.split(", "))
parseBarRdd = barRdd.map(lambda line: line.split(", "))
# (n[1] = user_id, n[3] = category_id) --> [n[2] = url]
fooGroupRdd = parseFooRdd.map(lambda n: ((n[1], n[3]), n[2])).groupByKey().map(lambda x: {x[0]: list(x[1])})
barGroupRdd = parseBarRdd.map(lambda n: ((n[1], n[3]), n[2])).groupByKey().map(lambda x: {x[0]: list(x[1])})
上面的代码有效并以我想要的格式获取数据集:
(user_id, category): [all, urls, visited, by, user, in, that, category]
但是,有几个问题:1) 我认为它会返回一个只有一对 k/v 的字典列表,以及 2) 我不知道下一步该怎么做。我知道用英语做什么:获取 barGroupRdd
(元组)中的键,并删除 fooGroupRdd 中具有相同键的所有行。但我是 pyspark 的新手,我觉得有些命令我没有利用。我认为我的代码可以优化。例如,我认为我不需要创建 barGroupRdd
行,因为我从 bar.csv
中需要的只是 (user_id, category) -- 我不需要需要创建一个字典。我还认为我应该先过滤掉,然后根据结果创建字典。感谢您提供任何帮助或建议,谢谢!
最佳答案
你真的很接近。
而不是每个 RDD:
fooGroupRdd = parseFooRdd.map(lambda n: ((n[1], n[3]),\
n[2])).groupByKey().map(lambda x: {x[0]: list(x[1])})
这样做:
fooGroupRdd = parseFooRdd.map(lambda n: ((n[1], n[3]),\
n[2])).groupByKey().map(lambda x: [(x[0]), list(x[1])])
这样您实际上可以使用 rdd.keys() 方法访问键并创建一个 bar_keys
列表。
bar_keys = barGroupRdd.keys().collect()
然后你就可以完全按照你说的去做了。过滤 fooGroupRdd 中具有 bar_keys 键的行。
dict(fooGroupRdd.filter(lambda x: x[0] not in bar_keys)\
.map(lambda x: [x[0], x[1]]).collect())
最终结果是这样的:
{('111', 'search'): ['www.google.com', 'www.bing.com', 'www.yahoo.com'],
('333', 'news'): ['www.reddit.com', 'www.aol.com', 'www.nytimes.com']}
希望对您有所帮助。
根据您的评论,我也想知道这是否是最有效的方法。查看 RDD 的类方法,您会发现 collectAsMap()
的工作方式类似于 collect,但返回的是字典而不是列表。但是,根据对源代码的调查,该方法与我所做的完全相同,因此这似乎是最佳选择。
关于python - pyspark:根据另一个 RDD 的某些列过滤一个 RDD,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41968372/
我是 Pyspark 新手,我使用的是 Spark 2.0.2。 我有一个名为 Test_RDD 的 RDD,其结构如下: U-Key || V1 || V2 || V3 || ----
我正在寻找一种方法将一个 RDD 拆分为两个或多个 RDD,并将获得的结果保存为两个单独的 RDD。例如: rdd_test = sc.parallelize(range(50), 1) 我的代码:
我有一个结构如下的RDD: ((user_id,item_id,rating)) 让我们将此 RDD 称为训练 然后还有另一个具有相同结构的rdd: ((user_id,item_id,rating)
已经有人问过类似的问题。最相似的是这个: Spark: How to split an RDD[T]` into Seq[RDD[T]] and preserve the ordering 但是,我不
我正在使用 spark 来处理数据。但是我不知道如何将新数据保存到Hive 我从 Hive 加载 rdd,然后运行 map 函数来清理数据。 result = myRdd.map(lambda x
我有一个名为 index 的 rdd:RDD[(String, String)],我想用 index 来处理我的文件。 这是代码: val get = file.map({x => val tmp
我有两个 RDD: **rdd1** id1 val1 id2 val2 **rdd2** id1 v1 id2 v2 id1 v3 id8 v7 id1 v4 id3 v5 id6 v6 我想过滤
我有一个 RDD,需要从另一个 RDD 访问数据。但是,我总是收到任务不可序列化错误。我已经扩展了 Serialized 类,但它没有起作用。代码是: val oldError = rddOfRati
我有一个 RDD,需要从另一个 RDD 访问数据。但是,我总是收到任务不可序列化错误。我已经扩展了 Serialized 类,但它没有起作用。代码是: val oldError = rddOfRati
我有一个 RDD 对: (105,918) (105,757) (502,516) (105,137) (516,816) (350,502) 我想将它分成两个 RDD,这样第一个只有具有非重复值的对
我正在尝试使用 spark 执行 K 最近邻搜索。 我有一个 RDD[Seq[Double]] 并且我打算返回一个 RDD[(Seq[Double],Seq[Seq[Double]])] 带有实际行和
我是Spark和Scala语言的新手,并且希望将所有RDD合并到一个List中,如下所示(List to RDD): val data = for (item {
我找不到只参与 rdd 的方法. take看起来很有希望,但它返回 list而不是 rdd .我当然可以将其转换为 rdd ,但这似乎既浪费又丑陋。 my_rdd = sc.textFile("my
我正在寻找一种将 RDD 拆分为两个或更多 RDD 的方法。我见过的最接近的是 Scala Spark: Split collection into several RDD?这仍然是一个单一的 RDD
我有一个RDD[String],wordRDD。我还有一个从字符串/单词创建 RDD[String] 的函数。我想为 wordRDD 中的每个字符串创建一个新的 RDD。以下是我的尝试: 1) 失败,
我刚刚开始使用 Spark 和 Scala 我有一个包含多个文件的目录我使用 成功加载它们 sc.wholeTextFiles(directory) 现在我想升一级。我实际上有一个目录,其中包含包含文
我想从另一个 RDD 中减去一个 RDD。我查看了文档,发现 subtract可以这样做。实际上,当我测试时 subtract , 最终的 RDD 保持不变,值不会被删除! 有没有其他功能可以做到这一
我在 HDFS 中有如下三个文件中的数据 EmployeeManagers.txt (EmpID,ManagerID) 1,5 2,4 3,4 4,6 5,6 EmployeeNames.txt (E
我正在开发一个应用程序,我需要对 RDD 中具有相同键的每对行执行计算,这是 RDD 结构: List>> dat2 = new ArrayList<>(); dat2.add(new Tuple2>
我在 spark 集群中有两个文件,foo.csv 和 bar.csv,它们都有 4 列和完全相同的字段:时间、用户、url、类别。 我想通过 bar.csv 的某些列过滤掉 foo.csv。最后,我
我是一名优秀的程序员,十分优秀!