
dataframe - Handling data differences (deltas) in Spark with DataFrames


I have a Parquet file in HDFS as the initial load of my data. All subsequent Parquet files contain only the records of this dataset that changed on a given day relative to the initial load (in chronological order). These are my deltas.
I want to read all or some of these Parquet files to get the data as of a specific date. The deltas can also contain new records.

Example:

Initial data (folder: /path/spezific_data/20180101):

ID| Name   | Street     |
1 | "Tom"  | "Street 1" |
2 | "Peter"| "Street 2" |

Delta 1 (folder: /path/spezific_data/20180102):
ID| Name   | Street     |
1 | "Tom"  | "Street 21"|

Delta 2 (folder: /path/spezific_data/20180103):
ID| Name   | Street     |
2 | "Peter"| "Street 44"|
3 | "Hans" | "Street 12"|

Delta 3 (folder: /path/spezific_data/20180105):
ID| Name   | Street     |
2 | "Hans" | "Street 55"|

It can happen that a day has deltas, but they are only loaded a day later (see Delta 2 and Delta 3).
So the folder /path/spezific_data/20180104 does not exist, and we never want to load that date.
Now I want to load the data in different scenarios:
  • Initial data only:
    This is a simple load of the directory:
    initial = spark.read.parquet("hdfs:/path/spezific_data/20180101/")
  • Up to a specific date (20180103):
    initial_df = spark.read.parquet("hdfs:/path/spezific_data/20180101/")
    delta_df = spark.read.parquet("hdfs:/path/spezific_data/20180102/")

    Now I have to merge these datasets ("update" them; I know Spark RDDs and DataFrames cannot be updated) and then merge in the next delta as well. Currently I solve this with the following lines of code (but inside a for loop):
    new_df = delta_df.union(initial_df).dropDuplicates("ID")
    delta_df = spark.read.parquet("hdfs:/mypath/20180103/")
    new_df = delta_df.union(new_df).dropDuplicates("ID")

    But I don't think this is a good approach.
  • Load all data in the folder "/path/spezific_data":
    My first approach is a for loop up to the latest date (a sketch of enumerating the dated folders follows the questions below).

  • Questions:
    Can I do it this way?
    Is there a better way?
    Can I load everything into one DataFrame and merge within it?
    Currently the load takes a very long time (about an hour).
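
    A minimal sketch of the folder-enumeration idea from the last bullet, assuming the dated folders sit directly under the source path and are named in yyyyMMdd form; the cutoff date and variable names here are only illustrative:

    import org.apache.hadoop.fs.{FileSystem, Path}

    val sourceDir = "hdfs:/path/spezific_data"
    val cutoffDate = "20180103" // hypothetical cutoff, inclusive

    // List the dated subfolders and keep those up to the cutoff date;
    // yyyyMMdd strings sort chronologically, so string comparison is enough.
    val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    val folders = fs.listStatus(new Path(sourceDir))
      .filter(_.isDirectory)
      .map(_.getPath.toString)
      .filter(_.split("/").last <= cutoffDate)
      .sorted

    // Read all selected folders in one call; the deltas still have to be
    // merged by ID afterwards.
    val all_df = spark.read.parquet(folders: _*)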

    Update 1:
    I tried to do something like the following. When I run this code, it iterates over all dates until the end (I can see that from println(date)). After that I get a java.lang.StackOverflowError.
    Where is the error?
    import org.apache.spark.sql.functions.col
    import util.control.Breaks._

    var sourcePath = "hdfs:sourceparth/"
    var destinationPath = "hdfs:destiantionpath/result"
    var initial_date = "20170427"
    var start_year = 2017
    var end_year = 2019
    var end_month = 10
    var end_day = 31

    var m : String = _
    var d : String = _
    var date : String = _
    var delta_df : org.apache.spark.sql.DataFrame = _
    var doubleRows_df : org.apache.spark.sql.DataFrame = _

    // final DF, initial load
    var final_df = spark.read.parquet(sourcePath + initial_date + "*")

    breakable {
      for (year <- 2017 to end_year; month <- 1 to 12; day <- 1 to 31) {
        // build the date string (yyyyMMdd)
        m = month.toString()
        d = day.toString()
        if (month < 10)
          m = "0" + m
        if (day < 10)
          d = "0" + d
        date = year.toString() + m + d

        try {
          // one delta
          delta_df = spark.read.parquet(sourcePath + date + "*")

          // delete duplicate rows (I want to ignore them)
          doubleRows_df = delta_df.groupBy("key").count().where("count > 1").select("key")
          delta_df = delta_df.join(doubleRows_df, Seq("key"), "leftanti")

          // delete all (old) rows in final_df that are also in delta_df
          final_df = final_df.join(delta_df, Seq("key"), "leftanti")

          // add all new rows from the delta
          final_df = final_df.union(delta_df)

          println(date)
        } catch {
          case e: org.apache.spark.sql.AnalysisException => {}
        }
        if (day == end_day && month == end_month && year == end_year)
          break
      }
    }
    final_df.write.mode("overwrite").parquet(destinationPath)

    Full stack trace:
    19/11/26 11:19:04 WARN util.Utils: Suppressing exception in finally: Java heap space
    java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$3.apply(TorrentBroadcast.scala:271)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$3.apply(TorrentBroadcast.scala:271)
    at org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:87)
    at org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutputStream.scala:75)
    at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:205)
    at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:158)
    at com.esotericsoftware.kryo.io.Output.flush(Output.java:181)
    at com.esotericsoftware.kryo.io.Output.close(Output.java:191)
    at org.apache.spark.serializer.KryoSerializationStream.close(KryoSerializer.scala:223)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$1.apply$mcV$sp(TorrentBroadcast.scala:278)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1346)
    at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:277)
    at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:126)
    at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:56)
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1488)
    at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1006)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:930)
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:874)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1677)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    Exception in thread "dag-scheduler-event-loop" java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$3.apply(TorrentBroadcast.scala:271)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$3.apply(TorrentBroadcast.scala:271)
    at org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:87)
    at org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutputStream.scala:75)
    at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:205)
    at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:158)
    at com.esotericsoftware.kryo.io.Output.flush(Output.java:181)
    at com.esotericsoftware.kryo.io.Output.require(Output.java:160)
    at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:246)
    at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:232)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:54)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:43)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
    at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:209)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:276)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:276)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
    at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:277)
    at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:126)
    at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:56)
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1488)
    at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1006)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:930)
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:874)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1677)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

    Best Answer

    These are not an option:

  • distinct or dropDuplicates, because you cannot control which of the duplicate rows is kept. It can easily happen that the new value is not taken over and the old one is kept instead.
  • You need to do a join on ID (see the types of joins here). A joined row then contains either only the old record, only the new one, or both. When only the old or only the new one is present, take the one that is there; when both are present, take only the new one. (A join-based sketch follows the example below.)

  • Here is an example of how to apply multiple deltas at once:

    Question: what are the best-selling and the second best-selling products in every category?
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.dense_rank

    val dataset = Seq(
      ("Thin", "cell phone", 6000),
      ("Normal", "tablet", 1500),
      ("Mini", "tablet", 5500),
      ("Ultra thin", "cell phone", 5000),
      ("Very thin", "cell phone", 6000),
      ("Big", "tablet", 2500),
      ("Bendable", "cell phone", 3000),
      ("Foldable", "cell phone", 3000),
      ("Pro", "tablet", 4500),
      ("Pro2", "tablet", 6500))
      .toDF("product", "category", "revenue")

    // rank products by revenue within each category
    val overCategory = Window.partitionBy('category).orderBy('revenue.desc)

    val ranked = dataset.withColumn("rank", dense_rank.over(overCategory))

    scala> ranked.show
    +----------+----------+-------+----+
    |   product|  category|revenue|rank|
    +----------+----------+-------+----+
    |      Pro2|    tablet|   6500|   1|
    |      Mini|    tablet|   5500|   2|
    |       Pro|    tablet|   4500|   3|
    |       Big|    tablet|   2500|   4|
    |    Normal|    tablet|   1500|   5|
    |      Thin|cell phone|   6000|   1|
    | Very thin|cell phone|   6000|   1|
    |Ultra thin|cell phone|   5000|   2|
    |  Bendable|cell phone|   3000|   3|
    |  Foldable|cell phone|   3000|   3|
    +----------+----------+-------+----+

    scala> ranked.where('rank <= 2).show
    +----------+----------+-------+----+
    |   product|  category|revenue|rank|
    +----------+----------+-------+----+
    |      Pro2|    tablet|   6500|   1|
    |      Mini|    tablet|   5500|   2|
    |      Thin|cell phone|   6000|   1|
    | Very thin|cell phone|   6000|   1|
    |Ultra thin|cell phone|   5000|   2|
    +----------+----------+-------+----+
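
    A minimal sketch of the join-based merge described above, applied to the folder layout from the question. It assumes the key column is named ID; one way to realize the "take the new row when both exist" rule is a left anti join followed by a union. The helper name applyDelta is only illustrative, not from the original answer:

    import org.apache.spark.sql.DataFrame

    // Apply one delta to the current state: drop the old versions of all IDs
    // that appear in the delta, then append the delta rows.
    def applyDelta(current: DataFrame, delta: DataFrame): DataFrame = {
      val withoutOld = current.join(delta.select("ID"), Seq("ID"), "leftanti")
      withoutOld.union(delta)
    }

    // Fold the deltas into the initial load in chronological order.
    val initial = spark.read.parquet("hdfs:/path/spezific_data/20180101/")
    val deltaPaths = Seq(
      "hdfs:/path/spezific_data/20180102/",
      "hdfs:/path/spezific_data/20180103/",
      "hdfs:/path/spezific_data/20180105/")

    val result = deltaPaths.foldLeft(initial) { (acc, path) =>
      applyDelta(acc, spark.read.parquet(path))
    }

    Unlike dropDuplicates, this keeps full control over which version of a row survives: the delta applied later always wins.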

    Update 1:

    First of all, consider using the date utilities instead of manually iterating over numbers to build your dates:
    Date dt = new Date();
    LocalDateTime.ofInstant(dt.toInstant(), ZoneId.systemDefault()).plusDays(1);

    See this for more details.
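
    As a small Scala sketch of that suggestion (assuming the same yyyyMMdd folder naming as in the question; the start and end dates are taken from the question's variables and the names are illustrative), the date strings can be generated with java.time instead of nested year/month/day loops:

    import java.time.LocalDate
    import java.time.format.DateTimeFormatter

    val fmt = DateTimeFormatter.ofPattern("yyyyMMdd")
    val start = LocalDate.parse("20170427", fmt)
    val end   = LocalDate.parse("20191031", fmt)

    // All dates between start and end, inclusive, as yyyyMMdd strings.
    // Invalid dates such as 20170231 simply never appear.
    val dates = Iterator.iterate(start)(_.plusDays(1))
      .takeWhile(!_.isAfter(end))
      .map(_.format(fmt))
      .toSeq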

    Second, please post the full stack trace, not just the StackOverflowException.

    Regarding dataframe - handling data differences (deltas) in Spark with DataFrames, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/58991135/
