
python - UnicodeDecodeError in a MySQL to Cloud Storage bucket Airflow DAG

Reprinted. Author: 行者123. Updated: 2023-11-29 06:49:03

I created a DAG that extracts MySQL data from a database, loads it to Cloud Storage as JSON files, and then loads it into BigQuery.

The DAG works for some tables, but not all of them, because it cannot decode some characters in the tables. There is quite a lot of data, so I can't pinpoint exactly where the erroneous or invalid characters are.

I have tried changing the database, table, and column character sets from utf8 to utf8mb4. That did not help.

I have also tried passing encoding='utf-8' as well as 'iso-8859-1', but I don't think I am passing them correctly; I have been setting them on my connection, and I still get the same error.

I am running Python 2.7.12 and Airflow v1.8.0.

Update: after reading this: https://cwiki.apache.org/confluence/display/AIRFLOW/Common+Pitfalls it is suggested to use a connection string that defines the charset, e.g.: sql_alchemy_conn = mysql://airflow@localhost:3306/airflow?charset=utf8

How can I do this with a Cloud SQL instance?
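One approach (a sketch, not a verified answer): the MySqlHook that MySqlToGoogleCloudStorageOperator uses reads a "charset" key from the Airflow connection's Extra JSON and forwards it to MySQLdb.connect, so the charset can be set per connection rather than in sql_alchemy_conn. The host and credentials below are placeholders:

```python
# Sketch: configure the 'mysql_connection' Airflow connection (which points
# at the Cloud SQL instance) with an explicit character set.
import json

# Put this JSON in the "Extra" field of the Airflow connection;
# MySqlHook passes "charset" through to the MySQLdb client.
extra = json.dumps({"charset": "utf8mb4"})
print(extra)

# Equivalent URI form, analogous to the sql_alchemy_conn example from the
# Airflow wiki (placeholder host and credentials):
conn_uri = "mysql://user:password@<cloud-sql-ip>:3306/podiodb?charset=utf8mb4"
```

With the charset declared on the client connection, MySQL transcodes result rows to that encoding before they reach the operator.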

podio_connections = [
    'mysql_connection'
]

podio_tables = [
    'finance_banking_details',
    'finance_goods_invoices',
]

default_args = {
    'owner': 'xxxxxx',
    'start_date': datetime(2018, 1, 11),
    'depends_on_past': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('mysql_to_bigquery', default_args=default_args, schedule_interval='@daily')

slack_notify = SlackAPIPostOperator(
    task_id='slack_notify',
    token='xxxxxx',
    channel='data-status',
    username='airflow',
    text='Successfully performed Podio ETL operation',
    dag=dag)

for connection in podio_connections:
    for table in podio_tables:
        extract = MySqlToGoogleCloudStorageOperator(
            task_id="extract_mysql_%s_%s" % (connection, table),
            mysql_conn_id=connection,
            google_cloud_storage_conn_id='gcp_connection',
            sql="SELECT *, '%s' as source FROM podiodb.%s" % (connection, table),
            bucket='podio-reader-storage',
            filename="%s/%s/%s{}.json" % (connection, table, table),
            schema_filename="%s/schemas/%s.json" % (connection, table),
            dag=dag)

        load = GoogleCloudStorageToBigQueryOperator(
            task_id="load_bg_%s_%s" % (connection, table),
            bigquery_conn_id='gcp_connection',
            google_cloud_storage_conn_id='gcp_connection',
            bucket='podio-reader-storage',
            # destination_project_dataset_table="podio-data.%s.%s" % (connection, table),
            destination_project_dataset_table="podio-data.podio_data1.%s" % (table),
            source_objects=["%s/%s/%s*.json" % (connection, table, table)],
            schema_object="%s/schemas/%s.json" % (connection, table),
            source_format='NEWLINE_DELIMITED_JSON',
            write_disposition='WRITE_TRUNCATE',
            dag=dag)

        load.set_upstream(extract)
        slack_notify.set_upstream(load)

[2018-01-12 15:36:10,221] {models.py:1417} ERROR - 'utf8' codec can't decode byte 0x96 in position 36: invalid start byte
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1374, in run
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/operators/mysql_to_gcs.py", line 91, in execute
    files_to_upload = self._write_local_data_files(cursor)
  File "/usr/local/lib/python2.7/dist-packages/airflow/contrib/operators/mysql_to_gcs.py", line 136, in _write_local_data_files
    json.dump(row_dict, tmp_file_handle)
  File "/usr/lib/python2.7/json/__init__.py", line 189, in dump
    for chunk in iterable:
  File "/usr/lib/python2.7/json/encoder.py", line 434, in _iterencode
    for chunk in _iterencode_dict(o, _current_indent_level):
  File "/usr/lib/python2.7/json/encoder.py", line 390, in _iterencode_dict
    yield _encoder(value)
UnicodeDecodeError: 'utf8' codec can't decode byte 0x96 in position 36: invalid start byte
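The failure can be reproduced outside Airflow: a row value contains a byte that is not valid UTF-8, so the implicit decode that json.dump performs on Python 2 byte strings fails. A minimal sketch (the sample bytes are made up):

```python
# Reproduce the error: 0x96 is not a valid start byte in UTF-8, so any
# attempt to decode it as UTF-8 (as json.dump does for Python 2 str
# values) raises UnicodeDecodeError.
raw = b"SELECT result containing byte \x96"
try:
    raw.decode("utf-8")
except UnicodeDecodeError as exc:
    print(exc)  # 'utf-8' codec can't decode byte 0x96 ...: invalid start byte
```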

Best Answer

Hex 96 is latin1 for an "en-dash". Either change the data to utf8, or change the connection to MySQL to indicate that you are using charset latin1.
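This can be checked directly: MySQL's "latin1" is effectively Windows cp1252, where byte 0x96 maps to U+2013 (en-dash), while under UTF-8 it is an invalid start byte. A quick illustration:

```python
raw = b"\x96"

# Decoded as cp1252 (what MySQL calls "latin1"), 0x96 is an en-dash.
print(raw.decode("cp1252"))       # prints an en-dash (U+2013)

# Under strict ISO-8859-1 the same byte maps to a C1 control character,
# so the text only renders sensibly when treated as cp1252.
print(repr(raw.decode("latin-1")))
```

So either convert the stored data to utf8, or declare latin1 as the connection charset so the client transcodes it.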

Regarding python - UnicodeDecodeError in a MySQL to Cloud Storage bucket Airflow DAG, a similar question was found on Stack Overflow: https://stackoverflow.com/questions/48227871/
