gpt4 book ai didi

azure - 在Azure databricks中,将pyspark数据帧写入eventhub花费的时间太长,因为数据帧中有300万条记录

转载 作者:行者123 更新时间:2023-12-03 03:39:41 29 4
gpt4 key购买 nike

Oracle数据库表有300万条记录。我需要将其读入 dataframe,然后将其转换为 json 格式并将其发送到下游系统的 eventhub。

下面是我的 pyspark 代码,用于连接和读取 Oracle 数据库表作为数据帧

df = spark.read \
.format("jdbc") \
.option("url", databaseurl) \
.option("query","select * from tablename") \
.option("user", loginusername) \
.option("password", password) \
.option("driver", "oracle.jdbc.driver.OracleDriver") \
.option("oracle.jdbc.timezoneAsRegion", "false") \
.load()

然后我将每行的列名称和值转换为 json(放置在名为 body 的新列下),然后将其发送到 Eventhub。

我已经定义了 ehconf 和 eventhub 连接字符串。以下是我编写的 eventhub 代码

df.select("body") \
.write\
.format("eventhubs") \
.options(**ehconf) \
.save()

我的 pyspark 代码需要 8 小时才能将 300 万条记录发送到 eventhub。

您能否建议如何更快地将 pyspark 数据帧写入 eventhub ?

我的Eventhub是在eventhub集群下创建的,容量为1 CU

Databricks 集群配置:模式: 标准运行时间:10.3工作器类型:Standard_D16as_v4 64GB 内存,16 核(最小工作器:1,最大工作器:5)驱动类型:Standard_D16as_v4 64GB内存,16核

最佳答案

问题是 jdbc 连接器默认只使用一个数据库连接,因此您的大多数工作人员可能都处于空闲状态。您可以在 Cluster Settings > Metrics > Ganglia UI 中确认这一点。

要真正利用所有工作线程,jdbc 连接器需要知道如何并行检索数据。为此,您需要一个在其值上均匀分布数据的字段。例如,如果您的数据中有一个日期字段,并且每个日期都有相似数量的记录,您可以使用它来拆分数据:

df = spark.read \
.format("jdbc") \
.option("url", jdbcUrl) \
.option("dbtable", tableName) \
.option("user", jdbcUsername) \
.option("password", jdbcPassword) \
.option("numPartitions", 64) \
.option("partitionColumn", "<dateField>") \
.option("lowerBound", "2019-01-01") \
.option("upperBound", "2022-04-07") \
.load()

您必须定义字段名称以及该字段的最小值和最大值,以便 jdbc 连接器可以尝试在工作人员之间平均分配工作。 numPartitions 是打开的单个连接的数量,最佳值取决于集群中的工作线程数量以及数据源可以处理的连接数量。

关于azure - 在Azure databricks中,将pyspark数据帧写入eventhub花费的时间太长,因为数据帧中有300万条记录,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/71791689/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com