- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试使用 Spark 和 partitionBy
将一个大的分区数据集写入磁盘。算法正在与我尝试过的两种方法作斗争。
分区严重倾斜 - 一些分区很大,而其他分区很小。
问题 #1 :
当我之前使用重新分区时 repartitionBy
, Spark 将所有分区写成一个文件,即使是大分区
val df = spark.read.parquet("some_data_lake")
df
.repartition('some_col).write.partitionBy("some_col")
.parquet("partitioned_lake")
repartition
, Spark 写出太多文件。
df.write.partitionBy("some_col").parquet("partitioned_lake")
最佳答案
最简单的解决方案是在 repartition
中添加一列或多列并明确设置分区数。
val numPartitions = ???
df.repartition(numPartitions, $"some_col", $"some_other_col")
.write.partitionBy("some_col")
.parquet("partitioned_lake")
numPartitions
- 应该是写入分区目录的所需文件数量的上限(实际数量可以更低)。 $"some_other_col"
(和可选的附加列)应该具有高基数并且独立于 $"some_column
(这两者之间应该存在函数依赖关系,并且不应该高度相关)。o.a.s.sql.functions.rand
.import org.apache.spark.sql.functions.rand
df.repartition(numPartitions, $"some_col", rand)
.write.partitionBy("some_col")
.parquet("partitioned_lake")
关于apache-spark - 使用 Spark 的 partitionBy 方法对 S3 中的大型倾斜数据集进行分区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53037124/
使用哈希函数: balanceLoad = lambda x: bisect.bisect_left(boundary_array, -keyfunc(x)) 其中boundary_array为[-6
我正在尝试在如下嵌套字段上调用 partitionBy: val rawJson = sqlContext.read.json(filename) rawJson.write.partitionB
目前,当我使用 paritionBy()写入 HDFS: DF.write.partitionBy("id") 我会得到看起来像的输出结构(这是默认行为): ../id=1/ ../id=2/ ../
我们常见的 Spark 处理流程是这样的: 正在加载: rdd = sqlContext.parquetFile("mydata/") rdd = rdd.map(lambda row: (row.i
我按如下方式对DataFrame进行分区: df.write.partitionBy("type", "category").parquet(config.outpath) 该代码给出了预期的结果(即
输入: 输入数据集包含以 Parquet 形式存储的多个文件中的 1000 万笔交易。包括所有文件在内的整个数据集的大小范围为 6 到 8GB。 问题说明: 根据客户 ID 对交易进行分区,这将为每个
创建一个接受TableName和Partition列作为输入的通用代码。但是在尝试将数据帧写入分区表时面临问题。 partAttr='product_category_id,product_id' p
所以我做的是 rdd.flatMap(lambda x: enumerate(x)) 为我的数据制作键 0-49。然后我决定这样做: rdd.flatMap(lambda x: enumerate(x
我以一种方式对数据进行了分区,我只想以另一种方式对其进行分区。所以它基本上是这样的: sqlContext.read().parquet("...").write().partitionBy("...
我了解 partitionBy 函数对我的数据进行分区。如果我使用 rdd.partitionBy(100),它会将我的数据按键分成 100 个部分。即与相似键关联的数据将被分组在一起 我的理解正确吗
我有一个 csv 记录,并作为数据框导入: --------------------------- name | age | entranceDate | ----------------------
我一直在使用 partitionBy 但我不太明白为什么我们应该使用它。 我有一个像这样的 csv 记录: --------------------------- --------- name | a
我有一个数据框,保存为 Parquet 格式时大约需要 11GB。读取 dataframe 并写入 json 时,需要 5 分钟。当我添加partitionBy(“day”)时,需要几个小时才能完成。
假设我有一个包含大约 21 亿条记录的数据集。 这是一个包含客户信息的数据集,我想知道他们做了多少次。所以我应该对 ID 进行分组并对一列求和(它有 0 和 1 值,其中 1 表示一个 Action
我有一个类似于下面的类 MyObject . public class MyObject { private String key; // not unique. multiple objec
我有一个 DataFrame我需要根据特定的分区写入 S3。代码如下所示: dataframe .write .mode(SaveMode.Append) .partitionBy("ye
我需要根据特定的 Partition 键将数据写入 s3,这可以使用 write.partitionBy 轻松完成。但是,在这种情况下,我只需要在每个路径中写入一个文件。我正在使用以下代码来执行此操作
我需要根据特定的 Partition 键将数据写入 s3,这可以使用 write.partitionBy 轻松完成。但是,在这种情况下,我只需要在每个路径中写入一个文件。我正在使用以下代码来执行此操作
我已经在 Spark 中使用 Window 成功创建了一个 row_number() partitionBy,但我想按降序而不是默认的升序对其进行排序。这是我的工作代码: from pyspark i
我有一个名为 target_col_a 的专栏在我的数据框中,时间戳值已转换为字符串,例如2020-05-27 08:00:00 . 然后我partitionBy此列如下。 target_datase
我是一名优秀的程序员,十分优秀!