我创建了一个dag,它从数据库中提取mysql数据并将其加载到云存储,然后将bigquery作为json文件。
dag适用于某些表,但不是所有表,因为它不能解码表中的某些字符。这是相当多的数据,所以我不能准确指出错误或无效字符的位置。
我尝试过将数据库、表和列字符集从utf8更改为utf8mb4。这没用。
我也试过调用encoding='utf-8'和'iso-8859-1',但是我认为我没有正确地调用它们,因为我一直在用我的连接来做这件事,我仍然得到相同的错误。
我正在运行Python2.7.12和airflow v1.8.0
更新:阅读以下内容后:https://cwiki.apache.org/confluence/display/airflow/common+pitfalls 建议使用定义字符集的连接字符串,例如:sql\u alchemy\u conn=mysql://airflow@localhost:3306/气流?字符集=utf8
如何使用云sql示例实现这一点?
podio_connections = [
'mysql_connection'
]
podio_tables = [
'finance_banking_details',
'finance_goods_invoices',
]
default_args = {
'owner': 'xxxxxx',
'start_date': datetime(2018,1,11),
'depends_on_past': False,
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
dag = DAG('mysql_to_bigquery', default_args=default_args, schedule_interval='@daily')
slack_notify = SlackAPIPostOperator(
task_id='slack_notify',
token='xxxxxx',
channel='data-status',
username='airflow',
text='Successfully performed Podio ETL operation',
dag=dag)
for connection in podio_connections:
for table in podio_tables:
extract = MySqlToGoogleCloudStorageOperator(
task_id="extract_mysql_%s_%s"%(connection,table),
mysql_conn_id=connection,
google_cloud_storage_conn_id='gcp_connection',
sql="SELECT *, '%s' as source FROM podiodb.%s"%(connection,table),
bucket='podio-reader-storage',
filename="%s/%s/%s{}.json"%(connection,table,table),
schema_filename="%s/schemas/%s.json"%(connection,table),
dag=dag)
load = GoogleCloudStorageToBigQueryOperator(
task_id="load_bg_%s_%s"%(connection,table),
bigquery_conn_id='gcp_connection',
google_cloud_storage_conn_id='gcp_connection',
bucket='podio-reader-storage',
#destination_project_dataset_table="podio-data.%s.%s"%(connection,table),
destination_project_dataset_table = "podio-data.podio_data1.%s"%(table),
source_objects=["%s/%s/%s*.json"%(connection,table,table)],
schema_object="%s/schemas/%s.json"%(connection,table),
source_format='NEWLINE_DELIMITED_JSON',
write_disposition='WRITE_TRUNCATE',
dag=dag)
load.set_upstream(extract)
slack_notify.set_upstream(load)
[2018-01-12 15:36:10221]{models.py:1417}错误-'utf8'编解码器无法解码第36位的字节0x96:无效的起始字节
回溯(最近一次呼叫):
文件“/usr/local/lib/python2.7/dist packages/airflow/models.py”,第1374行,in run result=task\u copy.execute(context=context)
file“/usr/local/lib/python2.7/dist-packages/airflow/contrib/operators/mysql\u to \u gcs.py”,第91行,在execute files\u to \u upload=self.写入\u本地\u数据\u文件(光标)
file“/usr/local/lib/python2.7/dist-packages/airflow/contrib/operators/mysql\u to\u gcs.py”,第136行,in\u write\u local\u data\u files json.dump(row\u dict,tmp\u file\u handle)
文件“/usr/lib/python2.7/json/init.py”,第189行,位于iterable中chunk的dump中:
文件“/usr/lib/python2.7/json/encoder.py”,第434行,位于\u iterencode中,用于\u iterencode\u dict中的块(o,\u current\u indent\u level):
文件“/usr/lib/python2.7/json/encoder.py”,\u iterencode\u dict yield\u encoder(value)中的第390行
unicodedecodeerror:“utf8”编解码器无法解码位置36中的字节0x96:起始字节无效
1条答案
按热度按时间tyg4sfes1#
96
拉丁文十六进制表示“en破折号”。要么将数据更改为utf8,要么将到mysql的连接更改为使用charset latin1。