Python(在databricks上)如何处理以字节形式返回的csv数据

of1yzvn4  于 12个月前  发布在  Python
关注(0)|答案(1)|浏览(80)

我有一些python代码,可以调用一个API。这个API返回的数据是“CSV”,但它不传递文件,而是返回“bytes”。例如,如果我这样做:

print(result)

它返回:

b'"column1"| "column2"/n"value1"|"value2"/n"anothervaluesecondrow"|"secondvaluesecondrow"/n'

我的目标是解析并将其写入databricks表。我可以把它解码成一个字符串

decoded=result.decode()

也许我的大脑今天没有发挥出100%的能力,但对于我的生活,我不知道如何阅读这一点。例如,如果我做这样的事情:

sparkdf=spark.read.option("header", True).option("inferSchema", True).option("sep", "|").csv(decoded)

我得到一个错误,如:

IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: <followed by the first lines of data>

因为它期望的是数据的路径,而不是数据本身。
我肯定我以前做过,但我的生活不记得我做错了什么。感谢任何帮助!

c2e8gylq

c2e8gylq1#

我想明白了首先,我创建了一个函数来将任何具有“NullType()”数据类型的列更改为字符串,然后我可以首先将数据导入为pandas df,转换为spark df,重新定义null列并写入表:

def removeVoidColumnsFromSparkDF(df):
    new_fields = []
    for field in df.schema:
        new_data_type = StringType() if str(field.dataType) == "NullType()" else field.dataType
        new_fields.append(StructField(field.name, new_data_type, field.nullable))
    new_schema = StructType(new_fields)
    return spark.createDataFrame(df.rdd, new_schema)

pandasdf = pd.read_csv(io.StringIO(result), sep='|',dtype='unicode')
sparkDF=spark.createDataFrame(pandasdf) 
table=f"{internaldb}.mytable"
new_df = removeVoidColumnsFromSparkDF(sparkDF)
new_df.write.mode("overwrite").saveAsTable(table)

相关问题