是否可以允许用户在FastAPI或Flask中下载pyspark Dataframe 的结果

p1iqtdky  于 2023-02-15  发布在  Spark
关注(0)|答案(1)|浏览(193)

我正在使用FastAPI开发一个API,用户可以向它发出请求,以便实现以下功能:
1.首先,get请求将从GoogleCloudStorage获取一个文件,并将其加载到pysparkDataFrame中
1.然后,应用程序将对DataFrame执行一些转换
1.最后,我想将DataFrame作为parquet文件写入用户磁盘。
我不太清楚如何以 parquet 格式将文件交付给用户,原因如下:

  • df.write.parquet('out/path.parquet')将数据写入out/path.parquet的目录中,当我尝试将其传递给starlette.responses.FileResponse时,这会带来挑战
  • 将我知道存在的单个. parquet文件传递给starlette.responses.FileResponse似乎只是将二进制文件打印到我的控制台(如下面的代码所示)
  • 将DataFrame写入BytesIO流like in pandas看起来很有前途,但我不太清楚如何使用DataFrame的任何方法或DataFrame.rdd的方法来实现这一点。

这在FastAPI中可能吗?在Flask中使用send_file()可以吗?
这是我目前拥有的代码。注意,我已经尝试了一些方法,如注解代码,但没有效果。

import tempfile

from fastapi import APIRouter
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from starlette.responses import FileResponse

router = APIRouter()
sc = SparkContext('local')
spark = SparkSession(sc)

df: spark.createDataFrame = spark.read.parquet('gs://my-bucket/sample-data/my.parquet')

@router.get("/applications")
def applications():
    df.write.parquet("temp.parquet", compression="snappy")
    return FileResponse("part-some-compressed-file.snappy.parquet")
    # with tempfile.TemporaryFile() as f:
    #     f.write(df.rdd.saveAsPickleFile("temp.parquet"))
    #     return FileResponse("test.parquet")

谢谢!
编辑:我试着使用here提供的答案和信息,但我不能让它工作。

bvuwiixz

bvuwiixz1#

我能够解决这个问题,但它远远不够优雅。如果有人能提供一个不写入磁盘的解决方案,我将不胜感激,并将选择您的答案作为正确的。
我可以使用df.rdd.saveAsPickleFile()序列化DataFrame,压缩生成的目录,将其传递给Python客户端,将生成的zip文件写入磁盘,解压缩,然后在最终加载DataFrame之前使用SparkContext().pickleFile
API:

import shutil
import tempfile

from fastapi import APIRouter
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from starlette.responses import FileResponse

router = APIRouter()
sc = SparkContext('local')
spark = SparkSession(sc)

df: spark.createDataFrame = spark.read.parquet('gs://my-bucket/my-file.parquet')

@router.get("/applications")
def applications():
    temp_parquet = tempfile.NamedTemporaryFile()
    temp_parquet.close()
    df.rdd.saveAsPickleFile(temp_parquet.name)

    shutil.make_archive('test', 'zip', temp_parquet.name)

    return FileResponse('test.zip')

委托单位:

import io
import zipfile

import requests

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

sc = SparkContext('local')
spark = SparkSession(sc)

response = requests.get("http://0.0.0.0:5000/applications")
file_like_object = io.BytesIO(response.content)
with zipfile.ZipFile(file_like_object) as z:
    z.extractall('temp.data')

rdd = sc.pickleFile("temp.data")
df = spark.createDataFrame(rdd)

print(df.head())

相关问题