How to correctly read JSON strings from a CSV with PySpark?

Asked by 0md85ypi on 2021-05-27 in Spark

I am working with the Movies Dataset from https://www.kaggle.com/rounakbanik/the-movies-dataset#movies_metadata.csv.
The credits.csv file has three columns: cast, crew, and id. The cast and crew fields are filled with JSON-like text (malformed: keys and values are wrapped in single quotes), and I want to extract them into separate DataFrames. But simply loading the file does not work. I tried the following:

import os
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('movies').getOrCreate()
df = spark.read.csv(os.path.join(input_path, 'credits.csv'), header=True)
df.printSchema()
df.show()

root
 |-- cast: string (nullable = true)
 |-- crew: string (nullable = true)
 |-- id: string (nullable = true)

 +--------------------+--------------------+--------------------+
 |                cast|                crew|                  id|
 +--------------------+--------------------+--------------------+
 |[{'cast_id': 14, ...|"[{'credit_id': '...| 'profile_path': ...|
 |[{'cast_id': 1, '...|[{'credit_id': '5...|                8844|
 |[{'cast_id': 2, '...|[{'credit_id': '5...|               15602|
 |"[{'cast_id': 1, ...| 'credit_id': '52...|         'gender': 1|
 |[{'cast_id': 1, '...|[{'credit_id': '5...|               11862|
 |"[{'cast_id': 25,...| 'credit_id': '52...|         'gender': 0|
 |[{'cast_id': 1, '...|[{'credit_id': '5...|               11860|
 |[{'cast_id': 2, '...|[{'credit_id': '5...|               45325|
 |[{'cast_id': 1, '...|[{'credit_id': '5...|                9091|
 |[{'cast_id': 1, '...|[{'credit_id': '5...|                 710|
 |"[{'cast_id': 1, ...| 'credit_id': '52...|         'gender': 2|
 |[{'cast_id': 9, '...|"[{'credit_id': '...| 'profile_path': ...|
 |"[{'cast_id': 1, ...| 'credit_id': '56...|         'gender': 0|
 |"[{'cast_id': 1, ...| 'credit_id': '52...|         'gender': 2|
 |"[{'cast_id': 1, ...| 'credit_id': '59...|         'gender': 2|
 |"[{'cast_id': 4, ...| 'credit_id': '52...|         'gender': 2|
 |[{'cast_id': 6, '...|[{'credit_id': '5...|                4584|
 |[{'cast_id': 42, ...|"[{'credit_id': '...| 'profile_path': ...|
 |"[{'cast_id': 1, ...|         'order': 14| 'profile_path': ...|
 |[{'cast_id': 1, '...|[{'credit_id': '5...|               11517|
 +--------------------+--------------------+--------------------+
 only showing top 20 rows

The id column should contain only numbers, and the cast and crew fields should each load as a single string, as happens when I load the data with pandas:

import os
import pandas as pd

df = pd.read_csv(os.path.join(input_path, 'credits.csv'))
type(df.cast[0])
str

How can I load the data into a Spark DataFrame and then collect the JSON data from each row into a new DataFrame?
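As an aside on the second half of the question: once the cast and crew columns do load as intact strings, the single-quoted entries are not valid JSON, but they are valid Python literals, so `ast.literal_eval` can parse them. A minimal sketch, using a hypothetical row in the style of the dataset:

```python
import ast

# A cast entry as it appears in the file: single-quoted keys and values,
# so not valid JSON, but parseable as a Python literal.
cast_str = "[{'cast_id': 14, 'character': 'Woody (voice)', 'id': 31}]"

records = ast.literal_eval(cast_str)
print(records[0]['character'])  # Woody (voice)
```

Unlike `eval`, `ast.literal_eval` only accepts literal structures (lists, dicts, strings, numbers), so it is safe to run over untrusted rows; entries that fail to parse raise `ValueError` or `SyntaxError` and can be skipped.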

Answer 1, from cygmwpex:

You can use the PERMISSIVE mode of the CSV reader. The example below should work; I have verified it in Scala.

spark.read.format('csv').options(header='true', inferSchema='true', mode='PERMISSIVE').load(path)

Reference: https://docs.databricks.com/data/data-sources/read-csv.html
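A note on the column shifting in the question's output: PERMISSIVE is already the default mode and only controls how corrupt records are handled. The rows likely break apart because Spark's CSV reader defaults to backslash as the escape character, while this file escapes embedded double quotes by doubling them (RFC 4180 style), which pandas handles out of the box. The quoting behavior can be illustrated with Python's stdlib `csv` module (a sketch over a made-up line in the style of credits.csv, not the actual file):

```python
import csv
import io

# A line in the style of credits.csv: the first field is quoted and contains
# commas plus doubled double-quotes (RFC 4180 escaping).
line = '"[{\'cast_id\': 14, \'character\': ""Woody""}]",862\r\n'

row = next(csv.reader(io.StringIO(line)))
print(row)  # the whole cast string stays in row[0]; row[1] is the id
```

In PySpark the analogous fix would be passing `escape='"'` (and possibly `multiLine=True`) to `spark.read.csv`, so that a doubled quote is treated as an escaped quote instead of terminating the field.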
