
python - Overwriting a table with a Spark DataFrame fails when the table already exists


I am trying to completely overwrite a Postgres table from a Spark DataFrame. For some reason, even though I specify mode("overwrite"), I get a Postgres "relation already exists" error. Why doesn't my code overwrite the data in the database as expected? I have checked the table with a client: it does exist (which shouldn't matter) and it contains data. What is going wrong? Could this be a memory issue, or something to do with queryTimeout?

    df.write.format('jdbc').options(
        url=PSQL_URL_SPARK,
        driver=SPARK_ENV['PSQL_DRIVER'],
        dbtable="schema.table",
        user=SPARK_ENV['PSQL_USER'],
        password=SPARK_ENV['PSQL_PASS'],
        batchsize=2000000,
        queryTimeout=690
    ).mode("overwrite").save()

Traceback (most recent call last):
File "/home/hadoop/spark_script.py", line 671, in <module>
main()
File "/home/hadoop/spark_script.py", line 83, in main
).mode("overwrite").save()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 732, in save
File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o352.save.
: org.postgresql.util.PSQLException: ERROR: relation "<table>" already exists
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2468)
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2211)
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:309)
at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:446)
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:370)
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:311)
at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:297)
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:274)
at org.postgresql.jdbc.PgStatement.executeUpdate(PgStatement.java:246)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.createTable(JdbcUtils.scala:859)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:81)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)

Best Answer

I ran into the same problem, and the cause was the database schema: make sure the columns and their types in the database table match those of the DataFrame.
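
A minimal way to compare the two schemas from PySpark is a sketch like the following, reusing the connection placeholders from the question (it assumes spark is the active SparkSession):

    # Read the existing Postgres table back through JDBC and compare its
    # schema with the DataFrame you are trying to write.
    existing = spark.read.format('jdbc').options(
        url=PSQL_URL_SPARK,
        driver=SPARK_ENV['PSQL_DRIVER'],
        dbtable="schema.table",
        user=SPARK_ENV['PSQL_USER'],
        password=SPARK_ENV['PSQL_PASS']
    ).load()

    existing.printSchema()  # column names/types as Spark maps them from Postgres
    df.printSchema()        # column names/types of the DataFrame being written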

You can write the DataFrame into a new temporary table and use DESCRIBE (in Postgres, \d in psql) to compare the columns and types of the two tables. Then try overwriting the temporary table again to confirm that writing into an existing table succeeds.
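
If you would rather inspect the column types without leaving PySpark, the JDBC source also accepts a parenthesized subquery as dbtable, so you can push an information_schema query down to Postgres. A sketch, where 'schema' and 'table' are the question's own placeholders:

    cols = spark.read.format('jdbc').options(
        url=PSQL_URL_SPARK,
        driver=SPARK_ENV['PSQL_DRIVER'],
        dbtable="""(SELECT column_name, data_type
                    FROM information_schema.columns
                    WHERE table_schema = 'schema'
                      AND table_name = 'table') AS cols""",
        user=SPARK_ENV['PSQL_USER'],
        password=SPARK_ENV['PSQL_PASS']
    ).load()
    cols.show(truncate=False)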

Another possible issue is permissions. Check the user's privileges on the table:

SELECT grantee, privilege_type 
FROM information_schema.role_table_grants
WHERE table_name='mytable';
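
Separately, note that with mode("overwrite") Spark's JDBC writer drops and recreates the target table by default (the JdbcUtils.createTable frame in the traceback above is that CREATE step), which requires DROP/CREATE privileges and discards the existing table definition. If the goal is only to replace the rows, Spark's documented truncate writer option keeps the table in place; a sketch reusing the question's options:

    df.write.format('jdbc').options(
        url=PSQL_URL_SPARK,
        driver=SPARK_ENV['PSQL_DRIVER'],
        dbtable="schema.table",
        user=SPARK_ENV['PSQL_USER'],
        password=SPARK_ENV['PSQL_PASS'],
        truncate=True,  # TRUNCATE the existing table instead of DROP + CREATE
        batchsize=2000000,
        queryTimeout=690
    ).mode("overwrite").save()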

A similar question about "python - Overwriting a table with a Spark DataFrame fails when the table already exists" can be found on Stack Overflow: https://stackoverflow.com/questions/59311559/
