How do I work with my DataFrame in Spark?

e4eetjau · published 2021-06-02 in Hadoop

I have a stream of nested JSON records (an RDD) coming from a Kafka topic. The data looks like this:

{
  "time": "sometext1",
  "host": "somehost1",
  "event": {
    "category": "sometext2",
    "computerName": "somecomputer1"
  }
}

I converted it into a DataFrame, and the schema looks like this:

root
 |-- event: struct (nullable = true)
 |    |-- category: string (nullable = true)
 |    |-- computerName: string (nullable = true)
 |-- time: string (nullable = true)
 |-- host: string (nullable = true)

I am trying to save it to a Hive table on HDFS with a flattened schema like this:

category:string
computerName:string
time:string
host:string

This is my first time using Spark and Scala. I would appreciate any help. Thanks!

eqoofvh9 1#

// Create an RDD holding the sample JSON record
val vals = sc.parallelize(
  """{"time":"sometext1","host":"somehost1","event":{"category":"sometext2","computerName":"somecomputer1"}}""" ::
    Nil)

// Define the schema, nesting a struct for the event field
import org.apache.spark.sql.types._

val schema = (new StructType)
  .add("time", StringType)
  .add("host", StringType)
  .add("event", (new StructType)
    .add("category", StringType)
    .add("computerName", StringType))
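The same schema can also be written as a DDL string, which Spark can parse directly (a sketch; `StructType.fromDDL` is available in recent Spark versions):

```scala
import org.apache.spark.sql.types.StructType

// Equivalent schema built from a DDL string
val schemaFromDDL = StructType.fromDDL(
  "time STRING, host STRING, event STRUCT<category: STRING, computerName: STRING>")
```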

import sqlContext.implicits._
val jsonDF = sqlContext.read.schema(schema).json(vals)

jsonDF.printSchema

root
 |-- time: string (nullable = true)
 |-- host: string (nullable = true)
 |-- event: struct (nullable = true)
 |    |-- category: string (nullable = true)
 |    |-- computerName: string (nullable = true)

// Flatten: promote the event struct's fields to top-level columns
val df = jsonDF.select($"event.*", $"time", $"host")

df.printSchema

root
 |-- category: string (nullable = true)
 |-- computerName: string (nullable = true)
 |-- time: string (nullable = true)
 |-- host: string (nullable = true)

df.show

+---------+-------------+---------+---------+
| category| computerName|     time|     host|
+---------+-------------+---------+---------+
|sometext2|somecomputer1|sometext1|somehost1|
+---------+-------------+---------+---------+
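Since the question's data arrives as a Kafka stream and the target is a Hive table, the batch steps above carry over to Structured Streaming. A minimal sketch (the broker address, topic name, and table name `mydb.events_flat` are placeholders; requires Spark 2.4+ for `foreachBatch` and a Hive-enabled SparkSession):

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}

// Hive support is required to write managed Hive tables
val spark = SparkSession.builder()
  .appName("kafka-to-hive")
  .enableHiveSupport()
  .getOrCreate()
import spark.implicits._

// Read the Kafka topic as a stream (broker address and topic are assumptions)
val kafkaStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "events")
  .load()

// Parse each record's value with the schema above and flatten the struct
val flat = kafkaStream
  .select(from_json(col("value").cast("string"), schema).as("data"))
  .select($"data.event.*", $"data.time", $"data.host")

// Append each micro-batch to the Hive table (name is a placeholder)
val query = flat.writeStream
  .foreachBatch { (batch: DataFrame, _: Long) =>
    batch.write.mode("append").format("hive").saveAsTable("mydb.events_flat")
  }
  .start()
```

For the batch `df` shown above, `df.write.mode("overwrite").saveAsTable("mydb.events_flat")` alone is enough; again, the database and table names here are assumptions.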
