- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我已经开始在 Spark 1.4.0 中使用 Spark SQL 和 DataFrames。我想在 Scala 中的 DataFrames 上定义自定义分区器,但不知道如何执行此操作。
我正在使用的数据表之一包含一个交易列表,按帐户,类似于以下示例。
Account Date Type Amount
1001 2014-04-01 Purchase 100.00
1001 2014-04-01 Purchase 50.00
1001 2014-04-05 Purchase 70.00
1001 2014-04-01 Payment -150.00
1002 2014-04-01 Purchase 80.00
1002 2014-04-02 Purchase 22.00
1002 2014-04-04 Payment -120.00
1002 2014-04-04 Purchase 60.00
1003 2014-04-02 Purchase 210.00
1003 2014-04-03 Purchase 15.00
最佳答案
Spark >= 2.3.0
SPARK-22614公开范围分区。
val partitionedByRange = df.repartitionByRange(42, $"k")
partitionedByRange.explain
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k ASC NULLS FIRST], 42
// +- AnalysisBarrier Project [_1#2 AS k#5, _2#3 AS v#6]
//
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- Project [_1#2 AS k#5, _2#3 AS v#6]
// +- LocalRelation [_1#2, _2#3]
//
// == Optimized Logical Plan ==
// RepartitionByExpression [k#5 ASC NULLS FIRST], 42
// +- LocalRelation [k#5, v#6]
//
// == Physical Plan ==
// Exchange rangepartitioning(k#5 ASC NULLS FIRST, 42)
// +- LocalTableScan [k#5, v#6]
repartition
方法:
val df = Seq(
("A", 1), ("B", 2), ("A", 3), ("C", 1)
).toDF("k", "v")
val partitioned = df.repartition($"k")
partitioned.explain
// scala> df.repartition($"k").explain(true)
// == Parsed Logical Plan ==
// 'RepartitionByExpression ['k], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
// +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
//
// == Analyzed Logical Plan ==
// k: string, v: int
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
// +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
//
// == Optimized Logical Plan ==
// RepartitionByExpression [k#7], None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
// +- LogicalRDD [_1#5,_2#6], MapPartitionsRDD[3] at rddToDataFrameHolder at <console>:27
//
// == Physical Plan ==
// TungstenExchange hashpartitioning(k#7,200), None
// +- Project [_1#5 AS k#7,_2#6 AS v#8]
// +- Scan PhysicalRDD[_1#5,_2#6]
RDDs
Spark
Dataset
(包括
Dataset[Row]
又名
DataFrame
)目前无法使用自定义分区器。您通常可以通过创建人工分区列来解决这个问题,但它不会为您提供相同的灵活性。
DataFrame
之前对输入数据进行预分区。
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.HashPartitioner
val schema = StructType(Seq(
StructField("x", StringType, false),
StructField("y", LongType, false),
StructField("z", DoubleType, false)
))
val rdd = sc.parallelize(Seq(
Row("foo", 1L, 0.5), Row("bar", 0L, 0.0), Row("??", -1L, 2.0),
Row("foo", -1L, 0.0), Row("??", 3L, 0.6), Row("bar", -3L, 0.99)
))
val partitioner = new HashPartitioner(5)
val partitioned = rdd.map(r => (r.getString(0), r))
.partitionBy(partitioner)
.values
val df = sqlContext.createDataFrame(partitioned, schema)
DataFrame
来自
RDD
的创作只需要一个简单的 map 阶段就应该保留现有的分区布局*:
assert(df.rdd.partitions == partitioned.partitions)
DataFrame
:
sqlContext.createDataFrame(
df.rdd.map(r => (r.getInt(1), r)).partitionBy(partitioner).values,
df.schema
)
GROUP BY
的简单聚合- 可以减少临时缓冲区**的内存占用,但总体成本要高得多。或多或少相当于 groupByKey.mapValues(_.reduce)
(当前行为)vs reduceByKey
(预分区)。在实践中不太可能有用。 SqlContext.cacheTable
进行数据压缩.由于看起来它正在使用运行长度编码,因此应用 OrderedRDDFunctions.repartitionAndSortWithinPartitions
可以提高压缩比。 predicates
argument .它可以按如下方式使用:
sqlContext.read.jdbc(url, table, Array("foo = 1", "foo = 3"), props)
partitionBy
DataFrameWriter
中的方法 :
DataFrameWriter
提供
partitionBy
可用于在写入时“分区”数据的方法。它使用提供的一组列在写入时分离数据
val df = Seq(
("foo", 1.0), ("bar", 2.0), ("foo", 1.5), ("bar", 2.6)
).toDF("k", "v")
df.write.partitionBy("k").json("/tmp/foo.json")
val df1 = sqlContext.read.schema(df.schema).json("/tmp/foo.json")
df1.where($"k" === "bar")
DataFrame.repartition
.特别是像这样的聚合:
val cnts = df1.groupBy($"k").sum()
TungstenExchange
:
cnts.explain
// == Physical Plan ==
// TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Final,isDistinct=false)], output=[k#90,sum(v)#93])
// +- TungstenExchange hashpartitioning(k#90,200), None
// +- TungstenAggregate(key=[k#90], functions=[(sum(v#91),mode=Partial,isDistinct=false)], output=[k#90,sum#99])
// +- Scan JSONRelation[k#90,v#91] InputPaths: file:/tmp/foo.json
bucketBy
DataFrameWriter
中的方法 ( Spark >= 2.0):
bucketBy
与
partitionBy
有类似的应用但它仅适用于表 (
saveAsTable
)。分桶信息可用于优化连接:
// Temporarily disable broadcast joins
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
df.write.bucketBy(42, "k").saveAsTable("df1")
val df2 = Seq(("A", -1.0), ("B", 2.0)).toDF("k", "v2")
df2.write.bucketBy(42, "k").saveAsTable("df2")
// == Physical Plan ==
// *Project [k#41, v#42, v2#47]
// +- *SortMergeJoin [k#41], [k#46], Inner
// :- *Sort [k#41 ASC NULLS FIRST], false, 0
// : +- *Project [k#41, v#42]
// : +- *Filter isnotnull(k#41)
// : +- *FileScan parquet default.df1[k#41,v#42] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df1], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v:int>
// +- *Sort [k#46 ASC NULLS FIRST], false, 0
// +- *Project [k#46, v2#47]
// +- *Filter isnotnull(k#46)
// +- *FileScan parquet default.df2[k#46,v2#47] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/spark-warehouse/df2], PartitionFilters: [], PushedFilters: [IsNotNull(k)], ReadSchema: struct<k:string,v2:double>
partitioned
RDD 不再有分区器。
关于scala - 如何定义DataFrame的分区?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30995699/
是一种在 Neo4j 分区之间进行物理分离的方法吗? 这意味着以下查询将转到 node1: Match (a:User:Facebook) 虽然此查询将转到另一个节点(可能托管在 docker 上)
我尝试在我的 SQL 服务器上使用分区函数对我的一个大表进行分区,但我收到一条错误消息 “只能在SQL Server企业版中创建分区功能。只有SQL Server企业版支持分区。” 所以我想知道没有企
在hadoop文件系统中,我有两个文件,分别是X和Y。通常,hadoop制作的文件X和Y的大小为64 MB。是否可以强制hadoop划分两个文件,以便从X的32 MB和Y的32 MB中创建一个64 M
据我了解,如果我们有一个主键,则使用该键对数据进行分区并将其存储在节点中(例如使用随机分区器)。 现在我不确定的是,如果我有多个键(又名复合键),是用于分区数据的键的组合还是它将是第一个主键? 例如,
我正在向我的 SSAS 多维数据集添加分区,我想知道是否有多个分区可以保留在下面?多少太多了,最佳实践限制是 20 还是 200?有没有人可以分享任何真实世界的知识? 最佳答案 这是 another
我有一个包含大约 200 万条记录的大表,我想对其进行分区。 我将 id 列设置为 PRIMARY AUTO_INCRMENT int (并且它必须始终是唯一的)。我有一列“theyear”int(4
我正在做 mysql 列表分区。我的表数据如下 ---------------------------------------- id | unique_token | city | student_
我有一个表,我们每天在其中插入大约 2000 万个条目(没有任何限制的盲插入)。我们有两个外键,其中一个是对包含大约 1000 万个条目的表的引用 ID。 我打算删除此表中超过一个月的所有数据,因为不
我想在一款足球奇幻游戏中尝试使用 MySQL Partitioning,该游戏的用户分布在联赛中,每个联赛都有一个用户可以买卖球员的市场。当很多用户同时玩时,我在这张表中遇到了一些僵局(在撰写本文时大
我是 jQuery 的新手,想知道是否可以获取一些变量并将它们的除法作为 CSS 宽度。到目前为止我在这里: var x = $(".some-container").length; var y =
所以我正在做家庭作业,我需要为分区、斯特林数(第一类和第二类)和第一类的切比雪夫多项式创建递归函数。我的程序应该能够让用户输入一个正整数 n,然后创建名为 Partitions.txt、Stirlin
我在数据框中有一列,其中包含大约 1,4M 行聊天对话,其中每个单元格中的一般格式为 (1): “名称代理 : 对话” 但是,并非列中的所有单元格都采用这种格式。有些单元格只是 (2): “对话” 我
我在尝试隐藏 a 时遇到了一些问题,直到用户单击某个元素为止。 HTML 看起来像: BRAND item 1 item 2 item 3
一.为什么kafka要做分区? 因为当一台机器有可能扛不住(类比:就像redis集群中的redis-cluster一样,一个master抗不住写,那么就多个master去抗写)
我有一些销售数据,我需要发送存储在单独表中的可用槽中的数量。 销售数据示例: id数量112131415369 create table sales (id serial primary key, q
我计划设置多个节点以使用 glusterfs 创建分布式复制卷 我使用主(也是唯一)分区上的目录在两个节点上创建了一个 gluster 复制卷。 gluster volume create vol_d
我正在尝试使用 sum() over (partition by) 但在总和中过滤。我的用例是将每个产品的 12 个月累计到一个月的条目,因此: ITEM MONTH SALES Item
是否可以创建多个 Enumerators出单Enumerator ? 我正在寻找的相当于 List.partition返回 (List[A], List[A]) ,比如 List().partitio
我正在创建一个基于 x86 的非常简单的 Yocto 图像。 我希望/文件系统是只读的,所以我设置了 IMAGE_FEATURES_append = " read-only-rootfs " 在原件的
是否可以使用一次 collect 调用来创建 2 个新列表?如果没有,我该如何使用分区来做到这一点? 最佳答案 collect(在TraversableLike上定义并在所有子类中可用)与集合和Par
我是一名优秀的程序员,十分优秀!