I have an RDD:
[{'systemID': '617914',
'typeID': '1',
'taxID': '1',
'workerID': '1011778',
'workerExID': '70000111',
'number': '5',
'shiftNumber': '167',
'numberInShift': '6',
'printedDate': '2022-10-03T15:38:09',
'total': '990.0000',
'IsEReceipt': 'false',
'version': '1.05',
'receiptKISExid': '023442935',
'attribute_1': '1234567890',
'attribute_2': '65e2b71b-c2de-4681-9cf1-29701f5ce6bb',
'city_id': 'BC29FE50'},
{'systemID': '617915',
'typeID': '1',
'taxID': '1',
'workerID': '1011778',
'workerExID': '70000111',
'number': '6',
'shiftNumber': '167',
'numberInShift': '7',
'printedDate': '2022-10-03T16:48:35',
'total': '640.0000',
'IsEReceipt': 'false',
'version': '1.05',
'receiptKISExid': '0234434052',
'attribute_3': '00000000-0000-0000-0000-000000000000@@00000000-0000-0000-0000-000000000000@@00000000-0000-0000-0000-000000000000',
'city_id': 'BC29FE50'},
{'systemID': '617916',
'typeID': '1',
'taxID': '1',
'workerID': '1011778',
'workerExID': '70000111',
'number': '7',
'shiftNumber': '167',
'numberInShift': '8',
'printedDate': '2022-10-03T17:19:46',
'total': '310.0000',
'version': '1.05',
'receiptKISExid': '0234435605',
'attribute_1': '1234567890',
'city_id': 'BC29FE50'}]
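It should be converted to a DataFrame with these fields:
systemID, typeID, taxID, workerID, workerExID, number, shiftNumber, numberInShift, printedDate, total, IsEReceipt, version, receiptKISExid, attribute_1, attribute_2, attribute_3, city_id
However, the fields attribute_1, attribute_2 and attribute_3 may not be present in every row. After converting to a DataFrame, I only get data for the fields that appear first: for example, attribute_1 is populated, but attribute_2 and attribute_3 are empty.
This is what I use now: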
from pyspark.sql.types import StructType, StructField, StringType, FloatType, DateType

ORDER_COL_NAMES = ['systemID','typeID','taxID','workerID','workerExID','number','shiftNumber','numberInShift','printedDate','total','IsEReceipt','version','receiptKISExid','attribute_1','attribute_2','attribute_3','city_id']

def set_schema():
    # Build one nullable StructField per column name
    schema_list = []
    for c in ORDER_COL_NAMES:
        if c == 'total':
            schema_list.append(StructField(c, FloatType(), True))
        elif c == 'print_date':  # note: never matches, the column is named 'printedDate'
            schema_list.append(StructField(c, DateType(), True))
        else:
            schema_list.append(StructField(c, StringType(), True))
    return StructType(schema_list)

df_schema = set_schema()
order_df = orders.toDF(df_schema)
Thanks in advance!
1 Answer
I hope the code below solves your problem by handling the schema explicitly.
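As a minimal sketch of what explicit handling could look like (not necessarily the only way to do it): each dict is normalized into a tuple with one value per column, in the same order as ORDER_COL_NAMES from the question, so that a missing attribute_1, attribute_2 or attribute_3 becomes None instead of shifting or dropping values. The helper name `to_row` is made up for illustration, and the conversion of total to float assumes the values arrive as strings, as in the sample data.

```python
# Column order taken from the question's ORDER_COL_NAMES
ORDER_COL_NAMES = [
    'systemID', 'typeID', 'taxID', 'workerID', 'workerExID', 'number',
    'shiftNumber', 'numberInShift', 'printedDate', 'total', 'IsEReceipt',
    'version', 'receiptKISExid', 'attribute_1', 'attribute_2', 'attribute_3',
    'city_id',
]


def to_row(record, columns=ORDER_COL_NAMES):
    """Hypothetical helper: return one value per column, None where the key is absent."""
    values = []
    for c in columns:
        v = record.get(c)  # None when the dict has no such key
        if c == 'total' and v is not None:
            # Assumption: 'total' is a string such as '990.0000';
            # a FloatType field needs a Python float
            v = float(v)
        values.append(v)
    return tuple(values)


# With Spark available, the normalized tuples could then be converted using
# the names from the question (orders is the RDD, df_schema the StructType):
#   order_df = orders.map(to_row).toDF(df_schema)
```

Because every tuple has the same length and order as the schema, a row that lacks attribute_2 and a row that lacks attribute_1 both line up with the right columns.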