- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我将我的 spark 数据帧输出保存为带有分区的 scala 中的 csv 文件。
这就是我在 中这样做的方式齐柏林飞艇 .
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
import org.apache.spark.{ SparkConf, SparkContext }
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.functions.regexp_extract
val get_cus_val = spark.udf.register("get_cus_val", (filePath: String) => filePath.split("\\.")(3))
val rdd = sc.textFile("s3://trfsmallfffile/FinancialLineItem/MAIN")
val header = rdd.filter(_.contains("LineItem.organizationId")).map(line => line.split("\\|\\^\\|")).first()
val schema = StructType(header.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
val data = sqlContext.createDataFrame(rdd.filter(!_.contains("LineItem.organizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema)
val schemaHeader = StructType(header.map(cols => StructField(cols.replace(".", "."), StringType)).toSeq)
val dataHeader = sqlContext.createDataFrame(rdd.filter(!_.contains("LineItem.organizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schemaHeader)
val df1resultFinal=data.withColumn("DataPartition", get_cus_val(input_file_name))
val rdd1 = sc.textFile("s3://trfsmallfffile/FinancialLineItem/INCR")
val header1 = rdd1.filter(_.contains("LineItem.organizationId")).map(line => line.split("\\|\\^\\|")).first()
val schema1 = StructType(header1.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
val data1 = sqlContext.createDataFrame(rdd1.filter(!_.contains("LineItem.organizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema1)
import org.apache.spark.sql.expressions._
val windowSpec = Window.partitionBy("LineItem_organizationId", "LineItem_lineItemId").orderBy($"TimeStamp".cast(LongType).desc)
val latestForEachKey = data1.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp")
val dfMainOutput = df1resultFinal.join(latestForEachKey, Seq("LineItem_organizationId", "LineItem_lineItemId"), "outer")
.select($"LineItem_organizationId", $"LineItem_lineItemId",
when($"DataPartition_1".isNotNull, $"DataPartition_1").otherwise($"DataPartition").as("DataPartition"),
when($"StatementTypeCode_1".isNotNull, $"StatementTypeCode_1").otherwise($"StatementTypeCode").as("StatementTypeCode"),
when($"FinancialConceptLocalId_1".isNotNull, $"FinancialConceptLocalId_1").otherwise($"FinancialConceptLocalId").as("FinancialConceptLocalId"),
when($"FinancialConceptGlobalId_1".isNotNull, $"FinancialConceptGlobalId_1").otherwise($"FinancialConceptGlobalId").as("FinancialConceptGlobalId"),
when($"FinancialConceptCodeGlobalSecondaryId_1".isNotNull, $"FinancialConceptCodeGlobalSecondaryId_1").otherwise($"FinancialConceptCodeGlobalSecondaryId").as("FinancialConceptCodeGlobalSecondaryId"),
when($"FFAction_1".isNotNull, $"FFAction_1").otherwise($"FFAction|!|").as("FFAction|!|"))
.filter(!$"FFAction|!|".contains("D|!|"))
val dfMainOutputFinal = dfMainOutput.na.fill("").select($"DataPartition",$"StatementTypeCode",concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition").map(c => col(c)): _*).as("concatenated"))
val headerColumn = dataHeader.columns.toSeq
val header = headerColumn.mkString("", "|^|", "|!|").dropRight(3)
val dfMainOutputFinalWithoutNull = dfMainOutputFinal.withColumn("concatenated", regexp_replace(col("concatenated"), "|^|null", "")).withColumnRenamed("concatenated", header)
dfMainOutputFinalWithoutNull.repartition(1).write.partitionBy("DataPartition","StatementTypeCode")
.format("csv")
.option("nullValue", "")
.option("delimiter", "\t")
.option("quote", "\u0000")
.option("header", "true")
.option("codec", "gzip")
.save("s3://trfsmallfffile/FinancialLineItem/output")
val FFRowCount =dfMainOutputFinalWithoutNull.groupBy("DataPartition","StatementTypeCode").count
FFRowCount.coalesce(1).write.format("com.databricks.spark.xml")
.option("rootTag", "FFFileType")
.option("rowTag", "FFPhysicalFile")
.save("s3://trfsmallfffile/FinancialLineItem/Descr")
folder/DataPartition=Japan/PartitionYear=1971/part-00001-87a61115-92c9-4926-a803-b46315e55a08.c000.csv.gz
Japan.1971.1.txt.gz
Japan.1971.2.txt.gz
MultipleOutputs
在工作本身中完成一些实现。如
saveAsHadoopFile 但如何做到这一点?
最佳答案
val tempOutPath = "mediamath.dir"
headerDf.union(outDf)
.repartition(1)
.write
.mode(SaveMode.Overwrite)
.format("text")
.option("codec", "gzip")
.save(tempOutPath)
import org.apache.hadoop.fs._
val sc = spark.sparkContext
val fs = FileSystem.get(sc.hadoopConfiguration)
val file = fs.globStatus(new Path("mediamath.dir/part*.gz"))(0).getPath.getName
fs.rename(new Path("mediamath.dir/" + file), new Path(<aws-s3-path>))
关于scala - 如何在 Spark SCALA 中重命名 AWS 中的 Spark 数据帧输出文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46703623/
我正在尝试使用谷歌浏览器的 Trace Event Profiling Tool分析我正在运行的 Node.js 应用程序。选择点样本后,我可以在三种 View 之间进行选择: 自上而下(树) 自上而
对于一个可能是菜鸟的问题,我们深表歉意,但尽管在 SO 上研究了大量教程和其他问题,但仍找不到答案。 我想做的很简单:显示一个包含大量数据库存储字符串的 Android ListView。我所说的“很
我已经开始了一个新元素的工作,并决定给 Foundation 5 一个 bash,看看它是什么样的。在创建带有水平字段的表单时,我在文档中注意到的第一件事是它们使用大量 div 来设置样式。所以我在下
我有一个 Windows 窗体用户控件,其中包含一个使用 BeginInvoke 委托(delegate)调用从单独线程更新的第 3 方图像显示控件。 在繁重的 CPU 负载下,UI 会锁定。当我附加
我有一堆严重依赖dom元素的JS代码。我目前使用的测试解决方案依赖于 Selenium ,但 AFAIK 无法正确评估 js 错误(addScript 错误不会导致您的测试失败,而 getEval 会
我正在制作一款基于滚动 2D map /图 block 的游戏。每个图 block (存储为图 block [21][11] - 每个 map 总共 231 个图 block )最多可以包含 21 个
考虑到以下情况,我是前端初学者: 某个 HTML 页面应该包含一个沉重的图像(例如 - 动画 gif),但我不想强制客户缓慢地等待它完全下载才能享受一个漂亮的页面,而是我更愿意给他看一个轻量级图像(例
我正在设计一个小软件,其中包括: 在互联网上获取资源, 一些用户交互(资源的快速编辑), 一些处理。 我想使用许多资源(它们都列在列表中)来这样做。每个都独立于其他。由于编辑部分很累,我想让用户(可能
我想比较两个理论场景。为了问题的目的,我简化了案例。但基本上它是您典型的生产者消费者场景。 (我关注的是消费者)。 我有一个很大的Queue dataQueue我必须将其传输给多个客户端。 那么让我们
我有一个二元分类问题,标签 0 和 1(少数)存在巨大不平衡。由于测试集带有标签 1 的行太少,因此我将训练测试设置为至少 70-30 或 60-40,因此仍然有重要的观察结果。由于我没有过多地衡量准
我是一名优秀的程序员,十分优秀!