我与Pandas和SparkDataframe工作。Dataframe总是非常大(>20gb),标准的spark函数对于这些大小是不够的。目前,我正在将PandasDataframe转换为sparkDataframe,如下所示:
dataframe = spark.createDataFrame(pandas_dataframe)
我这样做是因为spark将Dataframe写入hdfs非常简单:
dataframe.write.parquet(output_uri, mode="overwrite", compression="snappy")
但是对于大于2GB的Dataframe,转换失败了。如果我将sparkDataframe转换为Pandas,我可以使用pyarrow:
// temporary write spark dataframe to hdfs
dataframe.write.parquet(path, mode="overwrite", compression="snappy")
// open hdfs connection using pyarrow (pa)
hdfs = pa.hdfs.connect("default", 0)
// read parquet (pyarrow.parquet (pq))
parquet = pq.ParquetDataset(path_hdfs, filesystem=hdfs)
table = parquet.read(nthreads=4)
// transform table to pandas
pandas = table.to_pandas(nthreads=4)
// delete temp files
hdfs.delete(path, recursive=True)
这是一个从spark到pandas的快速对话,它也适用于大于2GB的Dataframe。我却找不到另一种方法。意思是有一个PandasDataframe,我转换为Spark的帮助下pyarrow。问题是我真的找不到如何将PandasDataframe写入hdfs。
我的Pandas版本:0.19.0
4条答案
按热度按时间kuuvgm7e1#
从https://issues.apache.org/jira/browse/spark-6235
支持并行化大于2gb的r data.frame
已解决。
从https://pandas.pydata.org/pandas-docs/stable/r_interface.html
将Dataframe转换为r对象
您可以将Dataframe转换为r data.frame
所以也许转换Pandas->r->Spark->hdfs?
mnemlml82#
黑客可以从大Dataframe创建n个pandasDataframe(每个小于2GB)(水平分区),并创建n个不同的sparkDataframe,然后合并(合并)它们以创建最后一个Dataframe写入hdfs。我假设你的主机器是强大的,但你也有一个集群,你运行的Spark。
vatpfxk53#
另一种方法是将pandasDataframe转换为sparkDataframe(使用pyspark)并使用save命令将其保存到hdfs。例子
在这里
astype
更改列的类型object
至string
. 这将使您免于引发其他异常,因为spark无法找出异常类型object
. 但要确保这些列确实是string类型。现在要在hdfs中保存df:
jpfvwuh44#
意思是有一个PandasDataframe,我转换为Spark的帮助下pyarrow。
pyarrow.Table.fromPandas
您需要的功能是:结果可以直接写入parquet/hdfs,而无需通过spark传递数据:
另请参见
@wesmckinneyanswer使用pyarrow从hdfs读取Parquet文件。
在中读写apache parquet格式
pyarrow
文档。python中的本机hadoop文件系统(hdfs)连接
spark注解:
此外,由于spark 2.3(当前主控)箭头直接支持
createDataFrame
(spark-20791-使用apachearrow从pandas.dataframe改进spark createdataframe)。它使用SparkContext.defaultParallelism
计算块的数量,以便轻松控制单个批的大小。最后
defaultParallelism
可用于控制使用标准_convert_from_pandas
,有效地减小了切片的大小,使其更易于管理。不幸的是,这些不太可能解决您当前的内存问题。两者都取决于
parallelize
,因此将所有数据存储在驱动程序节点的内存中。切换到箭头或调整配置只能加快进程或地址块大小限制。在实践中,我不认为有任何理由在这里切换到Spark,只要你使用当地的Pandas
DataFrame
作为输入。在这种情况下,最严重的瓶颈是驱动程序的网络i/o,分发数据无法解决这个问题。