I have a PySpark DataFrame whose column values are lists. I need to replace each value in those lists with its index in another Python list:
outer_list = [b,c,a,e,f,d]
Input DataFrame:
+-------+---------+
|USER_ID| OFFERIDS|
+-------+---------+
|      X|[a, b, c]|
|      Y|[d, e, f]|
+-------+---------+
Expected output:
+-------+---------+---------+
|USER_ID| OFFERIDS|  INDEXED|
+-------+---------+---------+
|      X|[a, b, c]|[2, 0, 1]|
|      Y|[d, e, f]|[5, 3, 4]|
+-------+---------+---------+
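The mapping itself is ordinary Python and can be checked outside Spark. Below is a minimal sketch, assuming the list elements are hashable values such as the strings 'a' to 'f'. It builds a value-to-position dict once, so each lookup is constant time instead of a linear `list.index` scan. It mirrors `list.index` by keeping the first occurrence of any duplicate, and it drops values missing from outer_list, as the `if offer in outer_list` filter in the UDF below does. `build_index` and `to_indices` are hypothetical helper names.

```python
def build_index(outer_list):
    """Map each value to its first position in outer_list, like list.index."""
    index_of = {}
    for position, value in enumerate(outer_list):
        index_of.setdefault(value, position)  # keep the first occurrence only
    return index_of


def to_indices(offer_ids, index_of):
    """Replace each id by its position; ids not in the lookup are skipped."""
    return [index_of[offer] for offer in offer_ids if offer in index_of]


outer_list = ["b", "c", "a", "e", "f", "d"]
index_of = build_index(outer_list)
print(to_indices(["a", "b", "c"], index_of))  # [2, 0, 1]
print(to_indices(["d", "e", "f"], index_of))  # [5, 3, 4]
```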
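Creating a UDF for this in PySpark does not work. Can anyone help me figure out where I went wrong?

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, IntegerType

replace_index = udf(lambda x: [outer_list.index(offer) for offer in x if offer in outer_list], ArrayType(IntegerType()))
userid_offerid_index = df.withColumn('INDEXED_OFFERIDS', replace_index('OFFERIDS'))

The withColumn line fails with: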
File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera4-1.cdh5.13.3.p0.611179/lib/spark2/python/lib/pyspark.zip/pyspark/sql/udf.py", line 186, in wrapper
File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera4-1.cdh5.13.3.p0.611179/lib/spark2/python/lib/pyspark.zip/pyspark/sql/udf.py", line 164, in __call__
File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera4-1.cdh5.13.3.p0.611179/lib/spark2/python/lib/pyspark.zip/pyspark/sql/udf.py", line 148, in _judf
File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera4-1.cdh5.13.3.p0.611179/lib/spark2/python/lib/pyspark.zip/pyspark/sql/udf.py", line 157, in _create_judf
File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera4-1.cdh5.13.3.p0.611179/lib/spark2/python/lib/pyspark.zip/pyspark/sql/udf.py", line 33, in _wrap_function
File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera4-1.cdh5.13.3.p0.611179/lib/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 2391, in _prepare_for_python_RDD
File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera4-1.cdh5.13.3.p0.611179/lib/spark2/python/lib/pyspark.zip/pyspark/serializers.py", line 575, in dumps
File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera4-1.cdh5.13.3.p0.611179/lib/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 918, in dumps
File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera4-1.cdh5.13.3.p0.611179/lib/spark2/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 249, in dump
pickle.PicklingError: Could not serialize object: Py4JError: An error occurred while calling o69.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:274)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
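The traceback shows the failure happens on the driver while the lambda is being pickled (`cloudpickle.dumps`), before any row is processed. The object that cannot be pickled is a py4j proxy (`o69`). That message typically appears when a function passed to `udf` refers, directly or through a captured variable, to a JVM-backed Spark object such as a DataFrame, a Column or the SparkContext. The question does not show how `outer_list` is built, so it is only an assumption that `outer_list` (or something else the lambda captures) is such an object. If it comes from a DataFrame, collecting it into a plain Python list on the driver first should avoid the error. The sketch below works under that assumption: the closure holds only a plain dict. `make_replace_index` and `add_indexed_column` are hypothetical names, and `df` is assumed to have an array column `OFFERIDS`.

```python
def make_replace_index(outer_list):
    """Return a plain Python function suitable for wrapping in pyspark's udf()."""
    # Snapshot outer_list into a dict of plain values on the driver. The returned
    # function closes over this dict only, so cloudpickle never meets a py4j object.
    # Assumption: outer_list is already a plain Python list, not a Column/DataFrame.
    index_of = {}
    for position, value in enumerate(outer_list):
        index_of.setdefault(value, position)  # same result as list.index

    def replace_index(offer_ids):
        if offer_ids is None:  # a null array reaches a Python UDF as None
            return None
        return [index_of[offer] for offer in offer_ids if offer in index_of]

    return replace_index


def add_indexed_column(df, outer_list):
    # Imported here so make_replace_index can be exercised without Spark installed.
    from pyspark.sql.functions import udf
    from pyspark.sql.types import ArrayType, IntegerType

    replace_index = udf(make_replace_index(outer_list), ArrayType(IntegerType()))
    return df.withColumn("INDEXED_OFFERIDS", replace_index("OFFERIDS"))
```

Usage, assuming an existing `df` and a plain list: `userid_offerid_index = add_indexed_column(df, ['b', 'c', 'a', 'e', 'f', 'd'])`.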