- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有两个数据框 df1
和 df2
我想在名为 visitor_id
的高基数字段上多次加入这些表.我只想执行一次初始洗牌,并在不洗牌/交换 Spark 执行程序之间的数据的情况下进行所有连接。
为此,我创建了另一个名为 visitor_partition
的专栏。一致地为每个visitor_id 分配[0, 1000)
之间的随机值.我使用了自定义分区器来确保 df1
和 df2
被精确分区,使得每个分区只包含来自 visitor_partition
的一个值的行.这个初始重新分区是我唯一一次想要对数据进行洗牌。
我已将每个数据帧保存到 s3 中的 Parquet ,按访问者分区进行分区——对于每个数据帧,这将创建 1000 个组织在 df1/visitor_partition=0
中的文件, df1/visitor_partition=1
... df1/visitor_partition=999
.
现在我从 Parquet 加载每个数据帧并通过 df1.createOrReplaceTempView('df1')
将它们注册为临时 View (与 df2 相同),然后运行以下查询
SELECT
...
FROM
df1 FULL JOIN df1 ON
df1.visitor_partition = df2.visitor_partition AND
df1.visitor_id = df2.visitor_id
df1/visitor_partition=1
加载数据和
df2/visitor_partition=2
并加入那里的行。然而,在实践中,spark 2.4.4 的查询规划器在这里执行完整的数据洗牌。
最佳答案
您可以使用 bucketBy DataFrameWriter ( other documentation ) 的方法。
在以下示例中,VisitorID 列的值将被散列到 500 个桶中。通常,对于 join,Spark 会根据 VisitorID 上的哈希值执行交换阶段。但是,在这种情况下,您已经使用哈希对数据进行了预分区。
inputRdd = sc.parallelize(list((i, i%200) for i in range(0,1000000)))
schema = StructType([StructField("VisitorID", IntegerType(), True),
StructField("visitor_partition", IntegerType(), True)])
inputdf = inputRdd.toDF(schema)
inputdf.write.bucketBy(500, "VisitorID").saveAsTable("bucketed_table")
inputDf1 = spark.sql("select * from bucketed_table")
inputDf2 = spark.sql("select * from bucketed_table")
inputDf3 = inputDf1.alias("df1").join(inputDf2.alias("df2"), col("df1.VisitorID") == col("df2.VisitorID"))
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
== Physical Plan ==
*(3) SortMergeJoin [VisitorID#351], [VisitorID#357], Inner
:- *(1) Sort [VisitorID#351 ASC NULLS FIRST], false, 0
: +- *(1) Project [VisitorID#351, visitor_partition#352]
: +- *(1) Filter isnotnull(VisitorID#351)
: +- *(1) FileScan parquet default.bucketed_6[VisitorID#351,visitor_partition#352] Batched: true, DataFilters: [isnotnull(VisitorID#351)], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/bucketed_6], PartitionFilters: [], PushedFilters: [IsNotNull(VisitorID)], ReadSchema: struct<VisitorID:int,visitor_partition:int>, SelectedBucketsCount: 500 out of 500
+- *(2) Sort [VisitorID#357 ASC NULLS FIRST], false, 0
+- *(2) Project [VisitorID#357, visitor_partition#358]
+- *(2) Filter isnotnull(VisitorID#357)
+- *(2) FileScan parquet default.bucketed_6[VisitorID#357,visitor_partition#358] Batched: true, DataFilters: [isnotnull(VisitorID#357)], Format: Parquet, Location: InMemoryFileIndex[dbfs:/user/hive/warehouse/bucketed_6], PartitionFilters: [], PushedFilters: [IsNotNull(VisitorID)], ReadSchema: struct<VisitorID:int,visitor_partition:int>, SelectedBucketsCount: 500 out of 500
inputdf.write.partitionBy("visitor_partition").saveAsTable("partitionBy_2")
关于apache-spark - Spark : Prevent shuffle/exchange when joining two identically partitioned dataframes,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59034719/
我有什么: 简单的服务器,配备一个具有 8 个逻辑内核的至强处理器、16 GB 内存、2 个 7200rpm 驱动器的 mdadm raid1。 PostgreSQL 需要处理大量数据。每天导入多达
当消息排入分区队列时,服务总线会检查分区键是否存在。如果找到,它将根据分区键选择片段。 但是当该片段已满时会发生什么,该片段中没有剩余空间。服务总线是否给出错误/消息被丢弃或任何其他片段将用于存储该消
我想知道将集合拆分为子集的有效方法是什么? Iterable> partitions = Iterables.partition(numbers, 10); 或 List> partitions =
有人可以告诉我 DATETIME 列上 HASH PARITION 与 RANGE PARTITION 的优缺点吗?假设我们有一个包含 2000 万条记录的 POS 表,并且想要根据交易日期的年份创建
我们有一个 cosmos-db 容器,其中包含大约 1M 条记录,其中包含有关客户的信息。 documentDb 的分区键是 customerId,它保存客户的唯一 GUID 引用。我已阅读parti
因此,我在写入数据库的步骤中有 2 个分区。我想记录每个分区写入的行数,得到总和,打印到日志中; 我正在考虑在编写器中使用static变量,并使用Step Context/Job Context在St
Bigquery 文档说可以更新分区表的分区时间到期。而我只能为摄取时间分区表执行此操作。我尝试了以下方法: bq query --use_legacy_sql=false ' CREATE
这是一个两部分的问题: 1) 是否可以根据数据的 ROWID 或其他标识符使用 select 语句检索数据所在分区的名称? 例如。 SELECT DATA_ID, CATEGORY, VALUE, *
注意:这几乎是 this question 的副本区别在于,在这种情况下,源表是日期分区的,而目标表尚不存在。此外,该问题的公认解决方案在这种情况下不起作用。 我试图将一天的数据从一个日期分区表复制到
我已经搜索了很多,但找不到有关以下场景的任何信息。 考虑一个包含超过 500,000 行、约 20 列和约 5 列上的 INDEX 的 InnoDB 表。 当该表处于以下情况时,执行“ALTER TA
如何将分区表(在 Oracle 10g 数据库中)更改为不仅用于分区而且还用于表本身的新表空间?我的意思是,我可以毫无问题地进行以下操作, --sql 改变表 abc 移动分区 abc01 表空间 n
我们正在尝试基于 BigQuery 在云中构建(或者更好地说重建)我们的 DWH。我们决定对原始数据使用“按日期字段分区”表(如“created_date”字段),而不是摄取时间分区,因为通过此功能,
给定一个使用分区的 Spring Batch 作业:
“每个分区中可以有许多键(及其相关值),但任何给定键的记录都在一个分区中。”这是一本著名的hadoop教科书的一行。我没有理解它的第二部分的全部含义,即“但是任何给定键的记录都在一个分区中。”这是否意
Let's say I have an Athena table mytable partitioned by columns A, B, and C.假设我有一个由列A、B和C分区的Athen
我正在寻找一些文档来了解 hive.exec.max.dynamic.partitions 和 hive.exec.max.dynamic.partitions.pernode 之间的区别。 我们什么
我看过一些关于创建分区的表的很好的解释,这些分区是 CLUSTERED BY 和 SORTED BY。这与创建带分区的表,然后使用 CLUSTER BY 填充表(例如使用 INSERT OVERWRI
使用摄取时间分区表,可以免费查询每个分区的行数。字节计费为 0。 SELECT DATE(_PARTITIONTME) AS dd, COUNT(*) FROM ds.ingestion_time_p
此处提供示例项目: https://github.com/codependent/micronaut-aws-lambda-proxy-graal 我在 Amazon AWS 上部署了一个 Micro
是否可以在不指定分区键的情况下通过其 ID 检索文档? 我的理解来自阅读 documentation是当未指定分区键时查询将在所有分区中扇出: The following query does not
我是一名优秀的程序员,十分优秀!