
algorithm - Spark : Counting co-occurrence - Algorithm for efficient multi-pass filtering of huge collections


There is a table with two columns, books and readers, where books and readers are the book and reader IDs, respectively:

   books readers
1:     1      30
2:     2      10
3:     3      20
4:     1      20
5:     1      10
6:     2      30

The record book = 1, reader = 30 means that the book with id = 1 was read by the user with id = 30. For each pair of books I need to count the number of readers who read both books, using the following algorithm:

for each book
  for each reader of the book
    for each other_book in books of the reader
      increment common_reader_count ((book, other_book), cnt)
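
For illustration, here is a minimal plain-Scala (non-Spark) sketch of the same counting idea; the variable names are my own and the data is the sample table above:

val recs = Seq((1, 30), (2, 10), (3, 20), (1, 20), (1, 10), (2, 30)) // (book, reader)

val readersOfBook = recs.groupBy(_._1).mapValues(_.map(_._2)) // book   -> readers of that book
val booksOfReader = recs.groupBy(_._2).mapValues(_.map(_._1)) // reader -> books that reader read

val commonReaderCount = scala.collection.mutable.Map[(Int, Int), Int]().withDefaultValue(0)
for {
  (book, readers) <- readersOfBook
  reader          <- readers
  otherBook       <- booksOfReader(reader)
  if otherBook != book // count only pairs of distinct books
} commonReaderCount((book, otherBook)) += 1
// e.g. commonReaderCount((1, 2)) == 2, because readers 10 and 30 read both book 1 and book 2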

The advantage of this algorithm is that it requires far fewer operations than counting over all book combinations taken two at a time.

To implement the above algorithm I group this data in two ways: 1) keyed by book, an RDD containing the readers of each book, and 2) keyed by reader, an RDD containing the books read by each reader, as in the following program:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.log4j.Logger
import org.apache.log4j.Level

object Small {

  case class Book(book: Int, reader: Int)
  case class BookPair(book1: Int, book2: Int, cnt: Int)

  val recs = Array(
    Book(book = 1, reader = 30),
    Book(book = 2, reader = 10),
    Book(book = 3, reader = 20),
    Book(book = 1, reader = 20),
    Book(book = 1, reader = 10),
    Book(book = 2, reader = 30))

  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    // set up environment
    val conf = new SparkConf()
      .setAppName("Test")
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)
    val data = sc.parallelize(recs)

    val bookMap = data.map(r => (r.book, r))
    val bookGrps = bookMap.groupByKey

    val readerMap = data.map(r => (r.reader, r))
    val readerGrps = readerMap.groupByKey

    // *** Calculate book pairs
    // Iterate book groups
    val allBookPairs = bookGrps.map(bookGrp => bookGrp match {
      case (book, recIter) =>
        // Iterate user groups
        recIter.toList.map(rec => {
          // Find readers for this book
          val aReader = rec.reader
          // Find all books (including this one) that this reader read
          val allReaderBooks = readerGrps.filter(readerGrp => readerGrp match {
            case (reader2, recIter2) => reader2 == aReader
          })
          val bookPairs = allReaderBooks.map(readerTuple => readerTuple match {
            case (reader3, recIter3) => recIter3.toList.map(rec => ((book, rec.book), 1))
          })
          bookPairs
        })
    })
    val x = allBookPairs.flatMap(identity)
    val y = x.map(rdd => rdd.first)
    val z = y.flatMap(identity)
    val p = z.reduceByKey((cnt1, cnt2) => cnt1 + cnt2)
    val result = p.map(bookPair => bookPair match {
      case ((book1, book2), cnt) => BookPair(book1, book2, cnt)
    })

    val resultCsv = result.map(pair => resultToStr(pair))
    resultCsv.saveAsTextFile("./result.csv")
  }

  def resultToStr(pair: BookPair): String = {
    val sep = "|"
    pair.book1 + sep + pair.book2 + sep + pair.cnt
  }
}

This implementation actually results in a different, inefficient algorithm:

for each book
  find each reader of the book, scanning all readers every time!
    for each other_book in books of the reader
      increment common_reader_count ((book, other_book), cnt)

This contradicts the main goal of the algorithm above because, instead of decreasing the number of operations, it increases it. Finding a user's books requires filtering all readers for every book, so the number of operations is ~ N * M, where N is the number of readers and M is the number of books.

Questions:

  1. Is there any way to implement the original algorithm in Spark without filtering the complete reader collection for every book?
  2. Are there any other algorithms to compute book pair counts efficiently?
  3. Also, when I actually run this code I get a filter exception that I cannot figure out. Any ideas?

Please see the exception log below:

15/05/29 18:24:05 WARN util.Utils: Your hostname, localhost.localdomain resolves to a loopback address: 127.0.0.1; using 10.0.2.15 instead (on interface eth0)
15/05/29 18:24:05 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
15/05/29 18:24:09 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/05/29 18:24:10 INFO Remoting: Starting remoting
15/05/29 18:24:10 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@10.0.2.15:38910]
15/05/29 18:24:10 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@10.0.2.15:38910]
15/05/29 18:24:12 ERROR executor.Executor: Exception in task 0.0 in stage 6.0 (TID 4)
java.lang.NullPointerException
at org.apache.spark.rdd.RDD.filter(RDD.scala:282)
at Small$$anonfun$4$$anonfun$apply$1.apply(Small.scala:58)
at Small$$anonfun$4$$anonfun$apply$1.apply(Small.scala:54)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at Small$$anonfun$4.apply(Small.scala:54)
at Small$$anonfun$4.apply(Small.scala:51)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
at org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:55)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:54)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)

Update:

This code:

val df = sc.parallelize(Array((1,30),(2,10),(3,20),(1,10),(2,30))).toDF("books","readers")
val results = df.join(
  df.select($"books" as "r_books", $"readers" as "r_readers"),
  $"readers" === $"r_readers" and $"books" < $"r_books"
)
.groupBy($"books", $"r_books")
.agg($"books", $"r_books", count($"readers"))

gives the following result:

books r_books COUNT(readers)
1 2 2

So COUNT here is the number of times the two books (here 1 and 2) were read together, i.e. the number of such pairs.

Best Answer

This sort of thing is a lot easier if you convert the original RDD to a DataFrame:

val df = sc.parallelize(
  Array((1,30),(2,10),(3,20),(1,10), (2,30))
).toDF("books","readers")
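
One setup note (not part of the original answer, and assuming Spark 1.3+ as suggested by the logs in the question): toDF on an RDD of tuples and the $"column" syntax require the SQLContext implicits to be in scope, and count / countDistinct come from org.apache.spark.sql.functions:

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.{count, countDistinct}

val sqlContext = new SQLContext(sc)   // sc is the SparkContext from the question
import sqlContext.implicits._         // enables rdd.toDF(...) and the $"column" syntax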

Once you've done that, just do a self-join on the DataFrame to make book pairs, then count how many readers read each book pair:

val results = df.join(
  df.select($"books" as "r_books", $"readers" as "r_readers"),
  $"readers" === $"r_readers" and $"books" < $"r_books"
).groupBy(
  $"books", $"r_books"
).agg(
  $"books", $"r_books", count($"readers")
)

As for a little more explanation of the join, note that I am joining df back to itself -- a self-join: df.join(df.select(...), ...). What you are looking to do is to stitch together book #1 -- $"books" -- with a second book -- $"r_books" -- from the same reader -- $"readers" === $"r_readers". But if you joined only on $"readers" === $"r_readers", you would join each book back to itself. Instead, I use $"books" < $"r_books" to ensure that the ordering in the book pairs is always (<lower_id>,<higher_id>).

Once you do the join, you get a DataFrame with one row for every reader of every book pair. The groupBy and agg functions do the actual counting of the number of readers per book pair.
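
As a small usage note (not in the original answer): printing the result on the sample data should give exactly the single row shown in the update above:

results.show()
// books r_books COUNT(readers)
// 1     2       2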

As an aside, if a reader read the same book twice, I believe you would end up double-counting, which may or may not be what you want. If that's not what you want, just change count($"readers") to countDistinct($"readers").
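
For example, applying that change to the answer's code would look like this (a sketch assuming the same df and the implicits and functions imports shown above):

// Same self-join, but each reader contributes at most once per book pair.
val resultsDistinct = df.join(
  df.select($"books" as "r_books", $"readers" as "r_readers"),
  $"readers" === $"r_readers" and $"books" < $"r_books"
).groupBy(
  $"books", $"r_books"
).agg(
  $"books", $"r_books", countDistinct($"readers")
)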

If you want to learn more about the agg functions count() and countDistinct() and a bunch of other fun stuff, check out the scaladoc for org.apache.spark.sql.functions.
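
As for question 1 in the post, a pure-RDD sketch of the same self-join idea is also possible. This is my own sketch, not part of the accepted answer; it assumes the sc from the question and the (book, reader) tuples from the update. Keying by reader and joining the RDD with itself on the reader key avoids filtering the reader collection for every book:

// RDD-only sketch: key by reader, self-join on the reader key, count by book pair.
import org.apache.spark.SparkContext._ // pair-RDD implicits (needed on older Spark versions)

val rdd = sc.parallelize(Array((1, 30), (2, 10), (3, 20), (1, 10), (2, 30))) // (book, reader)
val byReader = rdd.map { case (book, reader) => (reader, book) }             // (reader, book)

val commonReaderCounts = byReader.join(byReader)            // (reader, (book1, book2))
  .filter { case (_, (b1, b2)) => b1 < b2 }                 // keep each unordered pair once
  .map { case (_, (b1, b2)) => ((b1, b2), 1) }
  .reduceByKey(_ + _)                                       // ((book1, book2), common readers)

// If duplicate (book, reader) rows should not be double-counted, call rdd.distinct() first.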

Regarding algorithm - Spark: Counting co-occurrence - Algorithm for efficient multi-pass filtering of huge collections, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/30534068/
