Extracting and exploding an embedded JSON field in Apache Spark

iszxjhcz  posted on 2021-07-09  in  Spark

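I'm completely new to Spark, but I don't mind whether the answer is in Python or Scala. For privacy reasons I can't show the actual data, but basically I'm reading JSON files with the following structure: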

{
      "EnqueuedTimeUtc": 'some date time',
      "Properties": {},
      "SystemProperties": {
          "connectionDeviceId": "an id",
          "some other fields that we don't care about": "data"
      },
      "Body": {
        "device_id": "an id",
        "tabs": [
            {
              "selected": false,
               "title": "some title",
               "url": "https:...."
            },
            {"same again, for multiple tabs"}
        ]
     }
   }

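Most of the data is of no interest. What I want is a DataFrame consisting of the time, the device id, and the url. The same device and time can have multiple urls, so I want to explode them into one row per url.
| timestamp | device_id | url |
My immediate problem is that when I read the files in, Spark can infer the structure of SystemProperties, but Body is just a string, possibly because its contents vary. Maybe I need to specify a schema; would that help?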

root
 |-- Body: string (nullable = true)
 |-- EnqueuedTimeUtc: string (nullable = true)
 |-- SystemProperties: struct (nullable = true)
 |    |-- connectionAuthMethod: string (nullable = true)
 |    |-- connectionDeviceGenerationId: string (nullable = true)
 |    |-- connectionDeviceId: string (nullable = true)
 |    |-- contentEncoding: string (nullable = true)
 |    |-- contentType: string (nullable = true)
 |    |-- enqueuedTime: string (nullable = true)

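Is there an efficient way (there are a lot of these records) to extract the URLs and associate them with the time and device id? Thanks in advance.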

l7wslrjt1#

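Below is an example of the extraction. Basically, you can use from_json to convert Body into something more structured, then use explode(transform()) to pull out the URLs and expand them into separate rows.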
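To make those two steps concrete, here is a rough plain-Python sketch of what they do to a single record: parse the Body string, then emit one row per tab. It does not use Spark at all; the function name `explode_urls` and the sample record are made up for illustration, and it assumes each record is a dict shaped like the JSON in the question.

```python
import json


def explode_urls(record):
    """Yield one (timestamp, device_id, url) tuple per tab in a record."""
    body = record["Body"]
    # Body may arrive as a JSON string (as in the question) or already parsed.
    if isinstance(body, str):
        body = json.loads(body)
    # A record with no tabs yields no rows, like explode on an empty array.
    for tab in body.get("tabs", []):
        yield (record["EnqueuedTimeUtc"], body.get("device_id"), tab.get("url"))


# Hypothetical sample record mirroring the structure in the question.
record = {
    "EnqueuedTimeUtc": "some date time",
    "Body": json.dumps({
        "device_id": "an id",
        "tabs": [
            {"selected": False, "title": "some title", "url": "https:1"},
            {"selected": False, "title": "some title", "url": "https:2"},
        ],
    }),
}

rows = list(explode_urls(record))
print(rows)
```

This is only meant to show the shape of the transformation; for a large number of records the Spark version is the one to use.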


# Sample dataframe

df.show(truncate=False)
+----------------------------------------------------------------------------------------------------------------------------------------------+---------------+----------------+
|Body                                                                                                                                          |EnqueuedTimeUtc|SystemProperties|
+----------------------------------------------------------------------------------------------------------------------------------------------+---------------+----------------+
|{"device_id":"an id","tabs":[{"selected":false,"title":"some title","url":"https:1"},{"selected":false,"title":"some title","url":"https:2"}]}|some date time |[an id]         |
+----------------------------------------------------------------------------------------------------------------------------------------------+---------------+----------------+

df.printSchema()
root
 |-- Body: string (nullable = true)
 |-- EnqueuedTimeUtc: string (nullable = true)
 |-- SystemProperties: struct (nullable = true)
 |    |-- connectionDeviceId: string (nullable = true)

# Extract desired properties

df2 = df.selectExpr(
    "EnqueuedTimeUtc as timestamp", 
    "from_json(Body, 'device_id string, tabs array<map<string,string>>') as Body"
).selectExpr(
    "timestamp", 
    "Body.device_id", 
    "explode(transform(Body.tabs, x -> x.url)) as url"
)

df2.show()
+--------------+---------+-------+
|     timestamp|device_id|    url|
+--------------+---------+-------+
|some date time|    an id|https:1|
|some date time|    an id|https:2|
+--------------+---------+-------+
