我正在尝试将.csv文件转换为.parquet文件。
csv文件(Temp.csv
)具有以下格式
1,Jon,Doe,Denver
我使用下面的python代码将其转换为 parquet
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
import os
if __name__ == "__main__":
sc = SparkContext(appName="CSV2Parquet")
sqlContext = SQLContext(sc)
schema = StructType([
StructField("col1", IntegerType(), True),
StructField("col2", StringType(), True),
StructField("col3", StringType(), True),
StructField("col4", StringType(), True)])
dirname = os.path.dirname(os.path.abspath(__file__))
csvfilename = os.path.join(dirname,'Temp.csv')
rdd = sc.textFile(csvfilename).map(lambda line: line.split(","))
df = sqlContext.createDataFrame(rdd, schema)
parquetfilename = os.path.join(dirname,'output.parquet')
df.write.mode('overwrite').parquet(parquetfilename)
结果只有一个名为output.parquet
的文件夹,而不是我要查找的 parquet 文件,控制台上出现以下错误。
我也尝试过运行下面的代码来面对类似的问题。
from pyspark.sql import SparkSession
import os
spark = SparkSession \
.builder \
.appName("Protob Conversion to Parquet") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
# read csv
dirname = os.path.dirname(os.path.abspath(__file__))
csvfilename = os.path.join(dirname,'Temp.csv')
df = spark.read.csv(csvfilename)
# Displays the content of the DataFrame to stdout
df.show()
parquetfilename = os.path.join(dirname,'output.parquet')
df.write.mode('overwrite').parquet(parquetfilename)
如何最好地做到这一点?使用窗口,python 2.7。
7条答案
按热度按时间7dl7o3gd1#
使用包
pyarrow
和pandas
,您可以将CSV转换为Parquet,而无需在后台使用JVM:运行
pyarrow
的一个限制是,pyarrow
只在Windows上的Python 3.5+中可用,要么使用Linux/OSX作为Python 2运行代码,要么将Windows安装升级到Python 3.6。oxiaedzo2#
你可以只使用pyarrow将csv转换成parquet格式--不使用panda。当你需要最小化代码依赖性时,它可能会很有用(例如,与AWS Lambda)。
请参阅pyarrow文档以微调
read_csv
和write_table
函数。9rbhqvlz3#
pn9klfpd4#
有几种不同的方法可以用Python将CSV文件转换为Parquet格式。
Uwe L. Korn的Pandas方法非常有效。
如果您想将多个CSV文件转换为多个Parquet /单个Parquet文件,请使用Dask。这会将多个CSV文件转换为两个Parquet文件:
如果你只想输出一个Parquet文件,你也可以使用
df.repartition(npartitions=1)
。更多关于使用Dask here将CSV转换为Parquet的信息。下面是一个在Spark环境中工作的PySpark代码片段:
您也可以在Spark环境中使用Koalas:
r8uurelv5#
处理大于内存的CSV文件
下面的代码将CSV转换为Parquet,而无需将整个CSV文件加载到内存中
以上代码:
1.读取大型CSV file in chunks。
1.通过添加新列转换数据框。
1.将df转换为箭头记录批。
1.将已转换的箭头批次作为新行组写入 parquet 文件。
注意:不要将块大小保持得太小。这将导致压缩效果不佳,因为块大小也与新 parquet 文件中的行组大小相对应。
eit6fx6z6#
您可以使用spark写入为PARQUET FILE:
我希望这能帮上忙
4szc88ey7#