我有一个python程序,在不使用pyspark的情况下运行得非常好。但是,完成这个过程需要很长时间,所以我需要使用pyspark并行处理循环的文件。
现在,我在gcp bucket下处理文件的代码片段是:
# ... some other codes
def my_old_sequential_func():
helper_object = MyHelperObject(...)
for obj in gcp_bucket.objects(path):
if obj.key.endswith('sys_data.txt'):
#....some other codes
file_data = (d for d in obj.download().decode('utf-8').split('\n'))
parse_file(file_data, helper_object)
#....some other codes ....
注意这里 obj.key
只是指我的gcp bucket上文件对象的名称,它只是一个字符串,而不是实际的内容。因此,这里的代码需要做的是使用 file_data = (d for d in obj.downlod()...
然后打电话 parse_file(file_data, helper_object)
按顺序。顺序性是我的程序花费很长时间的原因
我现在的尝试是:
def process_file(file):
logging.info('enterred function')
file_data = (d for d in file.download().decode('utf-8').split('\n'))
parse_file(file_data, helper_object)
# return 123
def my_parallel_attempt_func():
objs= [obj.key for obj in gcp_bucket.objects(path) if obj.key.endswith('sys_data.txt')]
files_rdd = spark.parallelize(objs)
files_rdd = files_rdd.map(lambda f: process_file(f))
result_rdd = files_rdd.collect()
请注意 objs
与之前遍历文件相同: for obj in gcp_bucket.objects(path)
, objs
是由文件名组成的字符串列表: [my_gcp_bucket/_sys_data.txt, my_gcp_bucket/yyy_sys_data.txt, ...]
现在,在排队之后 files_rdd = spark.parallelize(objs)
,我的文件由15或20个文件名(object.key)组成,它们不是实际的文件内容。所以我试着调用map+lambda函数 files_rdd = files_rdd.map(lambda f: process_file(f))
在我的每个文件上 files_rdd
.
我的尝试失败了 process_file(f)
做它以前做过的事,但是,我不认为它做了。我没有看到日志信息“entered function”打印出来,也没有看到在我的 process_file
功能。如果我取消对行的注解 return 123
,我可以看到我的 result_rdd
有[123,123,123,123,…],这意味着在我调用 collect()
但是,为什么每一次伐木都要排在队伍前面呢 return 123
没有被调用?我不确定是否每行之前 return 123
到底是不是被处决了?
我猜我没有使用 map(lambda f: process_file(f))
功能正确吗?也许事实上 files_rdd
由字符串列表组成有问题吗?我假设这是lambdaing每个文件的名称\u我的文件的rdd和调用 process_file
在…上。
最重要的是,在我的内心 proces_file(file)
我也在调用函数 parse_file(file_data, helper_object)
. 但是,helper\u object通过示例化是另一个类的示例化对象,因此pyspark无法序列化它,因此pyspark无法pickle该对象,因此我得到错误could not serialize object:typeerror:can't pickle\u thread.lock objects。没有使用pyspark,它以前可以工作,但是使用pyspark,我现在使用的很多对象显然不再是可序列化的(特别是我的from`parse_file(file_data,helper_object)”,程序的其余部分广泛地使用许多其他函数和对象)。有办法解决这个问题吗?
非常感谢你的帮助!!!
暂无答案!
目前还没有任何答案,快来回答吧!