
java - Apache Spark - foreach vs foreachPartition: when to use which?


I would like to know whether foreachPartition, compared to the foreach method, will result in better performance due to a higher level of parallelism, considering a case where I'm flowing through an RDD in order to perform some sums into an accumulator variable.

Best Answer

Both foreach and foreachPartition are actions.

foreach(function): Unit

A generic function for invoking operations with side effects. For each element in the RDD, it invokes the passed function. This is generally used for manipulating accumulators or writing to external stores.

Note: modifying variables other than accumulators outside of foreach() may result in undefined behavior. See Understanding closures for more details.
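As a minimal sketch of that pitfall (assuming a plain driver-side variable), mutating a local variable inside foreach only changes each executor's copy of the closure, so the driver never sees the update:

var counter = 0
// WRONG: each executor increments its own deserialized copy of `counter`;
// in cluster mode the driver-side value is never updated.
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => counter += x)
println(counter)  // typically still 0 on a cluster; use an accumulator instead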

Accumulator example:

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Long = 10

foreachPartition(function): Unit

Similar to foreach(), but instead of invoking the function for each element, it calls it for each partition. The function should be able to accept an iterator. This is more efficient than foreach() because it reduces the number of function calls (just like mapPartitions()).
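The difference shows up in the function types: foreach takes a per-element function, while foreachPartition takes a function over the whole partition iterator, so any setup done before consuming the iterator runs once per partition. A minimal sketch (the setup line is just an illustration):

val rdd = sc.parallelize(1 to 8, 4)

// foreach: (T) => Unit, invoked once per element
rdd.foreach(x => println(x))

// foreachPartition: (Iterator[T]) => Unit, invoked once per partition
rdd.foreachPartition { iter =>
  val tag = s"partition@${Thread.currentThread().getName}"  // setup runs once per partition
  iter.foreach(x => println(s"$tag -> $x"))
}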

Usage examples of foreachPartition:


  • Example 1: if you want one database connection per partition (inside each partition block), here is a sample usage in Scala:
import java.sql.{Connection, DriverManager}
import org.apache.spark.sql.DataFrame

/**
 * Insert into a database using foreachPartition.
 *
 * @param dataFrame                   the data to insert (passed in explicitly; the original assumed it was in scope)
 * @param sqlDatabaseConnectionString JDBC connection string
 * @param sqlTableName                target table name
 */
def insertToTable(dataFrame: DataFrame,
                  sqlDatabaseConnectionString: String,
                  sqlTableName: String): Unit = {
  // numPartitions = number of simultaneous DB connections you plan to allow;
  // control it with dataFrame.repartition(numPartitionsYouWant)
  val tableHeader: String = dataFrame.columns.mkString(",")
  dataFrame.foreachPartition { partition =>
    // Note: one connection per partition (a better way is to use a connection pool)
    val sqlExecutorConnection: Connection =
      DriverManager.getConnection(sqlDatabaseConnectionString)
    // A batch size of 1000 is used since some databases (e.g. Azure SQL) cannot
    // handle batches larger than 1000 rows
    partition.grouped(1000).foreach { group =>
      val insertString = new scala.collection.mutable.StringBuilder()
      group.foreach { record =>
        insertString.append("('" + record.mkString(",") + "'),")
      }
      sqlExecutorConnection.createStatement()
        .executeUpdate(f"INSERT INTO [$sqlTableName] ($tableHeader) VALUES "
          + insertString.stripSuffix(","))
    }
    sqlExecutorConnection.close() // close the connection so connections won't be exhausted
  }
}
  • Example 2: usage of foreachPartition with Spark Streaming (DStreams) and a Kafka producer:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // Only once per partition: you can safely share a thread-safe Kafka producer instance.
    val producer = createKafkaProducer()
    partitionOfRecords.foreach { message =>
      producer.send(message)
    }
    producer.close()
  }
}

Note: if you want to avoid creating the producer once per partition, a better way is to broadcast the producer using sparkContext.broadcast, since the Kafka producer is asynchronous and buffers data heavily before sending; a sketch of that pattern follows.
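A minimal sketch of that broadcast approach, assuming a String-keyed/valued producer and placeholder config/topic names; the trick is to broadcast a serializable wrapper that creates the (non-serializable) producer lazily on each executor:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {
  // lazy, so the producer is built on the executor (once per JVM), not on the driver
  lazy val producer = createProducer()
  def send(topic: String, value: String): Unit =
    producer.send(new ProducerRecord[String, String](topic, value))
}

object KafkaSink {
  def apply(config: Properties): KafkaSink =
    new KafkaSink(() => new KafkaProducer[String, String](config))
}

// Driver side (producerConfig and "my-topic" are placeholders):
// val kafkaSink = sc.broadcast(KafkaSink(producerConfig))
// dstream.foreachRDD { rdd =>
//   rdd.foreach(message => kafkaSink.value.send("my-topic", message))
// }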


Accumulator sample snippets to play around with, through which you can test the performance:

test("Foreach - Spark") {
  import spark.implicits._
  val accum = sc.longAccumulator
  sc.parallelize(Seq(1, 2, 3)).foreach(x => accum.add(x))
  assert(accum.value == 6L)
}

test("Foreach partition - Spark") {
  import spark.implicits._
  val accum = sc.longAccumulator
  sc.parallelize(Seq(1, 2, 3)).foreachPartition(x => x.foreach(accum.add(_)))
  assert(accum.value == 6L)
}

Conclusion:

foreachPartition operates on whole partitions, so it has a clear edge over foreach whenever there is per-partition setup cost.

Rule of thumb:

foreachPartition should be used when you are accessing costly resources such as database connections or Kafka producers, initializing one per partition rather than one per element (as foreach would). As for accumulators, you can measure the performance with the test methods above; foreachPartition should be faster in the accumulator case as well.

Also... see map vs mapPartitions, which share a similar concept, but they are transformations.
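For completeness, a minimal sketch of the transformation analogue (createExpensiveParser is a hypothetical per-partition setup cost):

// map: per-element function; mapPartitions: per-partition iterator function
val lines = sc.parallelize(Seq("a", "b", "c"), 2)
val parsed = lines.mapPartitions { iter =>
  val parser = createExpensiveParser()  // hypothetical: built once per partition
  iter.map(parser.parse)
}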

Regarding "java - Apache Spark - foreach vs foreachPartition: when to use which?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/30484701/
