- mongodb - 在 MongoDB mapreduce 中,如何展平值对象?
- javascript - 对象传播与 Object.assign
- html - 输入类型 ="submit"Vs 按钮标签它们可以互换吗?
- sql - 使用 MongoDB 而不是 MS SQL Server 的优缺点
我想知道 foreachPartition
与 foreach
方法相比是否会由于更高级别的并行性而产生更好的性能,考虑到我的情况'm 流经一个 RDD
以便对累加器变量执行一些求和。
最佳答案
foreach
和 foreachPartitions
是 Action 。
A generic function for invoking operations with side effects. For each element in the RDD, it invokes the passed function . This is generally used for manipulating accumulators or writing to external stores.
注意:在 foreach()
之外修改累加器以外的变量可能会导致未定义的行为。见 Understanding closures了解更多详情。
example :
scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
res2: Long = 10
Similar to
foreach()
, but instead of invoking function for each element, it calls it for each partition. The function should be able to accept an iterator. This is more efficient thanforeach()
because it reduces the number of function calls (just likemapPartitions
() ).
foreachPartition
的使用示例:
/** * Insert in to database using foreach partition. * * @param sqlDatabaseConnectionString * @param sqlTableName */ def insertToTable(sqlDatabaseConnectionString: String, sqlTableName: String): Unit = { //numPartitions = number of simultaneous DB connections you can planning to givedatframe.repartition(numofpartitionsyouwant) val tableHeader: String = dataFrame.columns.mkString(",") dataFrame.foreachPartition { partition => // Note : Each partition one connection (more better way is to use connection pools) val sqlExecutorConnection: Connection = DriverManager.getConnection(sqlDatabaseConnectionString) //Batch size of 1000 is used since some databases cant use batch size more than 1000 for ex : Azure sql partition.grouped(1000).foreach { group => val insertString: scala.collection.mutable.StringBuilder = new scala.collection.mutable.StringBuilder() group.foreach { record => insertString.append("('" + record.mkString(",") + "'),") } sqlExecutorConnection.createStatement() .executeUpdate(f"INSERT INTO [$sqlTableName] ($tableHeader) VALUES " + insertString.stripSuffix(",")) } sqlExecutorConnection.close() // close the connection so that connections wont exhaust. } }
Usage of foreachPartition
with sparkstreaming (dstreams) and kafka producer
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// only once per partition You can safely share a thread-safe Kafka //producer instance.
val producer = createKafkaProducer()
partitionOfRecords.foreach { message =>
producer.send(message)
}
producer.close()
}
}
Note : If you want to avoid this way of creating producer once per partition, betterway is to broadcast producer using
sparkContext.broadcast
since Kafka producer is asynchronous and buffers data heavily before sending.
Accumulator samples snippet to play around with it... through which you can test the performance
test("Foreach - Spark") { import spark.implicits._ var accum = sc.longAccumulator sc.parallelize(Seq(1,2,3)).foreach(x => accum.add(x)) assert(accum.value == 6L) } test("Foreach partition - Spark") { import spark.implicits._ var accum = sc.longAccumulator sc.parallelize(Seq(1,2,3)).foreachPartition(x => x.foreach(accum.add(_))) assert(accum.value == 6L) }
foreachPartition
operations on partitions so obviously it would be better edge thanforeach
foreachPartition
should be used when you are accessing costly resources such as database connections or kafka producer etc.. which would initialize one per partition rather than one per element(foreach
). when it comes to accumulators you can measure the performance by above test methods, which should work faster in case of accumulators as well..
另外...见 map vs mappartitions有类似的概念,但它们是转换。
关于java - Apache Spark - foreach Vs foreachPartition 什么时候使用?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30484701/
spark foreachPartition,如何获取分区的索引(或者序列号,或者识别分区的东西)? val docs: RDD[String] = ... println("num partitio
我正在使用 2.3 版本的 pySpark(在我当前的开发系统中无法更新到 2.4),并且有以下关于 foreachPartition 的问题. 首先介绍一点背景信息:据我了解,pySpark-UDF
我的环境如下 Spark 1.6.1Hadoop 2.6.2 我们的需求如下(全部在Java Spark中) 1. 读取 CSV 文件并应用架构并将其转换为数据框 2.通过Spark分区SQL获取所有
我尝试在具有 8 个分区的 RDD 上使用 pyspark 来使用 forEachPartition() 方法。我的自定义函数尝试为给定的字符串输入生成字符串输出。这是代码 from google.c
我尝试在具有 8 个分区的 RDD 上使用 pyspark 来使用 forEachPartition() 方法。我的自定义函数尝试为给定的字符串输入生成字符串输出。这是代码 from google.c
我通过 spark-shell 评估了以下几行 scala 代码: val a = sc.parallelize(Array(1,2,3,4,5,6,7,8,9,10)) val b = a.coal
我们正在使用 spark 进行文件处理。我们正在处理相当大的文件,每个文件大约 30 GB,大约有 40-50 百万行。这些文件是格式化的。我们将它们加载到数据框中。最初的要求是识别符合条件的记录并将
我有一个 Spark Streaming 作业,它从 Kafka 读取数据,并在再次写入 Postrges 之前与 Postgres 中的现有表进行一些比较。这是它的样子: val message =
我有一个 DataProc 集群,其中有一个 master 和 4 个 worker。我有这个 Spark 工作: JavaRDD rdd_data = javaSparkContext.parall
我有一些这样的代码: println("\nBEGIN Last Revs Class: "+ distinctFileGidsRDD.getClass) val lastRe
我正在尝试找到一种方法来捕获 Spark 在其驱动程序的 foreachPartition() 方法内引发的异常。在数据集上使用 foreachPartition() 时,我传递了一个在各个工作线程之
我目前正在探索 Spark。我面临以下任务 - 获取一个 RDD,根据特定条件对其进行分区,然后将多个文件写入 S3 存储桶中的不同文件夹。 在我们来到上传到 S3 部分之前,一切都很好。我已经在 S
我想在 foreachPartition 中使用 SparkContext 和 SQLContext ,但由于序列化错误而无法执行。我知道这两个对象都不可序列化,但我认为 foreachPartiti
我有一个简单的 Spark 作业,用于将结果映射、计算并将结果写入 Oracle DB。我在将结果写入数据库时遇到问题。 按键减少结果后,我调用 foreachPartition 操作来建立连接并
我想知道 foreachPartition 与 foreach 方法相比是否会由于更高级别的并行性而产生更好的性能,考虑到我的情况'm 流经一个 RDD 以便对累加器变量执行一些求和。 最佳答案 fo
我正在尝试为我的数据集的每个分区拟合一个 ML 模型,但我不知道如何在 Spark 中执行此操作。 我的数据集基本上看起来像这样并且按公司划分: Company | Features | Target
我有以下使用结构化流 (Spark 2.2) 的工作代码,以便从 Kafka (0.10) 读取数据。我无法解决的唯一问题是在 ForeachWriter 中使用 kafkaProducer 时与 T
我想将每个分区的数据保存到MySQL数据库。为此,我创建了实现 VoidFunction<> 的类: public class DatabaseSaveFunction implements Void
我有一个 DataFrame,其中一列有逗号分隔的数据。 例如:数据如下所示:[{value:1}, {value:2, value:3}, {some value}, {somevalue, oth
我正在使用 Spark 读取一堆文件,对它们进行详细说明,然后将它们全部保存为序列文件。我想要的是每个分区有 1 个序列文件,所以我这样做了: SparkConf sparkConf = new Sp
我是一名优秀的程序员,十分优秀!