gpt4 book ai didi

python - BigQueryOperator 在 write_disposition ='WRITE_TRUNCATE' 时更改表架构和列模式

转载 作者:行者123 更新时间:2023-12-05 01:33:43 27 4
gpt4 key购买 nike

我正在使用 Airflow 的 BigQueryOperator 使用 write_disposition='WRITE_TRUNCATE' 填充 BQ 表。问题是每次任务运行时,它都会将表模式和列模式从 Required 更改为 Nullable。我使用的 create_disposition 是“CREATE_NEVER”。由于我的表是预先创建的,所以我不希望更改模式或列模式。使用 write_disposition='WRITE_APPEND' 解决了这个问题,但我的要求是使用 WRITE_TRUNCATE。知道为什么 BigQueryOperator 会改变架构和模式吗?

最佳答案

我有一个类似的问题,不是必需的/可为空的 shcema 值,而是策略标签,并且行为是相同的:策略标签被覆盖(并丢失)。以下是 Google 支持团队的回答:

If you overwrite to a destination table, any existing policy tags are removed from the table, unless you use the --destination_schema flag to specify a schema with policy tags.For WRITE_TRUNCATE, the disposition overwrites the existing table and the schema. If you want to keep the policy tags, you can use "--destination_schema" to specify a schema with policy tags.

但是,通过我在 python 中的测试,我观察到 QueryJob(基于 sql 查询的作业并将结果存储在表中)和 LoadJob(从文件加载数据并接收数据的作业)之间的 2 种不同行为在表格中)。

  • 如果您执行 LoadJob,
    • 删除架构自动检测
    • 获取原始表的架构
    • 执行加载作业

在 Python 中是这样的

    job_config = bigquery.job.LoadJobConfig()
job_config.create_disposition = bigquery.job.CreateDisposition.CREATE_IF_NEEDED
job_config.write_disposition = bigquery.job.WriteDisposition.WRITE_TRUNCATE
job_config.skip_leading_rows = 1
# job_config.autodetect = True
job_config.schema = client.get_table(table).schema

query_job = client.load_table_from_uri(uri, table, job_config=job_config)
res = query_job.result()

此解决方案用于复制架构,不适用于 QueryJob


解决方法如下(适用于 LoadJob 和 QueryJob)

  • 截断表格
  • 以 WRITE_EMPTY 模式执行作业

权衡:

  • WRITE_TRUNCATE 是原子的:如果写入失败,数据不会被截断
  • 解决方法分为两步:如果写入失败,则数据已被删除
    config = bigquery.job.QueryJobConfig()
config.create_disposition = bigquery.job.CreateDisposition.CREATE_IF_NEEDED
config.write_disposition = bigquery.job.WriteDisposition.WRITE_EMPTY
# Don't work
# config.schema = client.get_table(table).schema
config.destination = table

# Step 1 truncate the table
query_job = client.query(f'TRUNCATE TABLE `{table}`')
res = query_job.result()
# Step 2: Load the new data
query_job = client.query(request, job_config=job_config)
res = query_job.result()

所有这些都是为了告诉您 Airflow 上的 BigQuery 运算符不是问题所在。这是一个 BigQuery 问题。你有一个解决方法来实现你想要的。

关于python - BigQueryOperator 在 write_disposition ='WRITE_TRUNCATE' 时更改表架构和列模式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64440753/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com