I want to create test data in a PySpark DataFrame, but I keep hitting the same "tuple index out of range" error. I do not get this error when reading a CSV. I'd like to understand why this error occurs.
The first thing I tried was creating a pandas DataFrame and converting it to a PySpark DataFrame:
import pandas as pd

# `spark` is an existing SparkSession
columns = ["id", "col_"]
data = [("1", "blue"), ("2", "green"),
        ("3", "purple"), ("4", "red"),
        ("5", "yellow")]

df = pd.DataFrame(data=data, columns=columns)
sparkdf = spark.createDataFrame(df)
sparkdf.show()
Output:
PicklingError: Could not serialize object: IndexError: tuple index out of range
I get the same error if I try to create the DataFrame from an RDD, following the SparkByExamples.com instructions:
rdd = spark.sparkContext.parallelize(data)
sparkdf = spark.createDataFrame(rdd).toDF(*columns)
sparkdf.show()
I also tried the following, and got the same error:
import pyspark.pandas as ps
df1 = ps.from_pandas(df)
Here is the full error from running the code above:
IndexError Traceback (most recent call last)
File c:\Users\jonat\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark\serializers.py:458, in CloudPickleSerializer.dumps(self, obj)
457 try:
--> 458 return cloudpickle.dumps(obj, pickle_protocol)
459 except pickle.PickleError:
File c:\Users\jonat\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark\cloudpickle\cloudpickle_fast.py:73, in dumps(obj, protocol, buffer_callback)
70 cp = CloudPickler(
71 file, protocol=protocol, buffer_callback=buffer_callback
72 )
---> 73 cp.dump(obj)
74 return file.getvalue()
File c:\Users\jonat\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark\cloudpickle\cloudpickle_fast.py:602, in CloudPickler.dump(self, obj)
601 try:
--> 602 return Pickler.dump(self, obj)
603 except RuntimeError as e:
File c:\Users\jonat\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark\cloudpickle\cloudpickle_fast.py:692, in CloudPickler.reducer_override(self, obj)
691 elif isinstance(obj, types.FunctionType):
--> 692 return self._function_reduce(obj)
693 else:
694 # fallback to save_global, including the Pickler's
695 # dispatch_table
File c:\Users\jonat\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark\cloudpickle\cloudpickle_fast.py:565, in CloudPickler._function_reduce(self, obj)
564 else:
--> 565 return self._dynamic_function_reduce(obj)
File c:\Users\jonat\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark\cloudpickle\cloudpickle_fast.py:546, in CloudPickler._dynamic_function_reduce(self, func)
545 newargs = self._function_getnewargs(func)
--> 546 state = _function_getstate(func)
547 return (types.FunctionType, newargs, state, None, None,
548 _function_setstate)
File c:\Users\jonat\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark\cloudpickle\cloudpickle_fast.py:157, in _function_getstate(func)
146 slotstate = {
147 "__name__": func.__name__,
148 "__qualname__": func.__qualname__,
(...)
154 "__closure__": func.__closure__,
155 }
--> 157 f_globals_ref = _extract_code_globals(func.__code__)
158 f_globals = {k: func.__globals__[k] for k in f_globals_ref if k in
159 func.__globals__}
File c:\Users\jonat\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark\cloudpickle\cloudpickle.py:334, in _extract_code_globals(co)
331 # We use a dict with None values instead of a set to get a
332 # deterministic order (assuming Python 3.6+) and avoid introducing
333 # non-deterministic pickle bytes as a results.
--> 334 out_names = {names[oparg]: None for _, oparg in _walk_global_ops(co)}
336 # Declaring a function inside another one using the "def ..."
337 # syntax generates a constant code object corresponding to the one
338 # of the nested function's As the nested function may itself need
339 # global variables, we need to introspect its code, extract its
340 # globals, (look for code object in it's co_consts attribute..) and
341 # add the result to code_globals
File c:\Users\jonat\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark\cloudpickle\cloudpickle.py:334, in <dictcomp>(.0)
331 # We use a dict with None values instead of a set to get a
332 # deterministic order (assuming Python 3.6+) and avoid introducing
333 # non-deterministic pickle bytes as a results.
--> 334 out_names = {names[oparg]: None for _, oparg in _walk_global_ops(co)}
336 # Declaring a function inside another one using the "def ..."
337 # syntax generates a constant code object corresponding to the one
338 # of the nested function's As the nested function may itself need
339 # global variables, we need to introspect its code, extract its
340 # globals, (look for code object in it's co_consts attribute..) and
341 # add the result to code_globals
IndexError: tuple index out of range
During handling of the above exception, another exception occurred:
PicklingError Traceback (most recent call last)
Cell In [67], line 2
1 rdd = spark.sparkContext.parallelize(data)
----> 2 df1 = ps.from_pandas(df)
3 sparkdf = spark.createDataFrame(rdd).toDF(*columns)
4 #Create a dictionary from each row in col_
File c:\Users\jonat\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark\pandas\namespace.py:153, in from_pandas(pobj)
151 return Series(pobj)
152 elif isinstance(pobj, pd.DataFrame):
--> 153 return DataFrame(pobj)
154 elif isinstance(pobj, pd.Index):
155 return DataFrame(pd.DataFrame(index=pobj)).index
File c:\Users\jonat\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark\pandas\frame.py:450, in DataFrame.__init__(self, data, index, columns, dtype, copy)
448 else:
449 pdf = pd.DataFrame(data=data, index=index, columns=columns, dtype=dtype, copy=copy)
--> 450 internal = InternalFrame.from_pandas(pdf)
452 object.__setattr__(self, "_internal_frame", internal)
...
466 msg = "Could not serialize object: %s: %s" % (e.__class__.__name__, emsg)
467 print_exec(sys.stderr)
--> 468 raise pickle.PicklingError(msg)
PicklingError: Could not serialize object: IndexError: tuple index out of range
1 Answer
After doing some reading, I checked https://pyreadiness.org/3.11, and it looks like PySpark does not yet support the latest version of Python (3.11), which is why cloudpickle fails with "tuple index out of range".
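As a quick sanity check, you can print the interpreter and PySpark versions actually in use, and then either run PySpark on a supported interpreter via the standard PYSPARK_PYTHON / PYSPARK_DRIVER_PYTHON environment variables or upgrade to a PySpark release that supports Python 3.11 (3.4 and later). The sketch below is only an illustration; the Python 3.10 path is a placeholder assumption, not something from the original post.

# Sketch only: confirm the version mismatch, then work around it.
# The interpreter path below is an assumption -- adjust it to your machine.
import os
import sys

import pyspark

print(sys.version)          # 3.11.x here, which older PySpark releases cannot handle
print(pyspark.__version__)  # Python 3.11 support was added in PySpark 3.4

# Point both the driver and the workers at a supported interpreter.
# These must be set before the SparkSession is created.
os.environ["PYSPARK_PYTHON"] = r"C:\path\to\python310\python.exe"
os.environ["PYSPARK_DRIVER_PYTHON"] = r"C:\path\to\python310\python.exe"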