How do I run Spark to read a JSON file and display its contents?

Posted by 6g8kf2rb on 2021-05-27 in Spark

The file spark_job.py contains the following:

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.sql.types import IntegerType, LongType, DecimalType,StructType, StructField, StringType
from pyspark.sql import Row
from pyspark.sql.functions import col
import pyspark.sql.functions as F
from pyspark.sql import Window

def readMyStream(rdd):
  if not rdd.isEmpty():
    df = spark.read.json(rdd)
    print('Started the Process')
    print('Selection of Columns')
    df = df.select('t1','t2','t3','timestamp').where(col("timestamp").isNotNull())
    df.show()

if __name__ == '__main__':
    sc = SparkContext.getOrCreate()
    spark = SparkSession(sc)
    ssc = StreamingContext(sc, 5)

    stream_data = ssc.textFileStream("jsondata.json")
    stream_data.foreachRDD( lambda rdd: readMyStream(rdd) )
    ssc.start()
    ssc.stop()

The file jsondata.json contains the following:

[{"timestamp": "1571053218000","t1": "55.23","t2": "10","t3": "ON"},

{"timestamp": "1571053278000","t1": "63.23","t2": "11","t3": "OFF"},

{"timestamp": "1571053338000","t1": "73.23","t2": "12","t3": "ON"},

{"timestamp": "1571053398000","t1": "83.23","t2": "13","t3": "ON"}]

Running:

python spark_job.py

gives me this:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
PS C:\Users\Admin\Desktop\madi_kafka> SUCCESS: The process with PID 10272 (child process of PID 2544) has been terminated.
SUCCESS: The process with PID 2544 (child process of PID 10652) has been terminated.
SUCCESS: The process with PID 10652 (child process of PID 4516) has been terminated.

k5ifujac1#

I used the following code in Scala; I think it may help:

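import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Assumed setup: the original snippet uses `session` without showing how it was created
val session = SparkSession.builder().appName("ReadJson").master("local[*]").getOrCreate()
import session.implicits._

case class TClass(timestamp: String, t1: String, t2: String, t3: String)
// multiline=true lets Spark parse a JSON array that spans several lines
val jsonData = session.read.option("inferSchema", "true").option("multiline", "true").option("header", "true").json("data/jsondata.json").as[TClass]
jsonData.printSchema()
jsonData.show()
println("Started the Process")
println("Selection of Columns")
val df = jsonData.select("timestamp", "t1", "t2", "t3").where(col("timestamp").isNotNull)
df.show()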

This produces:

+-------------+-----+---+---+
|    timestamp|   t1| t2| t3|
+-------------+-----+---+---+
|1571053218000|55.23| 10| ON|
|1571053278000|63.23| 11|OFF|
|1571053338000|73.23| 12| ON|
|1571053398000|83.23| 13| ON|
+-------------+-----+---+---+

I hope this helps.
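
For readers who would rather keep Spark's default reader: the `multiline` option in the code above is needed because jsondata.json is a single JSON array spread over several lines, while `spark.read.json` by default expects one complete JSON object per line (JSON Lines). The same applies to the line-based `textFileStream` in the question. Below is a minimal sketch, using only Python's standard library, of converting the file first. The function name `array_to_json_lines` and the `.jsonl` output name are hypothetical.

```python
import json
from pathlib import Path


def array_to_json_lines(src, dst):
    """Rewrite a file holding one JSON array as JSON Lines (one object per line).

    Returns the number of records written.
    """
    records = json.loads(Path(src).read_text(encoding="utf-8"))
    if not isinstance(records, list):
        raise ValueError("expected a top-level JSON array")
    with open(dst, "w", encoding="utf-8") as out:
        for record in records:
            # Without indent, json.dumps keeps each object on a single line.
            out.write(json.dumps(record, separators=(",", ":")) + "\n")
    return len(records)
```

After something like `array_to_json_lines("jsondata.json", "jsondata.jsonl")`, a plain `spark.read.json("jsondata.jsonl")` should be able to parse the file without the `multiline` option.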


hwamh0ep2#

The show function can help you here; I think this code sample will solve your problem:

val data = session.sqlContext.read.format("json").load("data/input.json")
// show() prints the rows and returns Unit, so there is nothing useful to assign
data.show()

In most cases, Spark can infer the schema of the data automatically.
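
One caveat, sketched in Python since the question uses PySpark: inference works from the JSON value types, and every value in the question's jsondata.json is a quoted string, so the inferred columns should all come out as strings. The sketch below first checks the value types with the standard library and then shows a hypothetical helper, `cast_numeric_columns`, that casts the columns afterwards. The target types (`long`, `double`, `int`) are assumptions about what the data is meant to hold, and `observed_types` is likewise a name made up for illustration.

```python
import json

# The four records from jsondata.json in the question, inlined for illustration.
SAMPLE = """[
{"timestamp": "1571053218000", "t1": "55.23", "t2": "10", "t3": "ON"},
{"timestamp": "1571053278000", "t1": "63.23", "t2": "11", "t3": "OFF"},
{"timestamp": "1571053338000", "t1": "73.23", "t2": "12", "t3": "ON"},
{"timestamp": "1571053398000", "t1": "83.23", "t2": "13", "t3": "ON"}
]"""


def observed_types(records):
    """Map each field name to the sorted list of Python type names seen for it."""
    seen = {}
    for record in records:
        for key, value in record.items():
            seen.setdefault(key, set()).add(type(value).__name__)
    return {key: sorted(names) for key, names in seen.items()}


def cast_numeric_columns(df):
    """Hypothetical helper: cast the string columns of a PySpark DataFrame."""
    # Imported lazily so the rest of this sketch runs without Spark installed.
    from pyspark.sql.functions import col

    return (
        df.withColumn("timestamp", col("timestamp").cast("long"))
        .withColumn("t1", col("t1").cast("double"))
        .withColumn("t2", col("t2").cast("int"))
    )


# Every field is reported as 'str', which is what schema inference has to work with.
print(observed_types(json.loads(SAMPLE)))
```

With a DataFrame `df` read from the file, `cast_numeric_columns(df).printSchema()` would then show the cast types, assuming the column names match those in the question.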
