gpt4 book ai didi

python - Apache Spark按用户ID排序分区,并将每个分区写入CSV

转载 作者:行者123 更新时间:2023-12-04 15:53:23 25 4
gpt4 key购买 nike

我有一个用例,使用Spark解决起来似乎比较简单,但似乎找不到一种确定的方法。

我有一个数据集,其中包含各种用户的时间序列数据。我要做的就是:

  • 通过用户ID
  • 对该数据集进行分区
  • 对每个用户的时间序列数据进行排序,然后应该将其包含在各个分区
  • 将每个分区写入单个CSV文件。最后,我希望每个用户ID包含1个CSV文件。

  • 我尝试使用以下代码段,但最终得到了令人惊讶的结果。我的确得到了每个用户ID 1个csv文件,并且某些用户的时间序列数据最终得到了排序,但是许多其他用户的数据却未排序。
    # repr(ds) = DataFrame[userId: string, timestamp: string, c1: float, c2: float, c3: float, ...]
    ds = load_dataset(user_dataset_path)
    ds.repartition("userId")
    .sortWithinPartitions("timestamp")
    .write
    .partitionBy("userId")
    .option("header", "true")
    .csv(output_path)

    我不清楚为什么会发生这种情况,我也不完全知道该怎么做。我也不确定这是否可能是Spark中的错误。

    我正在将Spark 2.0.2与Python 2.7.12一起使用。任何建议将不胜感激!

    最佳答案

    以下代码对我有用(在Scala中显示,但在Python中类似)。

    我为每个用户名得到一个文件,输出文件中的行按时间戳值排序。

    testDF
    .select( $"username", $"timestamp", $"activity" )
    .repartition(col("username"))
    .sortWithinPartitions(col("username"),col("timestamp")) // <-- both here
    .write
    .partitionBy("username")
    .mode(SaveMode.Overwrite)
    .option("header", "true")
    .option("delimiter", ",")
    .csv(folder + "/useractivity")

    导入的事情是将username和timestamp列都作为 sortWithinPartitions 的参数。

    这是其中一个输出文件的外观(我使用一个简单的整数作为时间戳):
    timestamp,activity
    345,login
    402,upload
    515,download
    600,logout

    关于python - Apache Spark按用户ID排序分区,并将每个分区写入CSV,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41813787/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com