pyspark: How to create a DataFrame from a JSON input file in Spark

pdkcd3nj asked on 2023-05-16 in Spark

I am creating a DataFrame from a downloaded JSON file, and it fails with a corrupt-record error. I use spark.read.json("jsonfilepath") to create the DataFrame. The error is along the lines of: pyspark.sql.utils.AnalysisException: Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column (named _corrupt_record by default). For example: spark.read.schema(schema).csv(file).filter($"_corrupt_record".isNotNull).count() and spark.read.schema(schema).csv(file).select("_corrupt_record").show(). Instead, you can cache or save the parsed results and then send the same query. For example, val df = spark.read.schema(schema).csv(file).cache() and then df.filter($"_corrupt_record".isNotNull).count().

1mrurvl1

Since you did not specify which error you ran into, I will assume you are seeing something similar to what I see:

pyspark.errors.exceptions.captured.AnalysisException: Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the
referenced columns only include the internal corrupt record column
(named _corrupt_record by default). For example:
spark.read.schema(schema).csv(file).filter($"_corrupt_record".isNotNull).count()
and spark.read.schema(schema).csv(file).select("_corrupt_record").show().
Instead, you can cache or save the parsed results and then send the same query.
For example, val df = spark.read.schema(schema).csv(file).cache() and then
df.filter($"_corrupt_record".isNotNull).count().
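The message itself suggests the workaround for inspecting bad records: cache (or save) the parsed result, after which a query against the _corrupt_record column is allowed. Below is a minimal PySpark sketch of that idea, assuming a hypothetical schema and file path:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.getOrCreate()

# Hypothetical schema; it must list _corrupt_record explicitly so that
# malformed lines are captured there instead of being dropped
schema = StructType([
    StructField("name", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("_corrupt_record", StringType(), True),
])

# Cache the parsed result first; only then is a query that references
# nothing but _corrupt_record permitted
df = spark.read.schema(schema).json("/path/to/input.json").cache()
df.filter(col("_corrupt_record").isNotNull()).count()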

To fix it, read the file with the multiline option:

spark.read.option("multiline","true").json("json file path")

Here is an example:

cat /Users/yuri/test.json 
{
    "name": "Yuri",
    "gender": "male"
}

Here is the PySpark code:

# multiline lets Spark parse a JSON document that spans multiple lines
df = spark.read.option("multiline", "true").json("/Users/yuri/test.json")
df.printSchema()
df.show(truncate=False)

Here is the output:

>>> df.printSchema()
root
 |-- gender: string (nullable = true)
 |-- name: string (nullable = true)

>>> df.show(truncate=False)
+------+----+
|gender|name|
+------+----+
|male  |Yuri|
+------+----+
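For context: by default, spark.read.json expects JSON Lines input, i.e. one complete JSON object per line. A pretty-printed file like the one above spans several lines, so without multiline each line is an invalid fragment and lands in _corrupt_record. If you control the file, flattening it to JSON Lines also works; here is a rough sketch using Python's json module (the .jsonl path is made up for illustration):

import json

# Re-serialize the pretty-printed object as a single-line JSON record
with open("/Users/yuri/test.json") as src:
    record = json.load(src)

with open("/Users/yuri/test.jsonl", "w") as dst:
    dst.write(json.dumps(record) + "\n")

# The default reader now works without the multiline option
df = spark.read.json("/Users/yuri/test.jsonl")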
