- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我在hdfs中有一个 Parquet 文件作为我的数据的初始加载。接下来的所有拼花地板只是这些数据集每天都会更改为初始负载(按时间顺序)。这是我的三角洲。
我想读取全部或部分 Parquet 文件,以获取特定日期的最新数据。 Delta也可以包含新记录。
例:
初始数据(文件夹:/ path / spezific_data / 20180101):
ID| Name | Street |
1 | "Tom" |"Street 1"|
2 | "Peter"|"Street 2"|
ID| Name | Street |
1 | "Tom" |"Street 21"|
ID| Name | Street |
2 | "Peter" |"Street 44"|
3 | "Hans" | "Street 12"|
ID| Name | Street |
2 | "Hans" |"Street 55"|
initial = spark.read.parquet("hdfs:/path/spezific_data/20180101/")
initial_df = spark.read.parquet("hdfs:/path/spezific_data/20180101/") <br>
delta_df = spark.read.parquet("hdfs:/path/spezific_data/20180102/")
new_df = delta_df.union(initila_df).dropDuplicates("ID") <br>
delta_df = spark.read.parqeut("hdfs:/mypath/20180103/") <br>
new_df = delta_df.union(new_df).dropDuplicates("ID") <br>
import org.apache.spark.sql.functions.col
import util.control.Breaks._
var sourcePath = "hdfs:sourceparth/"
var destinationPath = "hdfs:destiantionpath/result"
var initial_date = "20170427"
var start_year = 2017
var end_year = 2019
var end_month = 10
var end_day = 31
var m : String = _
var d : String = _
var date : String = _
var delta_df : org.apache.spark.sql.DataFrame = _
var doubleRows_df : org.apache.spark.sql.DataFrame = _
//final DF, initial load
var final_df = spark.read.parquet(sourcePath + initial_date + "*")
breakable{
for(year <- 2017 to end_year; month <- 1 to 12; day <- 1 to 31){
//Create date String
m = month.toString()
d = day.toString()
if(month < 10)
m = "0" + m
if(day < 10)
d = "0" + d
date = year.toString() + m + d
try{
//one delta
delta_df = spark.read.parquet(sourcePath + date + "*")
//delete double Rows (i want to ignore them
doubleRows_df = delta_df.groupBy("key").count().where("count > 1").select("key")
delta_df = delta_df.join(doubleRows_df, Seq("key"), "leftanti")
//deletes all (old) rows in final_df, that are in delta_df
final_df = final_df.join(delta_df, Seq("key"), "leftanti")
//add all new rows in delta
final_df = final_df.union(delta_df)
println(date)
}catch{
case e:org.apache.spark.sql.AnalysisException=>{}
}
if(day == end_day && month == end_month && year == end_year)
break
}
}
final_df.write.mode("overwrite").parquet(destinationPath)
19/11/26 11:19:04 WARN util.Utils: Suppressing exception in finally: Java heap space
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$3.apply(TorrentBroadcast.scala:271)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$3.apply(TorrentBroadcast.scala:271)
at org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:87)
at org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutputStream.scala:75)
at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:205)
at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:158)
at com.esotericsoftware.kryo.io.Output.flush(Output.java:181)
at com.esotericsoftware.kryo.io.Output.close(Output.java:191)
at org.apache.spark.serializer.KryoSerializationStream.close(KryoSerializer.scala:223)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$1.apply$mcV$sp(TorrentBroadcast.scala:278)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1346)
at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:277)
at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:126)
at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:56)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1488)
at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1006)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:930)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:874)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1677)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Exception in thread "dag-scheduler-event-loop" java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$3.apply(TorrentBroadcast.scala:271)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$3.apply(TorrentBroadcast.scala:271)
at org.apache.spark.util.io.ChunkedByteBufferOutputStream.allocateNewChunkIfNeeded(ChunkedByteBufferOutputStream.scala:87)
at org.apache.spark.util.io.ChunkedByteBufferOutputStream.write(ChunkedByteBufferOutputStream.scala:75)
at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:205)
at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:158)
at com.esotericsoftware.kryo.io.Output.flush(Output.java:181)
at com.esotericsoftware.kryo.io.Output.require(Output.java:160)
at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:246)
at com.esotericsoftware.kryo.io.Output.writeBytes(Output.java:232)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:54)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ByteArraySerializer.write(DefaultArraySerializers.java:43)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:209)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:276)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$blockifyObject$2.apply(TorrentBroadcast.scala:276)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1337)
at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:277)
at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:126)
at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:56)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1488)
at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1006)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:930)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:874)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1677)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
最佳答案
不能选择
distinct
或dropDuplicates
,因为您无法控制将采用哪些值。这很可能会发生,不会添加新值,而会保留旧值。 join
上执行ID
-查看联接here的类型。然后,联接的行应只包含旧的或仅包含新的行,或两者都包含。当只有旧的或只有新的时-您选择了现在的那个,当两者都有-则只有新时了。 val dataset = Seq(
("Thin", "cell phone", 6000),
("Normal", "tablet", 1500),
("Mini", "tablet", 5500),
("Ultra thin", "cell phone", 5000),
("Very thin", "cell phone", 6000),
("Big", "tablet", 2500),
("Bendable", "cell phone", 3000),
("Foldable", "cell phone", 3000),
("Pro", "tablet", 4500),
("Pro2", "tablet", 6500))
.toDF("product", "category", "revenue")
val overCategory = Window.partitionBy('category).orderBy('revenue.desc)
val ranked = data.withColumn("rank", dense_rank.over(overCategory))
scala> ranked.show
+----------+----------+-------+----+
| product| category|revenue|rank|
+----------+----------+-------+----+
| Pro2| tablet| 6500| 1|
| Mini| tablet| 5500| 2|
| Pro| tablet| 4500| 3|
| Big| tablet| 2500| 4|
| Normal| tablet| 1500| 5|
| Thin|cell phone| 6000| 1|
| Very thin|cell phone| 6000| 1|
|Ultra thin|cell phone| 5000| 2|
| Bendable|cell phone| 3000| 3|
| Foldable|cell phone| 3000| 3|
+----------+----------+-------+----+
scala> ranked.where('rank <= 2).show
+----------+----------+-------+----+
| product| category|revenue|rank|
+----------+----------+-------+----+
| Pro2| tablet| 6500| 1|
| Mini| tablet| 5500| 2|
| Thin|cell phone| 6000| 1|
| Very thin|cell phone| 6000| 1|
|Ultra thin|cell phone| 5000| 2|
+----------+----------+-------+----+
Date dt = new Date();
LocalDateTime.from(dt.toInstant()).plusDays(1);
StackOverflowException
。
关于dataframe - 使用数据框在Spark中处理数据差异(增量),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58991135/
我从NVIDIA手册Eg中复制了以下代码:__threadfence()。他们为什么有 在以下代码中使用了__threadfence()。我认为使用__syncthreads()而不是__thread
我在使用 SVN 更改列表和 svn diff 时遇到了一些麻烦.特别是我想获取特定修订范围的特定文件列表的更改历史记录。 SVN 变更列表似乎是完美的解决方案,所以我的方法是: svn change
我有两个 IP 地址列表。我需要将它们合并到三个文件中,交集,仅来自 list1 的文件和仅来自 list2 的文件。 我可以用 awk/diff 或任何其他简单的 unix 命令来做到这一点吗?如何
假设自上次更新(恢复)到我的 a.b 文件以来我做了一些更改。 此 a.b 文件也在存储库中更改。 现在我想将我所做的更改与 repos 更改进行比较。 如果我 svn revert 文件,我可以看到
关闭。这个问题不符合Stack Overflow guidelines .它目前不接受答案。 我们不允许提问寻求书籍、工具、软件库等的推荐。您可以编辑问题,以便用事实和引用来回答。 关闭 7 年前。
我使用的是 openssl 1.0.1c , linux x86_64 我正在创建包含“hello”的文件(没有换行符) openssl dgst -sha256 hello_file i get :
假设我们有几个库。 有什么区别核心和 普通 图书馆?他们应该如何被认可,我们是否组织了两者的职责? +Common -Class1 +Core -Class2 +Lib1 has : Comm
如何在 SQLite 中计算以毫秒为单位的最小时间间隔? 好的,提供一些背景信息, 这是我的 table 的样子: link_budget table 所以有这个时间列,我想发出一个请求,以毫秒为单位
我想知道,乐观并发控制 (OCC) 和多版本并发控制 (MVCC) 之间的区别是什么? 到目前为止,我知道两者都是基于更新的版本检查。 在 OCC 中,我读到了没有获取读取访问锁的事务,仅适用于以后的
说到 SignalR,我有点菜鸟。刚刚开始四处探索和谷歌搜索它,我想知道是否有人可以向我解释完成的事情之间的一些差异。 在我见过的一些示例中,人们需要创建一个 Startup 类并定义 app.Map
我在 Ogre 工作,但这是一个一般的四元数问题。 我有一个对象,我最初对其应用旋转四元数 Q1。后来,我想让它看起来好像我最初通过不同的四元数 Q2 旋转了对象。 我如何计算四元数,该四元数将采用已
我了解 javascript 模块模式,但我使用两种类型的模块模式,并且想从架构 Angular 了解它们之间的区别。 // PATTERN ONE var module = (function()
我有两个具有完全相同键的 JSON。 val json1 = """{ 'name': 'Henry', 'age' : 26, 'activities' : {
我发现使用 VBA 在 Excel 中复制单个文件有两种不同的方法。一是文件复制: FileCopy (originalPath), (pathToCopyTo) 另一个是名称: Name (orig
我想知道查找两个 float 组之间差异的绝对值的最有效方法是什么? 是否是以下内容: private float absDifference(float[] vector1, float[] vec
我有一个关于 wicket getApplication 的问题。 getApplication() 和 getSession().getApplication 有什么区别? 部署 wicket 应用
我刚刚开始使用activemq,我有一个关于追溯消费者的问题,为了启用这个功能,你需要有一个持久的订阅。但是在主题上启用和不启用追溯的持久订阅有什么区别? activemq 文档说。 http://a
我有两个具有完全相同键的 JSON。 val json1 = """{ 'name': 'Henry', 'age' : 26, 'activities' : {
得到另一个 Erlang 二进制表示查询('因为这就是我最近正在阅读的内容,并且需要二进制协议(protocol)实现)。 如果我正确理解了类型说明符,那么对于“浮点”类型值,8 字节表示似乎很好(这
关闭。这个问题需要多问focused 。目前不接受答案。 想要改进此问题吗?更新问题,使其仅关注一个问题 editing this post . 已关闭 4 年前。 Improve this ques
我是一名优秀的程序员,十分优秀!