
python - In (Pyspark?


I am trying to convert a PostgreSQL DB into DataFrames. Here is my code:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Connect to DB") \
    .getOrCreate()

jdbcUrl = "jdbc:postgresql://XXXXXX"
connectionProperties = {
    "user": " ",
    "password": " ",
    "driver": "org.postgresql.Driver"
}

query = "(SELECT table_name FROM information_schema.tables) XXX"
df = spark.read.jdbc(url=jdbcUrl, table=query, properties=connectionProperties)

table_name_list = df.select("table_name").rdd.flatMap(lambda x: x).collect()
for table_name in table_name_list:
    df2 = spark.read.jdbc(url=jdbcUrl, table=table_name, properties=connectionProperties)

The error I get:

java.sql.SQLException: Unsupported type ARRAY on generating df2 for table name

If I hardcode the table name value, I do not get the same error:

df2 = spark.read.jdbc(jdbcUrl, "conditions", properties=connectionProperties)

I checked the type of table_name and it is String. Is this the correct approach?

Best Answer

I guess you do not want the table names that belong to Postgres' internal workings, such as pg_type, pg_policies, etc. Their schema is pg_catalog, and that is what causes the error

py4j.protocol.Py4JJavaError: An error occurred while calling o34.jdbc. : java.sql.SQLException: Unsupported type ARRAY

when you try to read them as

spark.read.jdbc(url=jdbcUrl, table='pg_type', properties=connectionProperties)

And there are tables such as applicable_roles, view_table_usage, etc., whose schema is information_schema, which leads to

py4j.protocol.Py4JJavaError: An error occurred while calling o34.jdbc. : org.postgresql.util.PSQLException: ERROR: relation "view_table_usage" does not exist

when you try to read them as

spark.read.jdbc(url=jdbcUrl, table='view_table_usage', properties=connectionProperties)
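The "relation does not exist" error here comes from the unqualified name: information_schema is not on PostgreSQL's default search_path (only "$user" and public are), so the generated SELECT cannot resolve view_table_usage. If you actually need one of these views, schema-qualifying the name lets PostgreSQL find it. A minimal sketch (I have not verified that Spark can map every column type of this particular view):

# Sketch: schema-qualify the view name so PostgreSQL can resolve the relation.
spark.read.jdbc(url=jdbcUrl, table='information_schema.view_table_usage', properties=connectionProperties)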

Tables whose schema is public can be read into DataFrames with the jdbc command shown above.
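If you want to confirm which schema a particular table lives in before reading it, you can push a small lookup query through the same connection. A minimal sketch, reusing the jdbcUrl and connectionProperties defined above and the conditions table from the question as an assumed example; the subquery alias t is mine:

# Sketch: look up the schema of one table via a pushdown subquery.
schema_check = "(SELECT table_schema, table_name FROM information_schema.tables WHERE table_name = 'conditions') AS t"
spark.read.jdbc(url=jdbcUrl, table=schema_check, properties=connectionProperties).show()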

I checked table_name type and it is String, is this the correct approach?

So you need to filter out those table names and apply your logic as follows:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Connect to DB") \
    .getOrCreate()

jdbcUrl = "jdbc:postgresql://hostname:post/"
connectionProperties = {
    "user": " ",
    "password": " ",
    "driver": "org.postgresql.Driver"
}

query = "information_schema.tables"
df = spark.read.jdbc(url=jdbcUrl, table=query, properties=connectionProperties)

# Keep only tables that are not in the pg_catalog or information_schema schemas
table_name_list = df.filter(
    (df["table_schema"] != 'pg_catalog') & (df["table_schema"] != 'information_schema')
).select("table_name").rdd.flatMap(lambda x: x).collect()

for table_name in table_name_list:
    df2 = spark.read.jdbc(url=jdbcUrl, table=table_name, properties=connectionProperties)

This should work.
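Alternatively, the same filtering can be pushed down to PostgreSQL itself, so only the relevant table names ever reach Spark. A sketch under the same assumptions as the code above; the subquery alias t is mine:

# Sketch: let PostgreSQL filter out the catalog schemas before Spark sees the rows.
query = ("(SELECT table_name FROM information_schema.tables "
         "WHERE table_schema NOT IN ('pg_catalog', 'information_schema')) AS t")
df = spark.read.jdbc(url=jdbcUrl, table=query, properties=connectionProperties)
table_name_list = df.select("table_name").rdd.flatMap(lambda x: x).collect()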

Regarding python - In (Pyspark?, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/50613977/
