gpt4 book ai didi

scala - 在 Spark 中生成大量随机数据的有效方法

转载 作者:行者123 更新时间:2023-12-03 23:52:45 33 4
gpt4 key购买 nike

我正在尝试生成一个大型随机数据集 Spark 。我基本上想从 2018-12-01 09:00:00 开始对于每个新行,时间戳将更改 scala.util.Random.nextInt(3)秒。 (timestamp 列是唯一有意义的列)

我希望即使当我尝试在大型集群上生成数万亿行时它仍然有效,所以我试图一次以 100 个元素为一组生成它,因为数万亿行无法放入 Seq .

此代码存在一些问题,例如 var我不确定我对 union 的使用.我想知道是否有人对如何做到这一点有更好的想法。

import Math.{max, min}
import java.sql.Timestamp
import java.sql.Timestamp.valueOf

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{DataFrame, Row, SaveMode}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

object DataGenerator extends SparkEnv {

import spark.implicits._

val batchSize = 100
val rnd = scala.util.Random

// randomly generates a DataFrame with n Rows
def generateTimestampData(n: Int): DataFrame = {
val timestampDataFields = Seq(StructField("timestamp", TimestampType, false))
val initDF = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], StructType(timestampDataFields))
def loop(data: DataFrame, lastTime: Long, _n: Int): DataFrame = {
if (_n == 0) {
val w = Window.orderBy("timestamp")
data.withColumn("eventID", concat(typedLit("event"), row_number().over(w)))
} else {
var thisTime = lastTime
def rts(ts: Long): Stream[Long] = ts #:: { thisTime = ts + rnd.nextInt(3) * 1000; rts(thisTime) }
val thisBatch = rts(lastTime)
.map(new Timestamp(_))
.take(min(batchSize, _n))
.toDF("timestamp")
loop(data union thisBatch, thisTime, max(_n - batchSize, 0))
}
}
loop(initDF, valueOf("2018-12-01 09:00:00").getTime(), n)
}

def main(args: Array[String]): Unit = {
val w = Window.orderBy("timestamp")
val df = generateTimestampData(10015)
.withColumn("part", floor(row_number().over(w) / 100))
df.repartition(27)
.write
.partitionBy("part")
.option("compression", "snappy")
.mode(SaveMode.Overwrite)
.parquet("data/generated/ts_data")
}

}

上面的代码生成了一个包含 10,015 行的 DataFrame,看起来像这样。
+-------------------+----------+----+
| timestamp| eventID|part|
+-------------------+----------+----+
|2018-12-01 11:43:09|event10009| 100|
|2018-12-01 11:43:02|event10003| 100|
|2018-12-01 11:43:11|event10012| 100|
|2018-12-01 11:43:10|event10011| 100|
|2018-12-01 11:43:08|event10007| 100|
|2018-12-01 11:43:02|event10001| 100|
|2018-12-01 11:43:08|event10008| 100|
|2018-12-01 11:43:12|event10013| 100|
|2018-12-01 11:43:09|event10010| 100|
|2018-12-01 11:43:14|event10014| 100|
|2018-12-01 10:11:54| event4357| 43|
|2018-12-01 10:47:33| event6524| 65|
|2018-12-01 10:23:08| event5064| 50|
|2018-12-01 10:23:02| event5060| 50|
|2018-12-01 10:23:39| event5099| 50|
|2018-12-01 10:22:25| event5019| 50|
|2018-12-01 09:16:36| event1042| 10|
|2018-12-01 09:16:03| event1008| 10|
|2018-12-01 09:16:13| event1017| 10|
|2018-12-01 09:17:28| event1092| 10|
+-------------------+----------+----+

最佳答案

您可以实现一个并行执行随机数据生成的 RDD,如下例所示。

import scala.reflect.ClassTag
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD

// Each random partition will hold `numValues` items
final class RandomPartition[A: ClassTag](val index: Int, numValues: Int, random: => A) extends Partition {
def values: Iterator[A] = Iterator.fill(numValues)(random)
}

// The RDD will parallelize the workload across `numSlices`
final class RandomRDD[A: ClassTag](@transient private val sc: SparkContext, numSlices: Int, numValues: Int, random: => A) extends RDD[A](sc, deps = Seq.empty) {

// Based on the item and executor count, determine how many values are
// computed in each executor. Distribute the rest evenly (if any).
private val valuesPerSlice = numValues / numSlices
private val slicesWithExtraItem = numValues % numSlices

// Just ask the partition for the data
override def compute(split: Partition, context: TaskContext): Iterator[A] =
split.asInstanceOf[RandomPartition[A]].values

// Generate the partitions so that the load is as evenly spread as possible
// e.g. 10 partition and 22 items -> 2 slices with 3 items and 8 slices with 2
override protected def getPartitions: Array[Partition] =
((0 until slicesWithExtraItem).view.map(new RandomPartition[A](_, valuesPerSlice + 1, random)) ++
(slicesWithExtraItem until numSlices).view.map(new RandomPartition[A](_, valuesPerSlice, random))).toArray

}

一旦你有了它,你就可以使用它传递你自己的随机数据生成器来获得 RDD[Int]
val rdd = new RandomRDD(spark.sparkContext, 10, 22, scala.util.Random.nextInt(100) + 1)
rdd.foreach(println)
/*
* outputs:
* 30
* 86
* 75
* 20
* ...
*/

RDD[(Int, Int, Int)]
def rand = scala.util.Random.nextInt(100) + 1
val rdd = new RandomRDD(spark.sparkContext, 10, 22, (rand, rand, rand))
rdd.foreach(println)
/*
* outputs:
* (33,22,15)
* (65,24,64)
* (41,81,44)
* (58,7,18)
* ...
*/

当然,您可以将其包装在 DataFrame 中也很容易:
spark.createDataFrame(rdd).show()
/*
* outputs:
* +---+---+---+
* | _1| _2| _3|
* +---+---+---+
* |100| 48| 92|
* | 34| 40| 30|
* | 98| 63| 61|
* | 95| 17| 63|
* | 68| 31| 34|
* .............
*/

注意在这种情况下,每次 RDD 生成的数据是如何不同的。/ DataFrame被执行。通过改变 RandomPartition 的实现要实际存储值而不是即时生成它们,您可以拥有一组稳定的随机项,同时仍保留此方法的灵活性和可扩展性。

无状态方法的一个很好的特性是您甚至可以在本地生成巨大的数据集。以下几秒钟在我的笔记本电脑上运行:
new RandomRDD(spark.sparkContext, 10, Int.MaxValue, 42).count
// returns: 2147483647

关于scala - 在 Spark 中生成大量随机数据的有效方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55083170/

33 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com