I am working with the Movies Dataset from Kaggle: https://www.kaggle.com/rounakbanik/the-movies-dataset#movies_metadata.csv.
The credits.csv file has three columns: cast, crew, and id. The cast and crew cells are filled with JSON-like data (malformed JSON, with keys and values wrapped in single quotes), and I want to extract them into separate DataFrames. But simply loading the file does not work. Here is what I tried:
import os
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('movies').getOrCreate()
df = spark.read.csv(os.path.join(input_path, 'credits.csv'), header=True)
df.printSchema()
df.show()
root
|-- cast: string (nullable = true)
|-- crew: string (nullable = true)
|-- id: string (nullable = true)
+--------------------+--------------------+--------------------+
| cast| crew| id|
+--------------------+--------------------+--------------------+
|[{'cast_id': 14, ...|"[{'credit_id': '...| 'profile_path': ...|
|[{'cast_id': 1, '...|[{'credit_id': '5...| 8844|
|[{'cast_id': 2, '...|[{'credit_id': '5...| 15602|
|"[{'cast_id': 1, ...| 'credit_id': '52...| 'gender': 1|
|[{'cast_id': 1, '...|[{'credit_id': '5...| 11862|
|"[{'cast_id': 25,...| 'credit_id': '52...| 'gender': 0|
|[{'cast_id': 1, '...|[{'credit_id': '5...| 11860|
|[{'cast_id': 2, '...|[{'credit_id': '5...| 45325|
|[{'cast_id': 1, '...|[{'credit_id': '5...| 9091|
|[{'cast_id': 1, '...|[{'credit_id': '5...| 710|
|"[{'cast_id': 1, ...| 'credit_id': '52...| 'gender': 2|
|[{'cast_id': 9, '...|"[{'credit_id': '...| 'profile_path': ...|
|"[{'cast_id': 1, ...| 'credit_id': '56...| 'gender': 0|
|"[{'cast_id': 1, ...| 'credit_id': '52...| 'gender': 2|
|"[{'cast_id': 1, ...| 'credit_id': '59...| 'gender': 2|
|"[{'cast_id': 4, ...| 'credit_id': '52...| 'gender': 2|
|[{'cast_id': 6, '...|[{'credit_id': '5...| 4584|
|[{'cast_id': 42, ...|"[{'credit_id': '...| 'profile_path': ...|
|"[{'cast_id': 1, ...| 'order': 14| 'profile_path': ...|
|[{'cast_id': 1, '...|[{'credit_id': '5...| 11517|
+--------------------+--------------------+--------------------+
only showing top 20 rows
The id column should contain only numbers, and the cast and crew columns should be loaded as plain strings, which is exactly what happens when I load the data with pandas:
import os
import pandas as pd

df = pd.read_csv(os.path.join(input_path, 'credits.csv'))
type(df.cast[0])
str
How can I load the data into a Spark DataFrame correctly, and then extract each row's JSON data into a new DataFrame?
1 Answer
You can use the CSV reader's PERMISSIVE mode; the example below should work. I have verified it with Scala. Reference: https://docs.databricks.com/data/data-sources/read-csv.html
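The answer's own code is not reproduced above, so here is a minimal PySpark sketch of the same idea (the CSV options match the Scala API in the linked docs; input_path is the variable from the question). The second half, which parses the single-quoted records with ast.literal_eval and a trimmed cast_schema, is my own assumption rather than part of the original answer, since Python-style literals (single quotes, None) are not valid JSON:

import ast
import os

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import (ArrayType, IntegerType, StringType,
                               StructField, StructType)

spark = SparkSession.builder.appName('movies').getOrCreate()

# PERMISSIVE keeps malformed rows instead of failing; escape='"' and
# multiLine stop the quotes and commas inside cast/crew from splitting
# one record across columns (the cause of the broken rows shown above).
df = (spark.read
      .option('header', True)
      .option('mode', 'PERMISSIVE')
      .option('quote', '"')
      .option('escape', '"')
      .option('multiLine', True)
      .csv(os.path.join(input_path, 'credits.csv')))

# Hypothetical subset of the fields in each cast entry; extend as needed.
cast_schema = ArrayType(StructType([
    StructField('cast_id', IntegerType()),
    StructField('character', StringType()),
    StructField('name', StringType()),
]))

@F.udf(returnType=cast_schema)
def parse_literal(s):
    # The cells are Python literals (single quotes, None values), so
    # ast.literal_eval handles them where a JSON parser would not.
    try:
        return ast.literal_eval(s) if s else None
    except (ValueError, SyntaxError):
        return None

# One row per cast member, keyed by the movie id.
cast_df = (df.select('id', F.explode(parse_literal('cast')).alias('c'))
             .select('id', 'c.*'))
cast_df.show()

With the escape and multiLine options set, id comes back as a clean numeric string and cast/crew stay intact, so the same parse_literal pattern can be reused for the crew column.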