- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
对于Spark sql,我们应该如何通过覆盖保存模式从HDFS中的一个文件夹中获取数据,进行一些修改,并将更新的数据保存到HDFS中的同一文件夹中,而不会出现FileNotFoundException?
import org.apache.spark.sql.{SparkSession,SaveMode}
import org.apache.spark.SparkConf
val sparkConf: SparkConf = new SparkConf()
val sparkSession = SparkSession.builder.config(sparkConf).getOrCreate()
val df = sparkSession.read.parquet("hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20")
val newDF = df.select("a","b","c")
newDF.write.mode(SaveMode.Overwrite)
.parquet("hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20") // doesn't work
newDF.write.mode(SaveMode.Overwrite)
.parquet("hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-21") // works
Caused by: org.apache.spark.SparkException: Task failed while writing rows
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:204)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:129)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:128)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException: File does not exist: hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20/part-05020-35ea100f-829e-43d9-9003061-1788904de770.snappy.parquet
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:157)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:243)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:190)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:188)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:193)
... 8 more
val hdfsDirPath = "hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20"
val df= sparkSession.read.parquet(hdfsDirPath)
val newdf = df
newdf.write.mode(SaveMode.Overwrite).parquet(hdfsDirPath)
val df= sparkSession.read.parquet(hdfsDirPath)
df.createOrReplaceTempView("orgtable")
sparkSession.sql("SELECT * from orgtable").createOrReplaceTempView("tmptable")
sparkSession.sql("TRUNCATE TABLE orgtable")
sparkSession.sql("INSERT INTO orgtable SELECT * FROM tmptable")
val newdf = sparkSession.sql("SELECT * FROM orgtable")
newdf.write.mode(SaveMode.Overwrite).parquet(hdfsDirPath)
val df= sparkSession.read.parquet(hdfsDirPath)
df.createOrReplaceTempView("orgtable")
sparkSession.sql("SELECT * from orgtable").createOrReplaceTempView("tmptable")
sparkSession.sql("REFRESH TABLE orgtable")
sparkSession.sql("ALTER VIEW tmptable RENAME TO orgtable")
val newdf = sparkSession.sql("SELECT * FROM orgtable")
newdf.write.mode(SaveMode.Overwrite).parquet(hdfsDirPath)
最佳答案
我解决了这个问题,首先将Dataframe写入临时目录,然后删除读取的源,并将临时目录重命名为源名称。质量检查
关于apache-spark - Spark SQL SaveMode.Overwrite,获取java.io.FileNotFoundException并需要“REFRESH TABLE tableName”,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42920748/
抱歉这个菜鸟问题,但请有人向我解释一下这个嵌套 for 循环的内部工作原理。 v,w = 2,4 for v in range(v): for w in range(w): pr
我正在用 C++ 制作 OpenGL 游戏。与其他语言相比,我在 C++ 方面相当缺乏经验。无论如何,我为一些图像创建了一个带有“基本”目录的字符串流。然后,我将此字符串流作为函数参数传递给构造函数。
我正在用 Java 进行基本的文件处理。我想要的是,一旦我运行我的代码,就会在指定位置创建 .txt 文件,并在那里写入一些文本,现在下次当我写东西时,它不应该 OVERWRITE 它,但应该在它之前
我正在用 C++ 制作 OpenGL 游戏。与其他语言相比,我在 C++ 方面相当缺乏经验。无论如何,我为一些图像创建了一个带有“基本”目录的字符串流。然后,我将此字符串流作为函数参数传递给构造函数。
通常当我们setState时,我们想要将一些新状态与旧状态合并。我有一个用例,我想覆盖旧状态。有什么干净的方法可以做到这一点吗? 例如,假设我当前的状态有 561 个不同的属性。我希望新状态具有 3
我正在使用 Flutter-FFmpeg package 在我的 Flutter App 上实现视频编辑功能, [在视频上添加水印] 具体来说,在执行代码时出现此错误: E/mobile-ffmpeg
我的主要需求是使用INSERT OVERWRITE uery从HIVE加载HDFS中的文件。在此查询中,我尝试连接3个配置单元表。下面是查询: select * FROM AGG_CUSTOM
我无法让 symfony2 配置正确覆盖其他配置文件中的值。问题是这样的: 我有一个新的环境“staging”,我想在其中使用 config_prod.yml 中的大部分内容,但有另一个日志记录级别(
背景: 我使用的是 python 2.7.4 我正在类里面逐行阅读文档 我想在一个类中有两个变量 一个变量(我将其设置为一个名为lines的数组)我想要拥有文档的所有原始格式 另一个变量(我也做成了一
关闭。此题需要details or clarity 。目前不接受答案。 想要改进这个问题吗?通过 editing this post 添加详细信息并澄清问题. 已关闭 9 年前。 Improve th
前提:作业中所需的功能之一是对链接列表进行排序。我的做法可能效率很低,但这是我知道的唯一方法。 问题:如果我有一个信息链接列表,我将如何(在函数内)“覆盖”传入链接列表的信息。 代码: void so
希望这是我今天的最后一个问题... O.o 所以我用用户的输入填充了一个表: var textField = Ti.UI.createTextField({ hintText:"Zoeken over
我尝试使用以下 MySQL 查询来“覆盖”/替换表中的空/错误条目,但它不起作用。我收到错误: Error Code: 1054. Unknown column 'T2.Produktbezeichn
我在替换字符串的一部分时遇到问题。现在这段代码。我的目标是针对包含此字典中的键的每个字符串。 mapping = { "St": "Street", "St.": "Stree
find /home/csuser/testfiles . -type f -name "*.c" -exec mv '{}' /home/csuser/src/ \; 上面的命令将简单地覆盖具有相同
我有一个分支,里面有一些未跟踪的文件。我想将另一个分支 merge 到它上面。 第二个分支包含一个跟踪文件,第一个分支未跟踪该文件。出现以下错误: error: The following untra
如何通过cloudformation模板在codedeploy中添加“覆盖内容” 我的模板很好并且正在运行,但是当我尝试部署新版本时,它给了我不同的错误。 如下图 部署失败,因为此位置已存在指定文件。
我想覆盖ProductController来自EnrichBundle . 在开发模式下一切正常,但是当我想使用 php app/console pim:install --env=prod --fo
嘿伙计们,我不是一个铁杆编码员,所以我在这里不了解这些基础知识。 假设我的页面上有多个 '(包含通过 swfobject 的 Youtube 视频)。所有这些对象都有一个唯一的 ID,如 ytplay
我在 PHPStorm 的设置中创建了一个新的 Deployment,可以自动上传。工作得很好,我只是想知道在哪里可以让 PHPStorm 检查服务器上的文件是否更新,以及何时两个人正在处理同一个文件
我是一名优秀的程序员,十分优秀!