- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试将存储在 S3 中的数据作为 JSON-per-line 文本文件转换为结构化的列格式,如 S3 上的 ORC 或 Parquet。
源文件包含多种方案的数据(例如 HTTP 请求、HTTP 响应等),需要将其解析为正确类型的不同 Spark 数据帧。
示例模式:
val Request = StructType(Seq(
StructField("timestamp", TimestampType, nullable=false),
StructField("requestId", LongType),
StructField("requestMethod", StringType),
StructField("scheme", StringType),
StructField("host", StringType),
StructField("headers", MapType(StringType, StringType, valueContainsNull=false)),
StructField("path", StringType),
StructField("sessionId", StringType),
StructField("userAgent", StringType)
))
val Response = StructType(Seq(
StructField("timestamp", TimestampType, nullable=false),
StructField("requestId", LongType),
StructField("contentType", StringType),
StructField("contentLength", IntegerType),
StructField("statusCode", StringType),
StructField("headers", MapType(keyType=StringType, valueType=StringType, valueContainsNull=false)),
StructField("responseDuration", DoubleType),
StructField("sessionId", StringType)
))
val allSchemes = Schemes.all().keys.toArray
if (false) {
import com.realo.warehouse.multiplex.implicits._
val input = readRawFromS3(inputPrefix) // returns RDD[Row]
.flatMuxPartitions(allSchemes.length, data => {
val buffers = Vector.tabulate(allSchemes.length) { j => ArrayBuffer.empty[Row] }
data.foreach {
logItem => {
val schemeIndex = allSchemes.indexOf(logItem.logType)
if (schemeIndex > -1) {
buffers(schemeIndex).append(logItem.row)
}
}
}
buffers
})
allSchemes.zipWithIndex.foreach {
case (schemeName, index) =>
val rdd = input(index)
writeColumnarToS3(rdd, schemeName)
}
} else if (false) {
// Naive approach
val input = readRawFromS3(inputPrefix) // returns RDD[Row]
.persist(StorageLevel.MEMORY_AND_DISK)
allSchemes.foreach {
schemeName =>
val rdd = input
.filter(x => x.logType == schemeName)
.map(x => x.row)
writeColumnarToS3(rdd, schemeName)
}
input.unpersist()
} else {
class CustomPartitioner extends Partitioner {
override def numPartitions: Int = allSchemes.length
override def getPartition(key: Any): Int = allSchemes.indexOf(key.asInstanceOf[String])
}
val input = readRawFromS3(inputPrefix)
.map(x => (x.logType, x.row))
.partitionBy(new CustomPartitioner())
.map { case (logType, row) => row }
.persist(StorageLevel.MEMORY_AND_DISK)
allSchemes.zipWithIndex.foreach {
case (schemeName, index) =>
val rdd = input
.mapPartitionsWithIndex(
(i, iter) => if (i == index) iter else Iterator.empty,
preservesPartitioning = true
)
writeColumnarToS3(rdd, schemeName)
}
input.unpersist()
}
最佳答案
鉴于输入是一个 json,您可以将其读入一个字符串数据帧(每行是一个字符串)。然后,您可以从每个 json 中提取类型(通过使用 UDF 或使用诸如 get_json_object 或 json_tuple 之类的函数)。
现在您有两列:类型和原始 json。您现在可以在写入数据帧时使用 partitionBy 数据帧选项。这将为每种类型生成一个目录,该目录的内容将包括原始 jsons。
现在,您可以使用自己的架构读取每种类型。
您还可以使用映射对 RDD 执行类似的操作,该映射将输入 rdd 转换为一对 rdd,键是类型,值是转换为目标模式的 json。然后您可以使用 partitionBy 和 map partition 将每个分区保存到一个文件中,或者您可以使用 reduce by key 写入不同的文件(例如,通过使用 key 设置文件名)。
你也可以看看Write to multiple outputs by key Spark - one Spark job
请注意,我在这里假设目标是拆分为文件。根据您的特定用例,其他选项可能是可行的。例如,如果您的不同模式足够接近,您可以创建一个包含所有模式的 super 模式,并直接从中创建数据帧。然后,您可以直接处理数据帧,也可以使用数据帧 partitionBy 将不同的子类型写入不同的目录(但这次已保存到 Parquet )。
关于apache-spark - 将 RDD 解复用到多个 ORC 表上,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41148981/
我是 Pyspark 新手,我使用的是 Spark 2.0.2。 我有一个名为 Test_RDD 的 RDD,其结构如下: U-Key || V1 || V2 || V3 || ----
我正在寻找一种方法将一个 RDD 拆分为两个或多个 RDD,并将获得的结果保存为两个单独的 RDD。例如: rdd_test = sc.parallelize(range(50), 1) 我的代码:
我有一个结构如下的RDD: ((user_id,item_id,rating)) 让我们将此 RDD 称为训练 然后还有另一个具有相同结构的rdd: ((user_id,item_id,rating)
已经有人问过类似的问题。最相似的是这个: Spark: How to split an RDD[T]` into Seq[RDD[T]] and preserve the ordering 但是,我不
我正在使用 spark 来处理数据。但是我不知道如何将新数据保存到Hive 我从 Hive 加载 rdd,然后运行 map 函数来清理数据。 result = myRdd.map(lambda x
我有一个名为 index 的 rdd:RDD[(String, String)],我想用 index 来处理我的文件。 这是代码: val get = file.map({x => val tmp
我有两个 RDD: **rdd1** id1 val1 id2 val2 **rdd2** id1 v1 id2 v2 id1 v3 id8 v7 id1 v4 id3 v5 id6 v6 我想过滤
我有一个 RDD,需要从另一个 RDD 访问数据。但是,我总是收到任务不可序列化错误。我已经扩展了 Serialized 类,但它没有起作用。代码是: val oldError = rddOfRati
我有一个 RDD,需要从另一个 RDD 访问数据。但是,我总是收到任务不可序列化错误。我已经扩展了 Serialized 类,但它没有起作用。代码是: val oldError = rddOfRati
我有一个 RDD 对: (105,918) (105,757) (502,516) (105,137) (516,816) (350,502) 我想将它分成两个 RDD,这样第一个只有具有非重复值的对
我正在尝试使用 spark 执行 K 最近邻搜索。 我有一个 RDD[Seq[Double]] 并且我打算返回一个 RDD[(Seq[Double],Seq[Seq[Double]])] 带有实际行和
我是Spark和Scala语言的新手,并且希望将所有RDD合并到一个List中,如下所示(List to RDD): val data = for (item {
我找不到只参与 rdd 的方法. take看起来很有希望,但它返回 list而不是 rdd .我当然可以将其转换为 rdd ,但这似乎既浪费又丑陋。 my_rdd = sc.textFile("my
我正在寻找一种将 RDD 拆分为两个或更多 RDD 的方法。我见过的最接近的是 Scala Spark: Split collection into several RDD?这仍然是一个单一的 RDD
我有一个RDD[String],wordRDD。我还有一个从字符串/单词创建 RDD[String] 的函数。我想为 wordRDD 中的每个字符串创建一个新的 RDD。以下是我的尝试: 1) 失败,
我刚刚开始使用 Spark 和 Scala 我有一个包含多个文件的目录我使用 成功加载它们 sc.wholeTextFiles(directory) 现在我想升一级。我实际上有一个目录,其中包含包含文
我想从另一个 RDD 中减去一个 RDD。我查看了文档,发现 subtract可以这样做。实际上,当我测试时 subtract , 最终的 RDD 保持不变,值不会被删除! 有没有其他功能可以做到这一
我在 HDFS 中有如下三个文件中的数据 EmployeeManagers.txt (EmpID,ManagerID) 1,5 2,4 3,4 4,6 5,6 EmployeeNames.txt (E
我正在开发一个应用程序,我需要对 RDD 中具有相同键的每对行执行计算,这是 RDD 结构: List>> dat2 = new ArrayList<>(); dat2.add(new Tuple2>
我在 spark 集群中有两个文件,foo.csv 和 bar.csv,它们都有 4 列和完全相同的字段:时间、用户、url、类别。 我想通过 bar.csv 的某些列过滤掉 foo.csv。最后,我
我是一名优秀的程序员,十分优秀!