读取pyspark中未分区的csv文件时跳过特定行

t40tm48m  于 2021-07-13  发布在  Spark
关注(0)|答案(1)|浏览(577)

我有一个未分区的gzip csv文件,我读到Spark。读取gzip文件不是一个问题,但是只要使用一个操作来计算sparkDataframe,并且该操作触及一个特定的有问题的行,就会抛出一个错误。如果我使用 df.limit() 我可以将read上的dataframe子集到有问题的观察之前的行的行号,然后可以继续我的工作流程而不会出错。
我的问题是,有没有一种方法可以跳过观察中的阅读。我想沿着df.limit\u range(100:200)做一些事情,读取csv时跳过100-200行。我尝试了各种方法来生成索引列,然后进行过滤,但在计算时遇到了问题。下面,我尝试将子集合到有问题的行之前的所有行,然后与原始未筛选的Dataframe反连接,但是一旦计算了有问题的行,就会再次导致错误,表明gzip文件无法读入。

df_full = df.withColumn("rowId", monotonically_increasing_id())
df_head = df_full.limit(100).where(col("rowID") < 99)
anti_df = df_full.join(df_head, "id", "left_anti")

错误:

FileReadException: Error while reading file s3a://some-s3-bucket/dir/subdir/file_name.gz.
Caused by: EOFException: Unexpected end of input stream
luaexgnf

luaexgnf1#

可以使用列上的筛选器来读取除第100-200行以外的所有行。

from pyspark.sql import functions as f

df_full = df.withColumn("rowId", f.monotonically_increasing_id())
anti_df = df_full.filter("rowId <= 100 or rowId >= 200")

输出 anti_df 将:

+----+-----+
|   z|rowId|
+----+-----+
     :
     :
|3.38|   95|
| 3.4|   96|
|4.07|   97|
|3.56|   98|
|3.66|   99|
|3.65|  100|
|3.43|  200|
|3.49|  201|
|3.48|  202|
| 3.6|  203|
|4.08|  204|
|3.63|  205|
     :
     :

只要确保你的过滤器在你的星火计划中被按下。我的意思是,过滤器应该在读取之后立即执行,而不是在对其执行多个计算之后执行(此时您的代码可能会因错误而失败)。

相关问题