使用pyspark阅读csv文件时编码错误

kx5bkwkv  于 2023-01-15  发布在  Spark
关注(0)|答案(2)|浏览(154)

在我的大学课程中,我运行pyspark-notebook docker image

docker pull jupyter/pyspark-notebook
docker run -it --rm -p 8888:8888 -v /path/to/my/working/directory:/home/jovyan/work jupyter/pyspark-notebook

然后运行下一个python代码

import pyspark 
from pyspark.sql import SparkSession
from pyspark.sql.types import *

sc = pyspark.SparkContext('local[*]')
spark = SparkSession(sc)
spark

listings_df = spark.read.csv("listings.csv", header=True, mode='DROPMALFORMED') 
# adding encoding="utf8" to the line above doesn't help also
listings_df.printSchema()

阅读文件时出现问题,spark好像读取我的文件不正确(可能是因为编码问题?),读取listings_df后有16494行,而正确的行数是16478(用pandas.read_csv()检查),运行

listings_df.groupBy("room_type").count().show()

它给出下一个输出

+---------------+-----+
|      room_type|count|
+---------------+-----+
|            169|    1|
|        4.88612|    1|
|        4.90075|    1|
|    Shared room|   44|
|             35|    1|
|            187|    1|
|           null|   16|
|             70|    1|
|             27|    1|
|             75|    1|
|     Hotel room|  109|
|            198|    1|
|             60|    1|
|            280|    1|
|Entire home/apt|12818|
|            220|    1|
|            190|    1|
|            156|    1|
|            450|    1|
|        4.88865|    1|
+---------------+-----+
only showing top 20 rows

而真实的的room_type值仅为[“私人房间”、“整个住宅/公寓”、“酒店房间”、“共享房间”]。
可能有用的Spark信息:

SparkSession - in-memory

SparkContext

Spark UI

Version
v3.1.2
Master
local[*]
AppName
pyspark-shell

和文件的编码

!file listings.csv
listings.csv: UTF-8 Unicode text

listings.csv是从here下载的Airbnb统计csv文件
所有运行和驱动代码我也已上传到Colab

ckx4rj1h

ckx4rj1h1#

我发现了两件事:
1.某些行有要转义的引号(escape='"'
1.此外,@JosefZ还提到了不必要的换行符(multiLine=True
你应该这样读:

input_df = spark.read.csv(path, header=True, multiLine=True, escape='"')

output_df = input_df.groupBy("room_type").count()
output_df.show()
+---------------+-----+
|      room_type|count|
+---------------+-----+
|    Shared room|   44|
|     Hotel room|  110|
|Entire home/apt|12829|
|   Private room| 3495|
+---------------+-----+
9avjhtql

9avjhtql2#

我认为从这里开始对文件进行编码应该可以解决这个问题,所以将encoding="utf8"添加到变量listings_df的元组中。
如下图所示:

listings_df = spark.read.csv("listings.csv", encoding="utf8", header=True, mode='DROPMALFORMED')

相关问题