pyspark流媒体与udf

k97glaaz  于 2021-05-26  发布在  Spark
关注(0)|答案(0)|浏览(408)

我是新的Spark流和Pandas自定义项。我正在kafka的pyspark consumer上工作,payload是xml格式的,并试图通过应用udf解析传入的xml

@pandas_udf("col1 string, col2 string",PandasUDFType.GROUPED_MAP)
def test_udf(df):
    import xmltodict
    from collections import MutableMapping 
    xml_str=df.iloc[0,0]
    df_col=['col1', 'col2']
    doc=xmltodict.parse(xml_str,dict_constructor=dict)
    extract_needed_fields = { k:doc[k] for k in df_col }
    return pd.DataFrame( [{'col1': 'abc', 'col2': 'def'}] , index=[0] , dtype="string" )

data=df.selectExpr("CAST(value AS STRING) AS value") 
data.groupby("value").apply(test_udf).writeStream.format("console").start()

我得到下面的错误

File "pyarrow/array.pxi", line 859, in pyarrow.lib.Array.from_pandas
  File "pyarrow/array.pxi", line 215, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 104, in pyarrow.lib._handle_arrow_array_protocol
ValueError: Cannot specify a mask or a size when passing an object that is converted with the __arrow_array__ protocol.

这是正确的方法吗?我做错什么了

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题