我正在使用 Spark Structured Streaming 读取 Kafka 主题,并希望将以下复杂的 JSON(Kafka 消息)转换为包含“name、address、description、code、department、infa_op_type、dtl_capxtimestamp”这些列的 DataFrame。
{
"meta_data": [{"name":{"string":"INFA_SEQUENCE"},"value":
{"string":"2,PWX_GENERIC"},"type":null},
{"name":{"string":"INFA_TABLE_NAME"},"value":{"string":"customers"},"type":null},
{"name":{"string":"INFA_OP_TYPE"},"value":{"string":"INSERT_EVENT"},"type":null},
{"name":{"string":"DTL__CAPXRESTART1"},"value":{"string":"B+IABwAfA"},"type":null},
{"name":{"string":"DTL__CAPXRESTART2"},"value":{"string":"AAABpMwgRDk="},"type":null},
{"name":{"string":"DTL__CAPXUOW"},"value":{"string":"AAMKPgAAqaIABg=="},"type":null},
{"name":{"string":"DTL__CAPXUSER"},"value":null,"type":null},
{"name":{"string":"DTL__CAPXTIMESTAMP"},"value":{"string":"201807310934257270000000"},"type":null},
{"name":{"string":"DTL__CAPXACTION"},"value":{"string":"I"},"type":null}],
"columns":{"array":[{"name":{"string":"NAME"},"value":{"string":"ABCD"},"isPresent":{"boolean":true}},
{"name":{"string":"ADDRESS"},"value":{"string":"123,Bark street"},"isPresent":{"boolean":true}},
{"name":{"string":"DESCRIPTION"},"value":{"string":"Canadian"},"isPresent":{"boolean":true}},
{"name":{"string":"CODE"},"value":{"string":"3_1"},"isPresent":{"boolean":true}},
{"name":{"string":"DEPARTMENT"},"value":{"string":"HR"},"isPresent":{"boolean":true}}
] }
}
我可以提取出“meta_data”和“columns”这两个 JSON 对象,但无法展开(explode)“columns.array”。
newJsonObj = events.select(get_json_object(events.value,'$.meta_data').alias('meta_data'),get_json_object(events.value,'$.columns.array').alias('columns'))
我不知道如何从这两个 JSON 对象中提取值,并用这些值创建包含上述各列的 DataFrame。
-- events DataFrame 的 schema --
root
|-- columns: struct (nullable = true)
| |-- array: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- isPresent: struct (nullable = true)
| | | | |-- boolean: boolean (nullable = true)
| | | |-- name: struct (nullable = true)
| | | | |-- string: string (nullable = true)
| | | |-- value: struct (nullable = true)
| | | | |-- string: string (nullable = true)
|-- meta_data: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- name: struct (nullable = true)
| | | |-- string: string (nullable = true)
| | |-- type: string (nullable = true)
| | |-- value: struct (nullable = true)
| | | |-- string: string (nullable = true)
暂无答案!
目前还没有任何答案,快来回答吧!