多线程Pypark,无法序列化对象异常

tyky79it  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(421)

_pickle.picklingerror:无法序列化对象:异常:似乎您正试图从广播变量、操作或转换引用sparkcontext。sparkcontext只能在驱动程序上使用,不能在工作程序上运行的代码中使用。有关更多信息,请参阅spark-5063。
超级简单的示例应用程序,尝试并行运行一些计算。工作(有时),但大多数时候崩溃与上述例外。
我认为我没有嵌套rdd,但是关于不能在workers中使用sparkcontext的部分令人担忧,因为我认为我需要它来实现某种程度的并行性。如果我不能在工作线程中使用sparkcontext,我如何得到计算结果?
在这一点上,我仍然希望它被序列化,并打算在这之后启用并行运行。但甚至无法运行序列化的多线程版本。。。。

from pyspark import SparkContext
import threading

THREADED = True. # Set this to false and it always works but is sequential

content_file = "file:///usr/local/Cellar/apache-spark/3.0.0/README.md"
sc = SparkContext("local", "first app")
content = sc.textFile(content_file).cache() # For the non-threaded version

class Worker(threading.Thread):
    def __init__(self, letter, *args,**kwargs):
        super().__init__(*args,**kwargs)
        self.letter = letter

    def run(self):
        print(f"Starting: {self.letter}")
        nums[self.letter] = content.filter(lambda s: self.letter in s).count() # SPOILER self.letter turns out to be the problem
        print(f"{self.letter}: {nums[self.letter]}")

nums = {}
if THREADED:
    threads = []
    for char in range(ord('a'), ord('z')+1):
        letter = chr(char)
        threads.append(Worker(letter, name=letter))

    for thread in threads:
        thread.start()

    for thread in threads:
        thread.join()

else:
    for char in range(ord('a'), ord('z')+1):
        letter = chr(char)
        nums[letter] = content.filter(lambda s: letter in s).count()
        print(f"{letter}: {nums[letter]}")

print(nums)

即使我把代码改成一次只使用一个线程

threads = []
    for char in range(ord('a'), ord('z')+1):
        letter = chr(char)
        thread = Worker(letter, name=letter)
        threads.append(thread)
        thread.start()
        thread.join()

我猜它也会引发同样的异常,因为它试图在工作线程而不是主线程(在主线程中声明sparkcontext)中获取结果。
我需要能够同时等待几个值,如果spark要在这里提供任何好处。
我要解决的真正问题是:

__________RESULT_________
   ^         ^         ^
   A         B         C
a1 ^ a2   b1 ^ b2   c1 ^ c2...

为了得到我的结果,我想并行计算a,b和c,每个部分都要计算a1,a2,a3。。。。同时进行。我把它分成线程,这样我就可以同时请求多个值,这样spark就可以并行运行计算了。
我创建上面的示例仅仅是因为我想获得正确的线程,而不是试图找出如何计算包含字符的行数。但这似乎是超级简单的审查线程方面。
这个小小的变化就把事情解决了。self.letter在lambda中爆炸,在过滤器调用消除崩溃之前取消了它的引用

def run(self):
        print(f"Starting: {self.letter}")
        letter = self.letter
        nums[self.letter] = content.filter(lambda s: letter in s).count()
        print(f"{self.letter}: {nums[self.letter]}")
fkvaft9z

fkvaft9z1#

例外情况是
似乎您正试图从广播变量、操作或转换引用sparkcontext
在你的例子中 SparkContext 由以下行控制:

nums[self.letter] = self.content.filter(lambda s: self.letter in s).count()

在这一行中,使用以下lambda表达式定义一个过滤器(计为转换):

lambda s: self.letter in s

这个表达式的问题是:引用成员变量 letter 对象引用的 self . 要使此引用在批处理执行期间可用,spark需要序列化对象 self . 但这个对象不仅包含成员 letter ,但也 content ,这是一个spark rdd(每个spark rdd都有一个对创建它的sparkcontext的引用)。
要使lambda可序列化,必须确保不引用其中不可序列化的任何内容。举个例子,最简单的方法是基于成员定义一个局部变量 letter :

def run(self):
        print(f"Starting: {self.letter}")
        letter = self.letter
        nums[self.letter] = self.content.filter(lambda s: letter in s).count()
        print(f"{self.letter}: {nums[self.letter]}")

为什么

为了理解为什么我们不能这样做,我们必须理解spark在后台的每一次转换中都做了什么。
每当你有这样的代码时:

sc = SparkContext(<connection information>)

您正在创建与spark master的“连接”。它可以是一个简单的进程内本地spark master,也可以是运行在完全不同的服务器上的spark master。
鉴于 SparkContext -对象,我们可以定义管道从何处获取数据。对于本例,假设我们想从文本文件中读取数据(就像您的问题:

rdd = sc.textFile("file:///usr/local/Cellar/apache-spark/3.0.0/README.md")

正如我之前所提到的 SparkContext 或多或少是与spark master的“连接”。我们指定为文本文件位置的url必须可以从spark主机访问,而不是从您正在执行python脚本的系统访问!
基于我们创建的sparkrdd,我们现在可以定义应该如何处理数据。假设我们只计算包含给定字符串的行 "Hello World" :

linesThatContainHelloWorld = rdd.filter(lambda line: "Hello World" in line).count()

一旦我们调用一个终端函数(一个产生结果的计算,如 count() 在本例中)是它序列化我们传递给的函数 filter ,将序列化的数据传输到spark worker(可能在完全不同的服务器上运行),这些spark worker反序列化该函数以执行给定的函数。
这意味着这段代码: lambda line: "Hello World" in line 实际上不会在您当前所在的python进程中执行,而是在spark workers上执行。
每当我们在一个转换中引用来自上限的变量时(对于spark),事情就会变得更加棘手:

stringThatALineShouldContain = "Hello World"
linesThatContainHelloWorld = rdd.filter(lambda line: stringThatALineShouldContain in line).count()

现在,spark不仅要序列化给定的函数,还要序列化引用的变量 stringThatALineShouldContain 从上面的范围。在这个简单的例子中,这不是问题,因为变量 stringThatALineShouldContain 是可序列化的。
但是,每当我们试图访问不可序列化的内容,或者仅仅持有对不可序列化内容的引用时,spark就会抱怨。
例如:

stringThatALineShouldContain = "Hello World"
badExample = (sc, stringThatALineShouldContain) # tuple holding a reference to the SparkContext

linesThatContainHelloWorld = rdd.filter(lambda line: badExample[1] in line).count()

因为函数现在引用 badExample ,spark尝试序列化此变量并抱怨它包含对 SparkContext .
这不仅适用于 SparkContext ,但对于所有不可序列化的对象,例如到数据库的连接对象、文件句柄等等。如果出于任何原因,您必须这样做,那么您应该只引用包含如何创建该不可变量对象的信息的对象。

一个例子

无效示例

dbConnection = MySQLConnection("mysql.example.com") # Not sure if this class exists, only for the example
rdd.filter(lambda line: dbConnection.insertIfNotExists("INSERT INTO table (col) VALUES (?)", line)

有效示例


# note that this is still "bad code", since the connection is never cleared. But I hope you get the idea

class LazyMySQLConnection:
    connectionString = None
    actualConnection = None

    def __init__(self, connectionString):
        self.connectionString = connectionString

    def __getstate__(self):
        # tell pickle (the serialization library Spark uses for transformations) that the actualConnection member is not part of the state
        state = dict(self.__dict__)
        del state["actualConnection"]
        return state

    def getOrCreateConnection(self):
        if not self.actualConnection:
            self.actualConnection = MySQLConnection(self.connectionString)
        return self.actualConnection

lazyDbConnection = LazyMySQLConnection("mysql.example.com")
rdd.filter(lambda line: lazyDbConnection.getOrCreateConnection().insertIfNotExists("INSERT INTO table (col) VALUES (?)", line)

# remember, the lambda we supplied for the filter will be executed on the Spark-Workers, so the connection will be etablished from each Spark-Worker!
lqfhib0f

lqfhib0f2#

你试图使用(py)spark的方式是不打算使用的。您将普通的python数据处理与spark处理混为一谈,在spark上完全可以实现。
spark(和其他数据处理框架)的思想是,定义数据应该如何处理,所有多线程+分发的东西只是一个独立的“配置”。
另外,我也不太明白使用多线程会有什么好处。每个线程都会:
必须读取输入文件中的每个字符
检查当前行是否包含分配给此线程的字母
计数
当然,这会(如果有效的话)产生一个正确的结果,但是效率很低,因为会有很多线程在为那些文件上的读取操作而斗争(记住,每个线程都必须首先读取完整的文件,并且能够根据其分配的字母进行过滤)。
与spark一起工作,而不是对抗它,从中获得最大的收益。


# imports and so on

content_file = "file:///usr/local/Cellar/apache-spark/3.0.0/README.md"
sc = SparkContext("local", "first app")

rdd = sc.textFile(content_file) # read from this file
rdd = rdd.flatMap(lambda line: [letter for letter in line]) # forward every letter of each line to the next operator

# initialize the letterRange "outside" of spark so we reduce the runtime-overhead

relevantLetterRange = [chr(char) for char in range(ord('a'), ord('z') + 1)]
rdd = rdd.filter(lambda letter: letter in relevantLetterRange)

rdd = rdd.keyBy(lambda letter: letter) # key by the letter itself
countsByKey = rdd.countByKey() # count by key

当然,您可以简单地将其写在一个链中:


# imports and so on

content_file = "file:///usr/local/Cellar/apache-spark/3.0.0/README.md"
sc = SparkContext("local", "first app")

relevantLetterRange = [chr(char) for char in range(ord('a'), ord('z') + 1)]

countsByKey = sc.textFile(content_file)\
    .flatMap(lambda line: [letter for letter in line])\
    .filter(lambda letter: letter in relevantLetterRange)\
    .keyBy(lambda letter: letter)
    .countByKey()

相关问题