
python - Creating a BigQuery table and loading data via Python


I am trying to extract events from Mixpanel, process them, and then upload them to a BigQuery table (creating a new table).

I have searched through every resource I could find on Google, but nothing has helped me solve the problem.

Below is my code:
# Required modules import
import os
from mixpanel_api import Mixpanel
import collections
import json
from google.cloud import storage, bigquery

# Function to flatten exported file
def flatten(d, parent_key='', sep=''):
    items = []
    for k, v in d.items():
        new_key = parent_key.replace("PROPERTIES","").replace("-","_").replace("[","").replace("]","").replace("/","").replace("\\","").replace("'","") + sep + k.replace(" ","").replace("_","").replace("$","").replace("-","_").replace("[","").replace("]","").replace("/","").replace("\\","").replace("'","") if parent_key else k
        #new_key = parent_key.replace("PROPERTIES","").join(e for e in parent_key if e.isalnum()) + sep + k.join(e for e in k if e.isalnum()) if parent_key else k
        if isinstance(v, collections.MutableMapping):
            items.extend(flatten(v, new_key.upper(), sep=sep).items())
        else:
            items.append((new_key.upper().replace("-","_").replace("[","").replace("]","").replace("/","").replace("\\","").replace("'",""), v))
            #items.append((new_key.upper().join(e for e in new_key if e.isalnum()), v))
        #items.append(("ID","1"))
        #items.append(("PROCESS_DATE",""))
        #items.append(("DATA_DATE",""))
    return dict(items)

# Start of execution point
if __name__ == '__main__':

    # Secret and token to access API
    api_sec = 'aa8af6b5ca5a5ed30e20f3af0acdfb2d'
    api_tok = 'ad5234953e64b908bcd35388875324db'
    # User input for date range and filename
    start_date = str(input('Enter the start date(format: YYYY-MM-DD): '))
    end_date = str(input('Enter the end date(format: YYYY-MM-DD): '))
    file_name = str(input('Enter filename to store output: '))
    file_formatter = str(input('Enter filename to store formatted output: '))
    # Instantiating Mixpanel object
    mpo = Mixpanel(api_sec,
                   api_tok
                   )
    # Exporting events for the specified date range and storing in the filename provided, gunzip'ed file
    mpo.export_events(file_name,
                      {'from_date': start_date,
                       'to_date': end_date
                       },
                      add_gzip_header=False,
                      raw_stream=True
                      )

    # Dict for schema derived from file
    schema_dict = {}
    # Flatten file and write out to another file
    with open(file_name, 'r') as uf, open(file_formatter, 'a') as ff, open('schema_file', 'a') as sf:
        #schema_list = []
        for line in uf:
            temp = flatten(json.loads(line))
            for k in temp.keys():
                if k not in schema_dict:
                    schema_dict[k] = "STRING"
                    #schema_list.append({"name" : k, "type" : "STRING"})
            #ff.write(json.dumps(temp))
            json.dump(temp, ff, indent=None, sort_keys=True)  # Dumps each dictionary entry as a newline entry, even '{' '}' is on new lines
            ff.write('\n')  # Adds a new line after each object dump to file
        #json.dump(schema_dict, sf, indent=None, sort_keys=True)
        #json.dump(schema_list, sf, indent=None, sort_keys=True)

    # Removing source file
    if os.path.isfile(file_name):
        sfr = os.remove(file_name)
        if sfr == None:
            print 'File ' + file_name + ' removed from local storage'
        else:
            print 'File ' + file_name + ' remove failed from local storage'
    # Uploading file to Google bucket
    client = storage.Client()
    bucket = client.get_bucket('yathin-sample-bucket')
    blob = bucket.blob(file_formatter)
    status = blob.upload_from_filename(file_formatter)
    if status == None:
        print 'File ' + file_formatter + ' upload success. Removing local copy.'
        fr = os.remove(file_formatter)
        if fr == None:
            print 'File ' + file_formatter + ' removed from local storage'
        else:
            print 'File ' + file_formatter + ' remove failed from local storage'
    # Loading file to BigQuery
    client = bigquery.Client()
    dataset_id = 'sample_dataset'
    dataset_ref = client.dataset(dataset_id)
    job_config = bigquery.LoadJobConfig()
    job_config.schema = [bigquery.SchemaField(k, v) for k, v in schema_dict.items()]
    #job_config.autodetect = True
    #job_config.create_disposition = 'CREATE_IF_NEEDED'
    #job_config.write_disposition = 'WRITE_APPEND'
    job_config.source_format = 'NEWLINE_DELIMITED_JSON'
    uri = 'gs://yathin-sample-bucket/' + file_formatter
    load_job = client.load_table_from_uri(
        uri,
        dataset_ref.table('test_json'),
        job_config=job_config)  # API request
    #assert load_job.job_type == 'load'
    #load_job.result()  # Waits for table load to complete.
This code does not return any errors, but no table is created.

Can someone please help point out what is going wrong here?

Best Answer

There probably is an error, but your script never surfaces the result. I'm not sure why you commented out load_job.result(), but that call is most likely needed to ensure the load job actually completes.
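As an illustration only, the tail end of the script in the question could block on the job and surface any errors roughly like this (load_job and its attributes are the ones already defined above; treat this as a sketch, not a verified fix):

# Wait for the load job started by load_table_from_uri() to finish.
try:
    load_job.result()  # blocks until the job completes and raises if it failed
except Exception:
    # load_job.errors holds the error details BigQuery reported for the job
    print("------BIG QUERY JOB ERROR REASON", load_job.errors)
    raise
print("Load job", load_job.job_id, "finished in state", load_job.state)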

If that still shows no errors, the following snippet will give you a list of recent jobs and their results, including any error codes. Just change the max_results kwarg to look further back.

client = bigquery.Client()
for job in client.list_jobs(max_results=1, all_users=False):
    jobid = job.job_id
    job = client.get_job(jobid)
    print("------BIG QUERY JOB ERROR REASON", job.errors)

Also, in response to the question in the comments about how to check whether a table exists...

from google.cloud.exceptions import NotFound
client = bigquery.Client()
try:
    dataset = client.dataset('DatasetName')
    table_ref = dataset.table('TableName')
    client.get_table(table_ref)
except NotFound:
    print('Table Not Found')

Regarding python - Creating a BigQuery table and loading data via Python, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/50565735/
