我正在使用FastAPI开发一个API,用户可以向它发出请求,以便实现以下功能:
1.首先,get请求将从GoogleCloudStorage获取一个文件,并将其加载到pysparkDataFrame中
1.然后,应用程序将对DataFrame执行一些转换
1.最后,我想将DataFrame作为parquet文件写入用户磁盘。
我不太清楚如何以 parquet 格式将文件交付给用户,原因如下:
df.write.parquet('out/path.parquet')
将数据写入out/path.parquet
的目录中,当我尝试将其传递给starlette.responses.FileResponse
时,这会带来挑战- 将我知道存在的单个. parquet文件传递给
starlette.responses.FileResponse
似乎只是将二进制文件打印到我的控制台(如下面的代码所示) - 将DataFrame写入BytesIO流like in pandas看起来很有前途,但我不太清楚如何使用DataFrame的任何方法或DataFrame.rdd的方法来实现这一点。
这在FastAPI中可能吗?在Flask中使用send_file()可以吗?
这是我目前拥有的代码。注意,我已经尝试了一些方法,如注解代码,但没有效果。
import tempfile
from fastapi import APIRouter
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from starlette.responses import FileResponse
router = APIRouter()
sc = SparkContext('local')
spark = SparkSession(sc)
df: spark.createDataFrame = spark.read.parquet('gs://my-bucket/sample-data/my.parquet')
@router.get("/applications")
def applications():
df.write.parquet("temp.parquet", compression="snappy")
return FileResponse("part-some-compressed-file.snappy.parquet")
# with tempfile.TemporaryFile() as f:
# f.write(df.rdd.saveAsPickleFile("temp.parquet"))
# return FileResponse("test.parquet")
谢谢!
编辑:我试着使用here提供的答案和信息,但我不能让它工作。
1条答案
按热度按时间bvuwiixz1#
我能够解决这个问题,但它远远不够优雅。如果有人能提供一个不写入磁盘的解决方案,我将不胜感激,并将选择您的答案作为正确的。
我可以使用
df.rdd.saveAsPickleFile()
序列化DataFrame,压缩生成的目录,将其传递给Python客户端,将生成的zip文件写入磁盘,解压缩,然后在最终加载DataFrame之前使用SparkContext().pickleFile
。API:
委托单位: