Querying Avro data files stored in Azure Data Lake directly with raw SQL in Databricks

mxg2im7a · posted 2021-05-29 in Spark

I am using a Databricks notebook to read Avro files stored in Azure Data Lake Storage Gen2. The Avro files are created by Event Hubs Capture and follow a specific schema. From these files I only need to extract the Body field, which is where the data I am interested in is actually stored.
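
For reference, the files written by Event Hubs Capture use a fixed envelope schema; a quick way to confirm it in a notebook is to print the schema (a minimal sketch, assuming the same file path used below):

# Inspect the Event Hubs Capture envelope; it typically contains fields
# such as SequenceNumber, Offset, EnqueuedTimeUtc, SystemProperties,
# Properties and Body (the payload, stored as bytes).
path = 'abfss://file_system@storage_account.dfs.core.windows.net/root/YYYY/MM/DD/HH/mm/file.avro'
spark.read.format('avro').load(path).printSchema()
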
I have already implemented this in Python and it works as expected:

path = 'abfss://file_system@storage_account.dfs.core.windows.net/root/YYYY/MM/DD/HH/mm/file.avro'
df0 = spark.read.format('avro').load(path)   # 1: read the Avro capture file
df1 = df0.select(df0.Body.cast('string'))    # 2: cast the binary Body column to a JSON string
rdd1 = df1.rdd.map(lambda x: x[0])           # 3: turn the single-column rows into an RDD of JSON strings
data = spark.read.json(rdd1)                 # 4: parse the JSON strings into a DataFrame
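
A quick sanity check of the parsed result (a sketch; the expected columns match the payload shown further below):

# The parsed DataFrame should expose the payload columns
# id, group, value and timestamp.
data.printSchema()
data.show(5, truncate=False)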

Now I need to convert this to raw SQL so that I can filter the data directly in the SQL query. Given the 4 steps above, steps 1 and 2 look like this in SQL:

CREATE TEMPORARY VIEW file_avro
USING avro
OPTIONS (path "abfss://file_system@storage_account.dfs.core.windows.net/root/YYYY/MM/DD/HH/mm/file.avro")

WITH body_array AS (SELECT cast(Body AS STRING) FROM file_avro)

SELECT * FROM body_array
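
Note that these are two separate statements; from a Python cell they can be run one at a time with spark.sql, for example (a sketch):

# Hypothetical driver code: register the view, then query it.
spark.sql("""
CREATE TEMPORARY VIEW file_avro
USING avro
OPTIONS (path "abfss://file_system@storage_account.dfs.core.windows.net/root/YYYY/MM/DD/HH/mm/file.avro")
""")
spark.sql("""
WITH body_array AS (SELECT cast(Body AS STRING) AS Body FROM file_avro)
SELECT * FROM body_array
""").show(truncate=False)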

With this partial query I get the same result as df1 above (step 2 of the Python code):

Body
[{"id":"a123","group":"0","value":1.0,"timestamp":"2020-01-01T00:00:00.0000000"},
{"id":"a123","group":"0","value":1.5,"timestamp":"2020-01-01T00:01:00.0000000"},
{"id":"a123","group":"0","value":2.3,"timestamp":"2020-01-01T00:02:00.0000000"},
{"id":"a123","group":"0","value":1.8,"timestamp":"2020-01-01T00:03:00.0000000"}]
[{"id":"b123","group":"0","value":2.0,"timestamp":"2020-01-01T00:00:01.0000000"},
{"id":"b123","group":"0","value":1.2,"timestamp":"2020-01-01T00:01:01.0000000"},
{"id":"b123","group":"0","value":2.1,"timestamp":"2020-01-01T00:02:01.0000000"},
{"id":"b123","group":"0","value":1.7,"timestamp":"2020-01-01T00:03:01.0000000"}]
...

I need to know how to bring steps 3 and 4 into the SQL query, parsing the strings into JSON objects to finally get the desired DataFrame with id, group, value and timestamp columns. Thanks.

s8vozzvw · Answer 1

One way I found to do this in raw SQL is the following, using the from_json Spark SQL built-in function and the schema of the Body field:

CREATE TEMPORARY VIEW file_avro
USING avro
OPTIONS (path "abfss://file_system@storage_account.dfs.core.windows.net/root/YYYY/MM/DD/HH/mm/file.avro")

WITH body_array AS (SELECT cast(Body AS STRING) FROM file_avro),
data1 AS (SELECT from_json(Body, 'array<struct<id:string,group:string,value:double,timestamp:timestamp>>') FROM body_array),
data2 AS (SELECT explode(*) FROM data1),
data3 AS (SELECT col.* FROM data2)
SELECT * FROM data3 WHERE id = "a123"     --FILTERING BY CHANNEL ID
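
For comparison, an equivalent query (a sketch, not part of the original answer) can be written with LATERAL VIEW, which avoids the explode(*) shorthand; it assumes the file_avro view defined above:

# A sketch using LATERAL VIEW instead of explode(*); run from Python
# against the file_avro temporary view registered above.
df = spark.sql("""
WITH body_array AS (SELECT cast(Body AS STRING) AS Body FROM file_avro)
SELECT parsed.*
FROM body_array
LATERAL VIEW explode(
  from_json(Body, 'array<struct<id:string,group:string,value:double,timestamp:timestamp>>')
) t AS parsed
WHERE parsed.id = 'a123'
""")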

It executes faster than the Python code I posted in the question, surely because from_json and the Body schema are used to extract the data inside it. My version of this approach in PySpark looks like this:

path = 'abfss://file_system@storage_account.dfs.core.windows.net/root/YYYY/MM/DD/HH/mm/file.avro'
df0 = spark.read.format('avro').load(path)                 # read the Avro capture file
df1 = df0.selectExpr("cast(Body as string) as json_data")  # binary Body -> JSON string
df2 = df1.selectExpr("from_json(json_data, 'array<struct<id:string,group:string,value:double,timestamp:timestamp>>') as parsed_json")  # parse with an explicit schema
data = df2.selectExpr("explode(parsed_json) as json").select("json.*")  # one row per array element
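
The same parse can also be written with an explicit schema object instead of a DDL string (a sketch; functionally equivalent to the selectExpr version above):

from pyspark.sql import functions as F
from pyspark.sql.types import (ArrayType, DoubleType, StringType,
                               StructField, StructType, TimestampType)

# Explicit schema for the JSON array stored in Body.
body_schema = ArrayType(StructType([
    StructField('id', StringType()),
    StructField('group', StringType()),
    StructField('value', DoubleType()),
    StructField('timestamp', TimestampType()),
]))

data = (df0
        .select(F.col('Body').cast('string').alias('json_data'))
        .select(F.explode(F.from_json('json_data', body_schema)).alias('json'))
        .select('json.*'))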
