gpt4 book ai didi

apache-spark - 将 RDD 解复用到多个 ORC 表上

转载 作者:行者123 更新时间:2023-12-04 16:06:17 25 4
gpt4 key购买 nike

我正在尝试将存储在 S3 中的数据作为 JSON-per-line 文本文件转换为结构化的列格式,如 S3 上的 ORC 或 Parquet。

源文件包含多种方案的数据(例如 HTTP 请求、HTTP 响应等),需要将其解析为正确类型的不同 Spark 数据帧。

示例模式:

  val Request = StructType(Seq(
StructField("timestamp", TimestampType, nullable=false),
StructField("requestId", LongType),
StructField("requestMethod", StringType),
StructField("scheme", StringType),
StructField("host", StringType),
StructField("headers", MapType(StringType, StringType, valueContainsNull=false)),
StructField("path", StringType),
StructField("sessionId", StringType),
StructField("userAgent", StringType)
))

val Response = StructType(Seq(
StructField("timestamp", TimestampType, nullable=false),
StructField("requestId", LongType),
StructField("contentType", StringType),
StructField("contentLength", IntegerType),
StructField("statusCode", StringType),
StructField("headers", MapType(keyType=StringType, valueType=StringType, valueContainsNull=false)),
StructField("responseDuration", DoubleType),
StructField("sessionId", StringType)
))

我让那部分工作正常,但是尝试尽可能有效地将数据写回 S3 似乎是一个问题 atm。

我尝试了 3 种方法:
  • muxPartitions来自silex项目
  • 缓存已解析的 S3 输入并对其进行多次循环
  • 使每个方案类型成为 RDD 的单独分区

  • 在第一种情况下,JVM 内存不足,而在第二种情况下,机器磁盘空间不足。

    第三个我还没有彻底测试过,但这似乎不是对处理能力的有效利用(因为集群中只有一个节点(这个特定分区所在的节点)实际上会将数据写回 S3) .

    相关代码:

    val allSchemes = Schemes.all().keys.toArray

    if (false) {
    import com.realo.warehouse.multiplex.implicits._

    val input = readRawFromS3(inputPrefix) // returns RDD[Row]
    .flatMuxPartitions(allSchemes.length, data => {
    val buffers = Vector.tabulate(allSchemes.length) { j => ArrayBuffer.empty[Row] }
    data.foreach {
    logItem => {
    val schemeIndex = allSchemes.indexOf(logItem.logType)
    if (schemeIndex > -1) {
    buffers(schemeIndex).append(logItem.row)
    }
    }
    }
    buffers
    })

    allSchemes.zipWithIndex.foreach {
    case (schemeName, index) =>
    val rdd = input(index)

    writeColumnarToS3(rdd, schemeName)
    }
    } else if (false) {
    // Naive approach
    val input = readRawFromS3(inputPrefix) // returns RDD[Row]
    .persist(StorageLevel.MEMORY_AND_DISK)

    allSchemes.foreach {
    schemeName =>
    val rdd = input
    .filter(x => x.logType == schemeName)
    .map(x => x.row)

    writeColumnarToS3(rdd, schemeName)
    }

    input.unpersist()
    } else {
    class CustomPartitioner extends Partitioner {
    override def numPartitions: Int = allSchemes.length
    override def getPartition(key: Any): Int = allSchemes.indexOf(key.asInstanceOf[String])
    }

    val input = readRawFromS3(inputPrefix)
    .map(x => (x.logType, x.row))
    .partitionBy(new CustomPartitioner())
    .map { case (logType, row) => row }
    .persist(StorageLevel.MEMORY_AND_DISK)

    allSchemes.zipWithIndex.foreach {
    case (schemeName, index) =>
    val rdd = input
    .mapPartitionsWithIndex(
    (i, iter) => if (i == index) iter else Iterator.empty,
    preservesPartitioning = true
    )

    writeColumnarToS3(rdd, schemeName)
    }

    input.unpersist()
    }

    从概念上讲,我认为每个方案类型的代码应该有 1 个输出 DStream,并且输入 RDD 应该选择将每个处理过的项目放到正确的 DStream 上(通过批处理以获得更好的吞吐量)。

    有没有人对如何实现这一点有任何指示?和/或是否有更好的方法来解决这个问题?

    最佳答案

    鉴于输入是一个 json,您可以将其读入一个字符串数据帧(每行是一个字符串)。然后,您可以从每个 json 中提取类型(通过使用 UDF 或使用诸如 get_json_object 或 json_tuple 之类的函数)。

    现在您有两列:类型和原始 json。您现在可以在写入数据帧时使用 partitionBy 数据帧选项。这将为每种类型生成一个目录,该目录的内容将包括原始 jsons。

    现在,您可以使用自己的架构读取每种类型。

    您还可以使用映射对 RDD 执行类似的操作,该映射将输入 rdd 转换为一对 rdd,键是类型,值是转换为目标模式的 json。然后您可以使用 partitionBy 和 map partition 将每个分区保存到一个文件中,或者您可以使用 reduce by key 写入不同的文件(例如,通过使用 key 设置文件名)。

    你也可以看看Write to multiple outputs by key Spark - one Spark job

    请注意,我在这里假设目标是拆分为文件。根据您的特定用例,其他选项可能是可行的。例如,如果您的不同模式足够接近,您可以创建一个包含所有模式的 super 模式,并直接从中创建数据帧。然后,您可以直接处理数据帧,也可以使用数据帧 partitionBy 将不同的子类型写入不同的目录(但这次已保存到 Parquet )。

    关于apache-spark - 将 RDD 解复用到多个 ORC 表上,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41148981/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com