- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
当我最初编写 Delta Lake 时,使用分区(使用partitionBy)与否没有任何区别。
在写入之前对同一列使用重新分区,只会更改 parquet 文件的数量。使要分区的列显式“不可为空”不会改变效果。
版本:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
val tmp = spark.createDataFrame(
spark.sparkContext.parallelize((1 to 10).map(n => Row(n, n % 3))),
StructType(Seq(StructField("CONTENT", IntegerType), StructField("PARTITION", IntegerType))))
/*
tmp.show
+-------+---------+
|CONTENT|PARTITION|
+-------+---------+
| 1| 1|
| 2| 2|
| 3| 0|
| 4| 1|
| 5| 2|
| 6| 0|
| 7| 1|
| 8| 2|
| 9| 0|
| 10| 1|
+-------+---------+
tmp.printSchema
root
|-- CONTENT: integer (nullable = true)
|-- PARTITION: integer (nullable = true)
*/
tmp.write.format("delta").partitionBy("PARTITION").save("PARTITIONED_DELTA_LAKE")
生成的 delta-lake 目录如下:
ls -1 PARTITIONED_DELTA_LAKE
_delta_log
00000000000000000000.json
part-00000-a3015965-b101-4f63-87de-1d06a7662312-c000.snappy.parquet
part-00007-3155dde1-9f41-49b5-908e-08ce6fc077af-c000.snappy.parquet
part-00014-047f6a28-3001-4686-9742-4e4dbac05c53-c000.snappy.parquet
part-00021-e0d7f861-79e9-41c9-afcd-dbe688720492-c000.snappy.parquet
part-00028-fe3da69d-660a-445b-a99c-0e7ad2f92bf0-c000.snappy.parquet
part-00035-d69cfb9d-d320-4d9f-9b92-5d80c88d1a77-c000.snappy.parquet
part-00043-edd049a2-c952-4f7b-8ca7-8c0319932e2d-c000.snappy.parquet
part-00050-38eb3348-9e0d-49af-9ca8-a323e58b3712-c000.snappy.parquet
part-00057-906312ad-8556-4696-84ba-248b01664688-c000.snappy.parquet
part-00064-31f5d03d-2c63-40e7-8fe5-a8374eff9894-c000.snappy.parquet
part-00071-e1afc2b9-aa5b-4e7c-b94a-0c176523e9f1-c000.snappy.parquet
cat PARTITIONED_DELTA_LAKE/_delta_log/00000000000000000000.json
{"commitInfo":{"timestamp":1579073383370,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isBlindAppend":true}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"2cdd6fbd-bffa-415e-9c06-94ffc2048cbe","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"CONTENT\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"PARTITION\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1579073381183}}
{"add":{"path":"part-00000-a3015965-b101-4f63-87de-1d06a7662312-c000.snappy.parquet","partitionValues":{},"size":363,"modificationTime":1579073382329,"dataChange":true}}
{"add":{"path":"part-00007-3155dde1-9f41-49b5-908e-08ce6fc077af-c000.snappy.parquet","partitionValues":{},"size":625,"modificationTime":1579073382545,"dataChange":true}}
{"add":{"path":"part-00014-047f6a28-3001-4686-9742-4e4dbac05c53-c000.snappy.parquet","partitionValues":{},"size":625,"modificationTime":1579073382237,"dataChange":true}}
{"add":{"path":"part-00021-e0d7f861-79e9-41c9-afcd-dbe688720492-c000.snappy.parquet","partitionValues":{},"size":625,"modificationTime":1579073382583,"dataChange":true}}
{"add":{"path":"part-00028-fe3da69d-660a-445b-a99c-0e7ad2f92bf0-c000.snappy.parquet","partitionValues":{},"size":625,"modificationTime":1579073382893,"dataChange":true}}
{"add":{"path":"part-00035-d69cfb9d-d320-4d9f-9b92-5d80c88d1a77-c000.snappy.parquet","partitionValues":{},"size":625,"modificationTime":1579073382488,"dataChange":true}}
{"add":{"path":"part-00043-edd049a2-c952-4f7b-8ca7-8c0319932e2d-c000.snappy.parquet","partitionValues":{},"size":625,"modificationTime":1579073383262,"dataChange":true}}
{"add":{"path":"part-00050-38eb3348-9e0d-49af-9ca8-a323e58b3712-c000.snappy.parquet","partitionValues":{},"size":625,"modificationTime":1579073382683,"dataChange":true}}
{"add":{"path":"part-00057-906312ad-8556-4696-84ba-248b01664688-c000.snappy.parquet","partitionValues":{},"size":625,"modificationTime":1579073382416,"dataChange":true}}
{"add":{"path":"part-00064-31f5d03d-2c63-40e7-8fe5-a8374eff9894-c000.snappy.parquet","partitionValues":{},"size":625,"modificationTime":1579073382549,"dataChange":true}}
{"add":{"path":"part-00071-e1afc2b9-aa5b-4e7c-b94a-0c176523e9f1-c000.snappy.parquet","partitionValues":{},"size":625,"modificationTime":1579073382511,"dataChange":true}}
我期望类似的东西
ls -1 PARTITIONED_DELTA_LAKE
_delta_log
00000000000000000000.json
PARTITION=0
part-00000-a3015965-b101-4f63-87de-1d06a7662312-c000.snappy.parquet
...
cat PARTITIONED_DELTA_LAKE/_delta_log/00000000000000000000.json
..."partitionBy":"[PARTITION]"...
..."partitionColumns":[PARTITION]...
..."partitionValues":{0}...
最佳答案
如Jacek评论说,使用的Spark版本太旧。我已经尝试过 Spark 版本的上述代码:
只有 2.4.2 分区才能按预期工作。在此版本中this bugfix可能是问题得到解决的原因:
.. Users can specify columns in partitionBy and our internal data sources will use this information. Unfortunately, for external systems, this data is silently dropped with no feedback given to the user ..
关于apache-spark - 写Delta Lake时使用分区(配合partitionBy)没有效果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59747322/
使用哈希函数: balanceLoad = lambda x: bisect.bisect_left(boundary_array, -keyfunc(x)) 其中boundary_array为[-6
我正在尝试在如下嵌套字段上调用 partitionBy: val rawJson = sqlContext.read.json(filename) rawJson.write.partitionB
目前,当我使用 paritionBy()写入 HDFS: DF.write.partitionBy("id") 我会得到看起来像的输出结构(这是默认行为): ../id=1/ ../id=2/ ../
我们常见的 Spark 处理流程是这样的: 正在加载: rdd = sqlContext.parquetFile("mydata/") rdd = rdd.map(lambda row: (row.i
我按如下方式对DataFrame进行分区: df.write.partitionBy("type", "category").parquet(config.outpath) 该代码给出了预期的结果(即
输入: 输入数据集包含以 Parquet 形式存储的多个文件中的 1000 万笔交易。包括所有文件在内的整个数据集的大小范围为 6 到 8GB。 问题说明: 根据客户 ID 对交易进行分区,这将为每个
创建一个接受TableName和Partition列作为输入的通用代码。但是在尝试将数据帧写入分区表时面临问题。 partAttr='product_category_id,product_id' p
所以我做的是 rdd.flatMap(lambda x: enumerate(x)) 为我的数据制作键 0-49。然后我决定这样做: rdd.flatMap(lambda x: enumerate(x
我以一种方式对数据进行了分区,我只想以另一种方式对其进行分区。所以它基本上是这样的: sqlContext.read().parquet("...").write().partitionBy("...
我了解 partitionBy 函数对我的数据进行分区。如果我使用 rdd.partitionBy(100),它会将我的数据按键分成 100 个部分。即与相似键关联的数据将被分组在一起 我的理解正确吗
我有一个 csv 记录,并作为数据框导入: --------------------------- name | age | entranceDate | ----------------------
我一直在使用 partitionBy 但我不太明白为什么我们应该使用它。 我有一个像这样的 csv 记录: --------------------------- --------- name | a
我有一个数据框,保存为 Parquet 格式时大约需要 11GB。读取 dataframe 并写入 json 时,需要 5 分钟。当我添加partitionBy(“day”)时,需要几个小时才能完成。
假设我有一个包含大约 21 亿条记录的数据集。 这是一个包含客户信息的数据集,我想知道他们做了多少次。所以我应该对 ID 进行分组并对一列求和(它有 0 和 1 值,其中 1 表示一个 Action
我有一个类似于下面的类 MyObject . public class MyObject { private String key; // not unique. multiple objec
我有一个 DataFrame我需要根据特定的分区写入 S3。代码如下所示: dataframe .write .mode(SaveMode.Append) .partitionBy("ye
我需要根据特定的 Partition 键将数据写入 s3,这可以使用 write.partitionBy 轻松完成。但是,在这种情况下,我只需要在每个路径中写入一个文件。我正在使用以下代码来执行此操作
我需要根据特定的 Partition 键将数据写入 s3,这可以使用 write.partitionBy 轻松完成。但是,在这种情况下,我只需要在每个路径中写入一个文件。我正在使用以下代码来执行此操作
我已经在 Spark 中使用 Window 成功创建了一个 row_number() partitionBy,但我想按降序而不是默认的升序对其进行排序。这是我的工作代码: from pyspark i
我有一个名为 target_col_a 的专栏在我的数据框中,时间戳值已转换为字符串,例如2020-05-27 08:00:00 . 然后我partitionBy此列如下。 target_datase
我是一名优秀的程序员,十分优秀!