pyspark 如何将多个csv文件摄取到Spark Dataframe 中?

ldioqlga  于 2022-11-01  发布在  Spark
关注(0)|答案(2)|浏览(177)

我尝试将两个csv文件放入一个spark Dataframe 中。然而,这两个数据集的模式非常不同,当我执行下面的操作时,我只返回了第二个csv的模式,就好像第一个不存在一样。我该如何解决这个问题呢?我的最终目标是统计总字数。
“lmne.dfs.core.windows.net/csvs/MachineLearning_reddit.csv“test1@lmne.dfs.core.windows.net/csvs/bbc_news.csv”“”“”“”“”“

df0_spark=spark.read.format("csv").option("header","false").load(paths)
df0_spark.write.mode("overwrite").saveAsTable("ML_reddit2")
df0_spark.show()

我试图将两个文件都加载到一个Spark Dataframe 中,但它只返回了其中一个表。

k3fezbri

k3fezbri1#

我复制了上面的结果,得到了下面的结果。
作为示例,我在dbfs中有两个具有不同模式的csv文件。当我执行上面的代码时,我得到了相同的结果。

要获取所需的模式,请在阅读文件时启用mergeSchemaheader

代码:

df0_spark=spark.read.format("csv").option("mergeSchema","true").option("header","true").load(paths)
df0_spark.show()

如果你想合并两个没有空值的文件,我们应该有一个公共的标识列,我们必须单独读取文件,并使用内部连接。

wtzytmuj

wtzytmuj2#

在这种情况下,对我来说最有效的解决方案是分别读取所有的 distinct 文件,然后在将它们放入DataFrame后将它们合并。因此,您的代码可能如下所示:

paths = ["abfss://lmne.dfs.core.windows.net/csvs/MachineLearning_reddit.csv", "abfss://test1@lmne.dfs.core.windows.net/csvs/bbc_news.csv"]

# Load all distinct CSV files

df1 = spark.read.option("header", false).csv(paths[0])
df2 = spark.read.option("header", false).csv(paths[1])

# Union DataFrames

combined_df = df1.unionByName(df2, allowMissingColumns=True)

注意:如果文件之间的列名不同,则对于第一个文件中的所有列(第二个文件中不存在),您将具有null值。如果方案应匹配,则您始终可以在执行unionByName步骤之前重命名列。

相关问题