gpt4 book ai didi

database - Apache Spark 的主键

转载 作者:可可西里 更新时间:2023-11-01 14:07:42 26 4
gpt4 key购买 nike

我正在与 Apache Spark 和 PostgreSQL 建立 JDBC 连接,我想将一些数据插入到我的数据库中。当我使用 append 模式时,我需要为每个 DataFrame.Row 指定 id。 Spark 有什么方法可以创建主键吗?

最佳答案

斯卡拉:

如果您只需要唯一编号,您可以使用 zipWithUniqueId 并重新创建 DataFrame。首先是一些导入和虚拟数据:

import sqlContext.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, LongType}

val df = sc.parallelize(Seq(
("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar")

提取架构以供进一步使用:

val schema = df.schema

添加id字段:

val rows = df.rdd.zipWithUniqueId.map{
case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}

创建数据框:

val dfWithPK = sqlContext.createDataFrame(
rows, StructType(StructField("id", LongType, false) +: schema.fields))

Python 中也是一样:

from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, LongType

row = Row("foo", "bar")
row_with_index = Row(*["id"] + df.columns)

df = sc.parallelize([row("a", -1.0), row("b", -2.0), row("c", -3.0)]).toDF()

def make_row(columns):
def _make_row(row, uid):
row_dict = row.asDict()
return row_with_index(*[uid] + [row_dict.get(c) for c in columns])
return _make_row

f = make_row(df.columns)

df_with_pk = (df.rdd
.zipWithUniqueId()
.map(lambda x: f(*x))
.toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))

如果您更喜欢连续编号,您可以将 zipWithUniqueId 替换为 zipWithIndex,但它会稍微贵一些。

直接使用 DataFrame API:

(语法几乎相同的通用 Scala、Python、Java、R)

以前我错过了 monotonicallyIncreasingId 函数,只要您不需要连续的数字,它应该可以正常工作:

import org.apache.spark.sql.functions.monotonicallyIncreasingId

df.withColumn("id", monotonicallyIncreasingId).show()
// +---+----+-----------+
// |foo| bar| id|
// +---+----+-----------+
// | a|-1.0|17179869184|
// | b|-2.0|42949672960|
// | c|-3.0|60129542144|
// +---+----+-----------+

虽然有用的 monotonicallyIncreasingId 是不确定的。不仅 id 可能因执行而异,而且在后续操作包含过滤器时,如果没有额外的技巧,则不能用于识别行。

注意:

也可以使用rowNumber 窗口函数:

from pyspark.sql.window import Window
from pyspark.sql.functions import rowNumber

w = Window().orderBy()
df.withColumn("id", rowNumber().over(w)).show()

不幸的是:

WARN Window: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

因此,除非您有一种自然的方式来划分数据并确保唯一性,否则目前并不是特别有用。

关于database - Apache Spark 的主键,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33102727/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com