使用IForest库实现一个检测离群值的函数,使用以下代码:
import pyspark.pandas as pd
import numpy as np
from alibi_detect.od import IForest
# ****************Modelo IForest******************************************
# IForest rta - Outlier ---> 1, Not-Outlier ----> 0
od = IForest(
threshold=0.,
n_estimators=5
)
def mode(lm):
freqs = groupby(Counter(lm).most_common(), lambda x:x[1])
m=[val for val,count in next(freqs)[1]]
if len(m)>1:
m=np.median(lm)
else:
m=float(m[0])
return m
def disper(x):
x_pred = x[['precio_local', 'precio_contenido']]
insumo_std = x_pred.std().to_frame().T
mod = mode(x_pred['precio_local'])
x_send2 = pd.DataFrame(
index=x_pred.index,
columns=['Std_precio','Std_prec_cont','cant_muestras','Moda_precio_local','IsFo']
)
x_send2.loc[:,'Std_precio'] = insumo_std.loc[0,'precio_local']
x_send2.loc[:,'Std_prec_cont'] = insumo_std.loc[0,'precio_local']
x_send2.loc[:,'Moda_precio_local'] = mod
mod_cont = mode(x_pred['precio_contenido'])
x_send2.loc[:,'Moda_precio_contenido_std'] = mod_cont
ctn = x_pred.shape[0]
x_send2.loc[:,'cant_muestras'] = ctn
if x_pred.shape[0]>3:
od.fit(x_pred)
preds = od.predict(
x_pred,
return_instance_score=True
)
x_preds = preds['data']['is_outlier']
#x_send2.loc[:,'IsFo']=x_preds
pd.set_option('compute.ops_on_diff_frames', True)
x_send2.loc[:,'IsFo']= pd.Series(x_preds, index=x_pred.index)
#x_send2.insert(x_pred.index, 'IsFo', x_preds)
else:
x_send2.loc[:,'IsFo'] = 0
print(type(x_send2))
print(x_send2)
return x_send2
insumo_all_pd = insumo_all.to_pandas_on_spark()
我得到的错误:
ArrowInvalid Traceback (most recent call last)
<command-1939548125702628> in <module>
----> 1 df_result = insumo_all_pd.groupby(by=['categoria','marca','submarca','barcode','contenido_std','unidad_std']).apply(disper)
2 display(df_result)
/databricks/spark/python/pyspark/pandas/usage_logging/__init__.py in wrapper(*args,**kwargs)
192 start = time.perf_counter()
193 try:
--> 194 res = func(*args,**kwargs)
195 logger.log_success(
196 class_name, function_name, time.perf_counter() - start, signature
/databricks/spark/python/pyspark/pandas/groupby.py in apply(self, func, *args,**kwargs)
1200 else:
1201 pser_or_pdf = grouped.apply(pandas_apply, *args,**kwargs)
-> 1202 psser_or_psdf = ps.from_pandas(pser_or_pdf)
1203
1204 if len(pdf) <= limit:
/databricks/spark/python/pyspark/pandas/usage_logging/__init__.py in wrapper(*args,**kwargs)
187 if hasattr(_local, "logging") and _local.logging:
188 # no need to log since this should be internal call.
--> 189 return func(*args,**kwargs)
190 _local.logging = True
191 try:
/databricks/spark/python/pyspark/pandas/namespace.py in from_pandas(pobj)
143 """
144 if isinstance(pobj, pd.Series):
--> 145 return Series(pobj)
146 elif isinstance(pobj, pd.DataFrame):
147 return DataFrame(pobj)
/databricks/spark/python/pyspark/pandas/usage_logging/__init__.py in wrapper(*args,**kwargs)
187 if hasattr(_local, "logging") and _local.logging:
188 # no need to log since this should be internal call.
--> 189 return func(*args,**kwargs)
190 _local.logging = True
191 try:
/databricks/spark/python/pyspark/pandas/series.py in __init__(self, data, index, dtype, name, copy, fastpath)
424 data=data, index=index, dtype=dtype, name=name, copy=copy, fastpath=fastpath
425 )
--> 426 internal = InternalFrame.from_pandas(pd.DataFrame(s))
427 if s.name is None:
428 internal = internal.copy(column_labels=[None])
/databricks/spark/python/pyspark/pandas/internal.py in from_pandas(pdf)
1458 data_columns,
1459 data_fields,
-> 1460 ) = InternalFrame.prepare_pandas_frame(pdf)
1461
1462 schema = StructType([field.struct_field for field in index_fields + data_fields])
/databricks/spark/python/pyspark/pandas/internal.py in prepare_pandas_frame(pdf, retain_index)
1531
1532 for col, dtype in zip(reset_index.columns, reset_index.dtypes):
-> 1533 spark_type = infer_pd_series_spark_type(reset_index[col], dtype)
1534 reset_index[col] = DataTypeOps(dtype, spark_type).prepare(reset_index[col])
1535
/databricks/spark/python/pyspark/pandas/typedef/typehints.py in infer_pd_series_spark_type(pser, dtype)
327 return pser.iloc[0].__UDT__
328 else:
--> 329 return from_arrow_type(pa.Array.from_pandas(pser).type)
330 elif isinstance(dtype, CategoricalDtype):
331 if isinstance(pser.dtype, CategoricalDtype):
/databricks/python/lib/python3.8/site-packages/pyarrow/array.pxi in pyarrow.lib.Array.from_pandas()
/databricks/python/lib/python3.8/site-packages/pyarrow/array.pxi in pyarrow.lib.array()
/databricks/python/lib/python3.8/site-packages/pyarrow/array.pxi in pyarrow.lib._ndarray_to_array()
/databricks/python/lib/python3.8/site-packages/pyarrow/error.pxi in pyarrow.lib.check_status()
ArrowInvalid: Could not convert Std_precio Std_prec_cont cant_muestras Moda_precio_local IsFo Moda_precio_contenido_std
107 0.0 0.0 3 1.0 0 1.666667
252 0.0 0.0 3 1.0 0 1.666667
396 0.0 0.0 3 1.0 0 1.666667 with type DataFrame: did not recognize Python value type when inferring an Arrow data type
使用时遇到的错误:
df_result = insumo_all_pd.groupby(by=['categoria','marca','submarca','barcode','contenido_std','unidad_std']).apply(disper)
Dataframe insumo_all_pd的模式为:
fecha_ola datetime64[ns]
pais object
categoria object
marca object
submarca object
contenido_std float64
unidad_std object
barcode object
precio_local float64
cantidad float64
descripcion object
id_ticket object
id_item object
id_pdv object
fecha_transaccion datetime64[ns]
id_ref float64
precio_contenido float64
dtype: object
我不清楚是什么原因导致了这个错误,但似乎是数据类型被错误地推断出来了。我试着将"disper"
函数产生的数据类型转换为float,但它给出了同样的错误。
我很感激你能给我的任何帮助或指导。
1条答案
按热度按时间oknwwptz1#
很明显,新的Jupyter已经改变了一些与Pandas相关的库。解决方案是升级到Jupyter 5。