What is the best way to parse Kafka JSON messages into a PySpark DataFrame?

rlcwz9us posted on 2021-06-06 in Kafka

I'm using Spark Structured Streaming to read a Kafka topic and want to convert the following complex JSON (the Kafka messages) into a DataFrame with the columns name, address, description, code, department, infa_op_type and dtl_capxtimestamp.

{
  "meta_data": [
    {"name": {"string": "INFA_SEQUENCE"}, "value": {"string": "2,PWX_GENERIC"}, "type": null},
    {"name": {"string": "INFA_TABLE_NAME"}, "value": {"string": "customers"}, "type": null},
    {"name": {"string": "INFA_OP_TYPE"}, "value": {"string": "INSERT_EVENT"}, "type": null},
    {"name": {"string": "DTL__CAPXRESTART1"}, "value": {"string": "B+IABwAfA"}, "type": null},
    {"name": {"string": "DTL__CAPXRESTART2"}, "value": {"string": "AAABpMwgRDk="}, "type": null},
    {"name": {"string": "DTL__CAPXUOW"}, "value": {"string": "AAMKPgAAqaIABg=="}, "type": null},
    {"name": {"string": "DTL__CAPXUSER"}, "value": null, "type": null},
    {"name": {"string": "DTL__CAPXTIMESTAMP"}, "value": {"string": "201807310934257270000000"}, "type": null},
    {"name": {"string": "DTL__CAPXACTION"}, "value": {"string": "I"}, "type": null}
  ],
  "columns": {
    "array": [
      {"name": {"string": "NAME"}, "value": {"string": "ABCD"}, "isPresent": {"boolean": true}},
      {"name": {"string": "ADDRESS"}, "value": {"string": "123,Bark street"}, "isPresent": {"boolean": true}},
      {"name": {"string": "DESCRIPTION"}, "value": {"string": "Canadian"}, "isPresent": {"boolean": true}},
      {"name": {"string": "CODE"}, "value": {"string": "3_1"}, "isPresent": {"boolean": true}},
      {"name": {"string": "DEPARTMENT"}, "value": {"string": "HR"}, "isPresent": {"boolean": true}}
    ]
  }
}
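To pin down the target shape before touching Spark, here is a plain-Python sketch (standard library only, not Spark) that flattens one such message into the seven requested fields: the `columns.array` entries supply NAME through DEPARTMENT, and the `meta_data` entries supply INFA_OP_TYPE and DTL__CAPXTIMESTAMP. The helper names `_pairs` and `flatten_message` are hypothetical.

```python
import json

# Keys taken from columns.array, in the order the question lists them.
COLUMN_KEYS = ("NAME", "ADDRESS", "DESCRIPTION", "CODE", "DEPARTMENT")


def _pairs(entries) -> dict:
    """Turn [{"name": {"string": k}, "value": {"string": v} or null}, ...] into {k: v}."""
    out = {}
    for entry in entries or []:
        value = entry.get("value")
        out[entry["name"]["string"]] = value["string"] if value else None
    return out


def flatten_message(raw: str) -> dict:
    """Flatten one message into the seven requested fields (missing keys become None)."""
    msg = json.loads(raw)
    cols = _pairs((msg.get("columns") or {}).get("array"))
    meta = _pairs(msg.get("meta_data"))
    row = {key.lower(): cols.get(key) for key in COLUMN_KEYS}
    row["infa_op_type"] = meta.get("INFA_OP_TYPE")
    row["dtl_capxtimestamp"] = meta.get("DTL__CAPXTIMESTAMP")
    return row
```

Applied to the sample above, this yields `name="ABCD"`, `department="HR"`, `infa_op_type="INSERT_EVENT"` and `dtl_capxtimestamp="201807310934257270000000"`; the Spark versions need to produce the same row per message.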

I can extract the two JSON objects, meta_data and columns, but I can't explode columns.array:

from pyspark.sql.functions import get_json_object
newJsonObj = events.select(get_json_object(events.value, '$.meta_data').alias('meta_data'), get_json_object(events.value, '$.columns.array').alias('columns'))

I don't know how to extract the values from the two JSON objects and build a DataFrame whose columns come from both of them.
-- events DataFrame schema --

root
|-- columns: struct (nullable = true)
|    |-- array: array (nullable = true)
|    |    |-- element: struct (containsNull = true)
|    |    |    |-- isPresent: struct (nullable = true)
|    |    |    |    |-- boolean: boolean (nullable = true)
|    |    |    |-- name: struct (nullable = true)
|    |    |    |    |-- string: string (nullable = true)
|    |    |    |-- value: struct (nullable = true)
|    |    |    |    |-- string: string (nullable = true)
|-- meta_data: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- name: struct (nullable = true)
|    |    |    |-- string: string (nullable = true)
|    |    |-- type: string (nullable = true)
|    |    |-- value: struct (nullable = true)
|    |    |    |-- string: string (nullable = true)
