gpt4 book ai didi

apache-spark - 如何使用 SPARK 将多个 parquet 文件转换为 TFrecord 文件?

转载 作者:行者123 更新时间:2023-12-04 03:57:42 33 4
gpt4 key购买 nike

我想根据特定条件从大型 DataFrame 生成分层的 TFrecord 文件,为此我使用 write.partitionBy() .我也在 SPARK 中使用 tensorflow-connector,但这显然不能与 write.partitionBy() 一起使用手术。因此,除了尝试分两步工作之外,我还没有找到其他方法:

  • 根据我的情况重新分配数据框,使用 partitionBy()并将生成的分区写入 Parquet 文件。
  • 阅读这些 Parquet 文件,使用 tensorflow-connector 插件将它们转换为 TFrecord 文件。

  • 这是我无法有效完成的第二步。我的想法是读入执行器上的单个 Parquet 文件并立即将它们写入 TFrecord 文件。但这需要访问只能在驱动程序( discussed here )中完成的 SQLContext,所以不能并行。我想做这样的事情:
    # List all parquet files to be converted
    import glob, os
    files = glob.glob('/path/*.parquet'))

    sc = SparkSession.builder.getOrCreate()
    sc.parallelize(files, 2).foreach(lambda parquetFile: convert_parquet_to_tfrecord(parquetFile))

    我可以构造函数 convert_parquet_to_tfrecord那将能够在执行者身上做到这一点?

    我还尝试在读取所有 Parquet 文件时仅使用通配符:
    SQLContext(sc).read.parquet('/path/*.parquet')

    这确实会读取所有 Parquet 文件,但不幸的是不会读取到单个分区。看起来原始结构丢失了,所以如果我想要将单个 Parquet 文件的确切内容转换为 TFrecord 文件,它对我没有帮助。

    还有其他建议吗?

    最佳答案

    试试 spark-tfrecord。

    Spark-TFRecord 是一个类似于 spark-tensorflow-connector 的工具,但它确实是 partitionBy。以下示例显示了如何对数据集进行分区。

    import org.apache.spark.sql.SaveMode

    // create a dataframe

    val df = Seq((8, "bat"),(8, "abc"), (1, "xyz"), (2, "aaa")).toDF("number", "word")
    val tf_output_dir = "/tmp/tfrecord-test"

    // dump the tfrecords to files.
    df.repartition(3, col("number")).write.mode(SaveMode.Overwrite).partitionBy("number").format("tfrecord").option("recordType", "Example").save(tf_output_dir)

    更多信息可以在
    Github 仓库:
    https://github.com/linkedin/spark-tfrecord

    关于apache-spark - 如何使用 SPARK 将多个 parquet 文件转换为 TFrecord 文件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54312284/

    33 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com