
python - BigQuery: how do I change a column's mode?

Reposted. Author: 行者123. Updated: 2023-12-04 07:16:53

I have a Dataflow pipeline that takes data from Pub/Sub, prepares it for insertion into BigQuery, and then writes it to the database.
It works fine: it generates the schema automatically and is able to figure out which data types to use, and so on.
However, the data we use it with can vary a lot in format. For example, we can receive both A and B for a single column:

A {"name":"John"}

B {"name":["Albert", "Einstein"]}
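The two payloads imply conflicting BigQuery modes for the same column; a toy sketch of the distinction (not the actual schema-inference code used by the pipeline):

```python
def infer_mode(value):
    # BigQuery load jobs derive REPEATED mode from JSON arrays and
    # NULLABLE from scalar values, so A and B deduce different schemas
    # for the same column name.
    return "REPEATED" if isinstance(value, list) else "NULLABLE"

print(infer_mode("John"))                  # NULLABLE  (message A)
print(infer_mode(["Albert", "Einstein"]))  # REPEATED  (message B)
```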
If the first message we receive gets added, then adding the second one fails.
Even if I do it the other way around,
I always get the following error:
 

INFO:root:Error: 400 POST https://bigquery.googleapis.com/upload/bigquery/v2/project/projectname/jobs?uploadType=resumable: Provided Schema does not match Table project:test_dataset.test_table. Field cars has changed mode from NULLABLE to REPEATED with loading dataframe
ERROR:apache_beam.runners.direct.executor:Exception at bundle <apache_beam.runners.direct.bundle_factory._Bundle object at 0x7fcb9003f2c0>, due to an exception.
Traceback (most recent call last):
........

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
.....
Provided Schema does not match Table project.test_table. Field cars has changed mode from NULLABLE to REPEATED

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "apache_beam/runners/common.py", line 1233, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 582, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "newmain.py", line 211, in process
if load_job and load_job.errors:
UnboundLocalError: local variable 'load_job' referenced before assignment
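The final `UnboundLocalError` is a secondary issue: `load_job` is only assigned inside the `try` block, so when `load_table_from_json` itself raises, the name never gets bound and the `except` handler cannot reference it. A minimal sketch of the guarded pattern (the function name is illustrative, not from the original code):

```python
def load_with_guard(client, rows, table_id, job_config):
    # Bind the name before the try block so the except handler can
    # safely inspect it even when the load call itself raises.
    load_job = None
    try:
        load_job = client.load_table_from_json(rows, table_id, job_config=job_config)
        load_job.result()  # wait for the job to complete
    except Exception as error:
        print(f"Error: {error} with loading dataframe")
        # No UnboundLocalError here: load_job is always defined.
        if load_job is not None and load_job.errors:
            print(f"errors = {load_job.errors}")
```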



Here is the code:


# Imports are not shown in the original snippet; these are the ones the code
# below needs. JobOptions is a custom PipelineOptions subclass whose
# definition is omitted from the question.
import argparse
import json
import logging
import sys

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from bigquery_schema_generator.generate_schema import SchemaGenerator
from google.cloud import bigquery


class WriteDataframeToBQ(beam.DoFn):

    def __init__(self, bq_dataset, bq_table, project_id):
        self.bq_dataset = bq_dataset
        self.bq_table = bq_table
        self.project_id = project_id

    def start_bundle(self):
        self.client = bigquery.Client()

    def process(self, df):
        # table where we're going to store the data
        table_id = f"{self.bq_dataset}.{self.bq_table}"

        # function to help with the json -> bq schema transformations
        generator = SchemaGenerator(input_format='dict', quoted_values_are_strings=True, keep_nulls=True)

        # Get original schema to assist the deduce_schema function. If the table doesn't exist
        # proceed with empty original_schema_map
        try:
            table = self.client.get_table(table_id)
            original_schema = table.schema
            self.client.schema_to_json(original_schema, "original_schema.json")
            with open("original_schema.json") as f:
                original_schema = json.load(f)
            original_schema_map, original_schema_error_logs = generator.deduce_schema(input_data=original_schema)
        except Exception:
            logging.info(f"{table_id} table does not exist. Proceed without getting schema")
            original_schema_map = {}

        # convert dataframe to dict
        json_text = df.to_dict('records')

        # generate the new schema; we need to write it to a file because
        # schema_from_json only accepts a json file as input
        schema_map, error_logs = generator.deduce_schema(input_data=json_text, schema_map=original_schema_map)
        schema = generator.flatten_schema(schema_map)

        schema_file_name = "schema_map.json"
        with open(schema_file_name, "w") as output_file:
            json.dump(schema, output_file)

        # convert the generated schema to a version that BQ understands
        bq_schema = self.client.schema_from_json(schema_file_name)

        job_config = bigquery.LoadJobConfig(
            source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
            schema_update_options=[
                bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION,
                bigquery.SchemaUpdateOption.ALLOW_FIELD_RELAXATION
            ],
            write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
            schema=bq_schema
        )
        job_config.schema = bq_schema

        try:
            load_job = self.client.load_table_from_json(
                json_text,
                table_id,
                job_config=job_config,
            )  # Make an API request.

            load_job.result()  # Waits for the job to complete.
            if load_job.errors:
                logging.info(f"error_result = {load_job.error_result}")
                logging.info(f"errors = {load_job.errors}")
            else:
                logging.info(f'Loaded {len(df)} rows.')

        except Exception as error:
            logging.info(f'Error: {error} with loading dataframe')

            if load_job and load_job.errors:
                logging.info(f"error_result = {load_job.error_result}")
                logging.info(f"errors = {load_job.errors}")


def run(argv):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args, save_main_session=True, streaming=True)
    options = pipeline_options.view_as(JobOptions)

    with beam.Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            | "Read PubSub Messages" >> beam.io.ReadFromPubSub(subscription=options.input_subscription)
            | "Write Raw Data to Big Query" >> beam.ParDo(WriteDataframeToBQ(project_id=options.project_id, bq_dataset=options.bigquery_dataset, bq_table=options.bigquery_table))
        )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    run(sys.argv)

Is there a way to change the table's constraints to make this work?

Best Answer

BigQuery is not a document database; it is a column-oriented database. Moreover, you cannot change the schema of an existing column (you can only add or remove columns).
For your use case, and since you cannot know or predict the most general schema for each field in advance, the safer approach is to store the raw JSON as a string and then post-process your data in SQL using the JSON functions of BigQuery.
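A sketch of that approach, assuming a single STRING column named `raw` (the column name and the SQL below are illustrative, not from the answer):

```python
import json

def wrap_raw(message: dict) -> dict:
    # One fixed-schema STRING column: the payload is stored verbatim,
    # so its internal shape can vary freely from message to message.
    return {"raw": json.dumps(message)}

rows = [wrap_raw({"name": "John"}),
        wrap_raw({"name": ["Albert", "Einstein"]})]

# Post-processing then happens in SQL, e.g.:
#   SELECT JSON_VALUE(raw, '$.name') AS scalar_name,  -- scalar case
#          JSON_QUERY(raw, '$.name') AS array_name    -- array case
#   FROM `project.test_dataset.test_table`
```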

Regarding "python - BigQuery: how do I change a column's mode?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/68713574/
