- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我有一个以 JSON 格式(具有名称、类型、模式属性)定义并存储在文件中的重要表架构(涉及嵌套和重复字段)。它已成功用于通过 bq load 命令填充 bigquery 表。
但是当我尝试使用 Dataflow Python SDK 和 BigQuerySink 做同样的事情时,schema
参数需要是 'name':'type'< 的逗号分隔列表
元素,或 bigquery.TableSchema
对象。
有什么方便的方法可以将我的 JSON 架构转换为 bigquery.TableSchema
,或者我是否必须将其转换为 name:value
列表?
最佳答案
Andrea Pierleoni 发布的上述代码片段适用于 google-cloud-bigquery
python 客户端的旧版本,例如 google 的
恰好通过 0.25.0
版本-cloud-bigquerypip install apache-beam[gcp]
安装。
但是,BigQuery Python 客户端 API 在 google-cloud-bigquery
的最新版本中发生了巨大变化,例如我目前使用的版本 1.8.0
、bigquery.TableFieldSchema()
和 bigquery.TableSchema()
不起作用。
如果您使用的是较新版本的 google-cloud-bigquery
包,您可以按照以下方法获取所需的 SchemaField
列表(创建表所需的,例如)来自 JSON 文件。这是对 Andrea Pierleoni 上面发布的代码的改编(感谢!)
def _get_field_schema(field):
name = field['name']
field_type = field.get('type', 'STRING')
mode = field.get('mode', 'NULLABLE')
fields = field.get('fields', [])
if fields:
subschema = []
for f in fields:
fields_res = _get_field_schema(f)
subschema.append(fields_res)
else:
subschema = []
field_schema = bigquery.SchemaField(name=name,
field_type=field_type,
mode=mode,
fields=subschema
)
return field_schema
def parse_bq_json_schema(schema_filename):
schema = []
with open(schema_filename, 'r') as infile:
jsonschema = json.load(infile)
for field in jsonschema:
schema.append(_get_field_schema(field))
return schema
现在,假设您有一个表的 schema already defined in JSON .假设你有 this particular "schema.json" file ,然后使用上述辅助方法,您可以获得 Python 客户端所需的 SchemaField
表示,如下所示:
>>> res_schema = parse_bq_json_schema("schema.json")
>>> print(res_schema)
[SchemaField(u'event_id', u'INTEGER', u'REQUIRED', None, ()), SchemaField(u'event_name', u'STRING', u'REQUIRED', None, ()), SchemaField(u'event_types', u'STRING', u'REPEATED', None, ()), SchemaField(u'product_code', u'STRING', u'REQUIRED', None, ()), SchemaField(u'product_sub_code', u'STRING', u'REPEATED', None, ()), SchemaField(u'source', u'RECORD', u'REQUIRED', None, (SchemaField(u'internal', u'RECORD', u'NULLABLE', None, (SchemaField(u'name', u'STRING', u'REQUIRED', None, ()), SchemaField(u'timestamp', u'TIMESTAMP', u'REQUIRED', None, ()))), SchemaField(u'external', u'RECORD', u'NULLABLE', None, (SchemaField(u'name', u'STRING', u'REQUIRED', None, ()), SchemaField(u'timestamp', u'TIMESTAMP', u'REQUIRED', None, ()))))), SchemaField(u'timestamp', u'TIMESTAMP', u'REQUIRED', None, ()), SchemaField(u'user_key', u'RECORD', u'REQUIRED', None, (SchemaField(u'device_id', u'STRING', u'NULLABLE', None, ()), SchemaField(u'cookie_id', u'STRING', u'NULLABLE', None, ()), SchemaField(u'profile_id', u'STRING', u'NULLABLE', None, ()), SchemaField(u'best_id', u'STRING', u'REQUIRED', None, ()))), SchemaField(u'message_id', u'STRING', u'REQUIRED', None, ()), SchemaField(u'message_type', u'STRING', u'REQUIRED', None, ()), SchemaField(u'tracking_id', u'STRING', u'NULLABLE', None, ()), SchemaField(u'funnel_stage', u'STRING', u'NULLABLE', None, ()), SchemaField(u'location', u'RECORD', u'NULLABLE', None, (SchemaField(u'latitude', u'FLOAT', u'REQUIRED', None, ()), SchemaField(u'longitude', u'FLOAT', u'REQUIRED', None, ()), SchemaField(u'geo_region_id', u'INTEGER', u'NULLABLE', None, ()))), SchemaField(u'campaign_id', u'STRING', u'NULLABLE', None, ()), SchemaField(u'topic', u'STRING', u'REQUIRED', None, ())]
现在到create a table having the above schema using the Python SDK ,你会这样做:
dataset_ref = bqclient.dataset('YOUR_DATASET')
table_ref = dataset_ref.table('YOUR_TABLE')
table = bigquery.Table(table_ref, schema=res_schema)
您可以选择像这样设置基于时间的分区(如果需要):
table.time_partitioning = bigquery.TimePartitioning(
type_=bigquery.TimePartitioningType.DAY,
field='timestamp' # name of column to use for partitioning
)
这最终创建了表:
table = bqclient.create_table(table)
print('Created table {}, partitioned on column {}'.format(
table.table_id, table.time_partitioning.field))
关于python - BigQuerySink 的 bigquery.TableSchema 的 JSON 表架构,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36127537/
我有一个以 JSON 格式(具有名称、类型、模式属性)定义并存储在文件中的重要表架构(涉及嵌套和重复字段)。它已成功用于通过 bq load 命令填充 bigquery 表。 但是当我尝试使用 Dat
就DataflowRunner内部的实现细节而言,很多人可能并不关心使用的是BigQuerySink还是WriteToBigQuery。 但是,就我而言,我尝试将代码部署到使用 RunTimeValu
我是一名优秀的程序员,十分优秀!