I am using a Databricks notebook to read Avro files stored in Azure Data Lake Storage Gen2. The Avro files are created by Event Hubs Capture and follow a fixed schema. From these files I only need to extract the Body field, which is where the data I am interested in is actually stored.
I have already implemented this in Python and it works as expected:
path = 'abfss://file_system@storage_account.dfs.core.windows.net/root/YYYY/MM/DD/HH/mm/file.avro'
df0 = spark.read.format('avro').load(path)  # 1: read the avro file
df1 = df0.select(df0.Body.cast('string'))   # 2: cast the binary Body column to string
rdd1 = df1.rdd.map(lambda x: x[0])          # 3: turn the single-column dataframe into an RDD of JSON strings
data = spark.read.json(rdd1)                # 4: parse each string as JSON into a dataframe
Now I need to translate this into raw SQL so that I can filter the data directly in a SQL query. Given the four steps above, steps 1 and 2 in SQL look like this:
CREATE TEMPORARY VIEW file_avro
USING avro
OPTIONS (path "abfss://file_system@storage_account.dfs.core.windows.net/root/YYYY/MM/DD/HH/mm/file.avro");

WITH body_array AS (SELECT cast(Body AS STRING) AS Body FROM file_avro)
SELECT * FROM body_array
With this partial query I get the same result as df1 above (step 2 in Python):
Body
[{"id":"a123","group":"0","value":1.0,"timestamp":"2020-01-01T00:00:00.0000000"},
{"id":"a123","group":"0","value":1.5,"timestamp":"2020-01-01T00:01:00.0000000"},
{"id":"a123","group":"0","value":2.3,"timestamp":"2020-01-01T00:02:00.0000000"},
{"id":"a123","group":"0","value":1.8,"timestamp":"2020-01-01T00:03:00.0000000"}]
[{"id":"b123","group":"0","value":2.0,"timestamp":"2020-01-01T00:00:01.0000000"},
{"id":"b123","group":"0","value":1.2,"timestamp":"2020-01-01T00:01:01.0000000"},
{"id":"b123","group":"0","value":2.1,"timestamp":"2020-01-01T00:02:01.0000000"},
{"id":"b123","group":"0","value":1.7,"timestamp":"2020-01-01T00:03:01.0000000"}]
...
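For reference, each Body value above is a JSON array of records, which is what steps 3 and 4 have to parse. In plain Python (outside Spark, using only the standard json module), the first sample row parses like this:

```python
import json

# First sample Body string from the output above.
body = ('[{"id":"a123","group":"0","value":1.0,"timestamp":"2020-01-01T00:00:00.0000000"},'
        '{"id":"a123","group":"0","value":1.5,"timestamp":"2020-01-01T00:01:00.0000000"},'
        '{"id":"a123","group":"0","value":2.3,"timestamp":"2020-01-01T00:02:00.0000000"},'
        '{"id":"a123","group":"0","value":1.8,"timestamp":"2020-01-01T00:03:00.0000000"}]')

records = json.loads(body)
print(len(records))               # 4 records in this Body
print(sorted(records[0].keys()))  # ['group', 'id', 'timestamp', 'value']
```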
I need to know how to bring steps 3 and 4 into the SQL query, i.e. parse the strings as JSON objects and finally obtain the desired dataframe with columns id, group, value and timestamp. Thanks.
1 Answer
One way I found to achieve this in raw SQL is the following, using the from_json Spark SQL built-in function together with the schema of the Body field:
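The SQL itself appears to have been lost when this answer was captured; below is a sketch of what such a query could look like, assuming the file_avro view from the question and a schema string inferred from the sample Body records (the field names and types are assumptions, not taken from the original answer):

```sql
-- Sketch only: the struct schema is inferred from the sample data in the
-- question; adjust field names and types to match the real payload.
WITH body_array AS (
  SELECT cast(Body AS STRING) AS Body FROM file_avro
)
SELECT inline(from_json(Body, 'array<struct<id:string,`group`:string,value:double,timestamp:string>>'))
FROM body_array
```

Here from_json parses each Body string into an array of structs, and inline expands that array into one row per record with one column per field (id, group, value, timestamp), so the result can be filtered directly in SQL.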
It runs faster than the Python code I posted in the question, which is almost certainly because from_json is given the Body schema explicitly rather than having to infer it from the data. My pyspark version of this approach is as follows:
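The pyspark code was also cut off here; the following is a sketch of the same from_json approach in pyspark, assuming the df0 dataframe and the active Spark session from the question, with the same schema string inferred from the sample data:

```python
# Sketch only: assumes an active Spark session and the df0 dataframe from
# the question; the schema string is inferred from the sample Body records.
from pyspark.sql.functions import col, explode, from_json

body_schema = 'array<struct<id:string,`group`:string,value:double,timestamp:string>>'

# Parse each Body string into an array of structs, then explode to one row
# per record and flatten the struct fields into top-level columns.
parsed = df0.select(
    explode(from_json(col('Body').cast('string'), body_schema)).alias('record')
)
data = parsed.select('record.id', 'record.group', 'record.value', 'record.timestamp')
```

Supplying the schema up front avoids the extra pass over the data that spark.read.json needs for schema inference, which is presumably where the speed-up comes from.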