pyspark ArrowInvalid: Could not convert ... with type DataFrame: did not recognize Python value type when inferring an Arrow data type

eiee3dmh · posted 2022-11-01 in Spark

I am implementing an outlier-detection function with the IForest library, using the following code:

from collections import Counter
from itertools import groupby

import numpy as np
import pyspark.pandas as pd  # note: pd here is pandas-on-Spark, not plain pandas
from alibi_detect.od import IForest

# ---------------- IForest model ----------------
# IForest output: outlier -> 1, not an outlier -> 0

od = IForest(
    threshold=0.,
    n_estimators=5
)

def mode(lm):
    # Group the (value, count) pairs from most_common() by count; the first
    # group holds the most frequent value(s).
    freqs = groupby(Counter(lm).most_common(), lambda x: x[1])
    m = [val for val, count in next(freqs)[1]]
    if len(m) > 1:
        # Several values tie for the mode: fall back to the median.
        m = np.median(lm)
    else:
        m = float(m[0])
    return m

def disper(x):
    x_pred = x[['precio_local', 'precio_contenido']]
    insumo_std = x_pred.std().to_frame().T
    mod = mode(x_pred['precio_local'])
    x_send2 = pd.DataFrame(
        index=x_pred.index,
        columns=['Std_precio', 'Std_prec_cont', 'cant_muestras', 'Moda_precio_local', 'IsFo']
    )
    x_send2.loc[:, 'Std_precio'] = insumo_std.loc[0, 'precio_local']
    x_send2.loc[:, 'Std_prec_cont'] = insumo_std.loc[0, 'precio_contenido']
    x_send2.loc[:, 'Moda_precio_local'] = mod
    mod_cont = mode(x_pred['precio_contenido'])
    x_send2.loc[:, 'Moda_precio_contenido_std'] = mod_cont
    ctn = x_pred.shape[0]
    x_send2.loc[:, 'cant_muestras'] = ctn
    if x_pred.shape[0] > 3:
        # Enough samples: fit the isolation forest and flag outliers.
        od.fit(x_pred)
        preds = od.predict(
            x_pred,
            return_instance_score=True
        )
        x_preds = preds['data']['is_outlier']
        # pandas-on-Spark requires this option in order to combine
        # objects that come from different frames.
        pd.set_option('compute.ops_on_diff_frames', True)
        x_send2.loc[:, 'IsFo'] = pd.Series(x_preds, index=x_pred.index)
    else:
        x_send2.loc[:, 'IsFo'] = 0
    print(type(x_send2))
    print(x_send2)
    return x_send2

# insumo_all is a Spark DataFrame defined elsewhere.
insumo_all_pd = insumo_all.to_pandas_on_spark()

The error I get:

ArrowInvalid                              Traceback (most recent call last)
<command-1939548125702628> in <module>
----> 1 df_result = insumo_all_pd.groupby(by=['categoria','marca','submarca','barcode','contenido_std','unidad_std']).apply(disper)
      2 display(df_result)

/databricks/spark/python/pyspark/pandas/usage_logging/__init__.py in wrapper(*args,**kwargs)
    192             start = time.perf_counter()
    193             try:
--> 194                 res = func(*args,**kwargs)
    195                 logger.log_success(
    196                     class_name, function_name, time.perf_counter() - start, signature

/databricks/spark/python/pyspark/pandas/groupby.py in apply(self, func, *args,**kwargs)
   1200             else:
   1201                 pser_or_pdf = grouped.apply(pandas_apply, *args,**kwargs)
-> 1202             psser_or_psdf = ps.from_pandas(pser_or_pdf)
   1203 
   1204             if len(pdf) <= limit:

/databricks/spark/python/pyspark/pandas/usage_logging/__init__.py in wrapper(*args,**kwargs)
    187         if hasattr(_local, "logging") and _local.logging:
    188             # no need to log since this should be internal call.
--> 189             return func(*args,**kwargs)
    190         _local.logging = True
    191         try:

/databricks/spark/python/pyspark/pandas/namespace.py in from_pandas(pobj)
    143     """
    144     if isinstance(pobj, pd.Series):
--> 145         return Series(pobj)
    146     elif isinstance(pobj, pd.DataFrame):
    147         return DataFrame(pobj)

/databricks/spark/python/pyspark/pandas/usage_logging/__init__.py in wrapper(*args,**kwargs)
    187         if hasattr(_local, "logging") and _local.logging:
    188             # no need to log since this should be internal call.
--> 189             return func(*args,**kwargs)
    190         _local.logging = True
    191         try:

/databricks/spark/python/pyspark/pandas/series.py in __init__(self, data, index, dtype, name, copy, fastpath)
    424                     data=data, index=index, dtype=dtype, name=name, copy=copy, fastpath=fastpath
    425                 )
--> 426             internal = InternalFrame.from_pandas(pd.DataFrame(s))
    427             if s.name is None:
    428                 internal = internal.copy(column_labels=[None])

/databricks/spark/python/pyspark/pandas/internal.py in from_pandas(pdf)
   1458             data_columns,
   1459             data_fields,
-> 1460         ) = InternalFrame.prepare_pandas_frame(pdf)
   1461 
   1462         schema = StructType([field.struct_field for field in index_fields + data_fields])

/databricks/spark/python/pyspark/pandas/internal.py in prepare_pandas_frame(pdf, retain_index)
   1531 
   1532         for col, dtype in zip(reset_index.columns, reset_index.dtypes):
-> 1533             spark_type = infer_pd_series_spark_type(reset_index[col], dtype)
   1534             reset_index[col] = DataTypeOps(dtype, spark_type).prepare(reset_index[col])
   1535 

/databricks/spark/python/pyspark/pandas/typedef/typehints.py in infer_pd_series_spark_type(pser, dtype)
    327             return pser.iloc[0].__UDT__
    328         else:
--> 329             return from_arrow_type(pa.Array.from_pandas(pser).type)
    330     elif isinstance(dtype, CategoricalDtype):
    331         if isinstance(pser.dtype, CategoricalDtype):

/databricks/python/lib/python3.8/site-packages/pyarrow/array.pxi in pyarrow.lib.Array.from_pandas()

/databricks/python/lib/python3.8/site-packages/pyarrow/array.pxi in pyarrow.lib.array()

/databricks/python/lib/python3.8/site-packages/pyarrow/array.pxi in pyarrow.lib._ndarray_to_array()

/databricks/python/lib/python3.8/site-packages/pyarrow/error.pxi in pyarrow.lib.check_status()

ArrowInvalid: Could not convert      Std_precio  Std_prec_cont  cant_muestras  Moda_precio_local  IsFo  Moda_precio_contenido_std
107         0.0            0.0              3                1.0     0                   1.666667
252         0.0            0.0              3                1.0     0                   1.666667
396         0.0            0.0              3                1.0     0                   1.666667 with type DataFrame: did not recognize Python value type when inferring an Arrow data type

The error is raised when running:

df_result = insumo_all_pd.groupby(by=['categoria','marca','submarca','barcode','contenido_std','unidad_std']).apply(disper)

The schema of the DataFrame insumo_all_pd is:

fecha_ola            datetime64[ns]
pais                         object
categoria                    object
marca                        object
submarca                     object
contenido_std               float64
unidad_std                   object
barcode                      object
precio_local                float64
cantidad                    float64
descripcion                  object
id_ticket                    object
id_item                      object
id_pdv                       object
fecha_transaccion    datetime64[ns]
id_ref                      float64
precio_contenido            float64
dtype: object

I am not sure what is causing this error, but it looks like the data types are being inferred incorrectly. I tried casting the values produced by the disper function to float, but it gave the same error.
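
A cast of the kind described would look roughly like this (a minimal sketch of the tail of disper, under the assumption that a whole-frame astype is enough, since every column disper builds is numeric):

def disper(x):
    ...  # build x_send2 as shown earlier
    # Cast every output column to float before returning, so that no
    # object-dtype column is left for Arrow to infer.
    x_send2 = x_send2.astype(float)
    return x_send2

Even with a cast like this in place, groupby(...).apply(disper) raised the same ArrowInvalid error.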
Any help or guidance would be appreciated.

oknwwptz1#

Apparently the new Jupyter has changed some of the Pandas-related libraries. The solution is to upgrade to Jupyter 5.
