如何使用pyspark从gcp bucket并行处理多个文件

to94eoyn  于 2021-05-29  发布在  Spark
关注(0)|答案(0)|浏览(414)

我有一个python程序,在不使用pyspark的情况下运行得非常好。但是,完成这个过程需要很长时间,所以我需要使用pyspark并行处理循环的文件。
现在,我在gcp bucket下处理文件的代码片段是:


# ... some other codes

def my_old_sequential_func():
    helper_object = MyHelperObject(...)
    for obj in gcp_bucket.objects(path):
        if obj.key.endswith('sys_data.txt'):
            #....some other codes
            file_data = (d for d in obj.download().decode('utf-8').split('\n'))
            parse_file(file_data, helper_object)
            #....some other codes ....

注意这里 obj.key 只是指我的gcp bucket上文件对象的名称,它只是一个字符串,而不是实际的内容。因此,这里的代码需要做的是使用 file_data = (d for d in obj.downlod()... 然后打电话 parse_file(file_data, helper_object) 按顺序。顺序性是我的程序花费很长时间的原因
我现在的尝试是:

def process_file(file):
    logging.info('enterred function')
    file_data = (d for d in file.download().decode('utf-8').split('\n'))
    parse_file(file_data, helper_object)
    # return 123

def my_parallel_attempt_func():
    objs= [obj.key for obj in gcp_bucket.objects(path) if obj.key.endswith('sys_data.txt')]
    files_rdd = spark.parallelize(objs)
    files_rdd = files_rdd.map(lambda f: process_file(f))

    result_rdd = files_rdd.collect()

请注意 objs 与之前遍历文件相同: for obj in gcp_bucket.objects(path) , objs 是由文件名组成的字符串列表: [my_gcp_bucket/_sys_data.txt, my_gcp_bucket/yyy_sys_data.txt, ...] 现在,在排队之后 files_rdd = spark.parallelize(objs) ,我的文件由15或20个文件名(object.key)组成,它们不是实际的文件内容。所以我试着调用map+lambda函数 files_rdd = files_rdd.map(lambda f: process_file(f)) 在我的每个文件上 files_rdd .
我的尝试失败了 process_file(f) 做它以前做过的事,但是,我不认为它做了。我没有看到日志信息“entered function”打印出来,也没有看到在我的 process_file 功能。如果我取消对行的注解 return 123 ,我可以看到我的 result_rdd 有[123,123,123,123,…],这意味着在我调用 collect() 但是,为什么每一次伐木都要排在队伍前面呢 return 123 没有被调用?我不确定是否每行之前 return 123 到底是不是被处决了?
我猜我没有使用 map(lambda f: process_file(f)) 功能正确吗?也许事实上 files_rdd 由字符串列表组成有问题吗?我假设这是lambdaing每个文件的名称\u我的文件的rdd和调用 process_file 在…上。
最重要的是,在我的内心 proces_file(file) 我也在调用函数 parse_file(file_data, helper_object) . 但是,helper\u object通过示例化是另一个类的示例化对象,因此pyspark无法序列化它,因此pyspark无法pickle该对象,因此我得到错误could not serialize object:typeerror:can't pickle\u thread.lock objects。没有使用pyspark,它以前可以工作,但是使用pyspark,我现在使用的很多对象显然不再是可序列化的(特别是我的from`parse_file(file_data,helper_object)”,程序的其余部分广泛地使用许多其他函数和对象)。有办法解决这个问题吗?
非常感谢你的帮助!!!

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题