pyspark.ml-加载模型和管道时出错

hgncfbus  于 2021-05-22  发布在  Spark
关注(0)|答案(1)|浏览(636)

我想将经过训练的pyspark模型(或管道)导入pyspark脚本。我训练了一个决策树模型如下:

from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer

# Create assembler and labeller for spark.ml format preperation

assembler = VectorAssembler(inputCols = requiredFeatures, outputCol = 'features')
label_indexer = StringIndexer(inputCol='measurement_status', outputCol='indexed_label')

# Apply transformations

eq_df_labelled = label_indexer.fit(eq_df).transform(eq_df)
eq_df_labelled_featured = assembler.transform(eq_df_labelled)

# Split into training and testing datasets

(training_data, test_data) = eq_df_labelled_featured.randomSplit([0.75, 0.25])

# Create a decision tree algorithm

dtree = DecisionTreeClassifier(
    labelCol ='indexed_label',
    featuresCol = 'features',
    maxDepth = 5,
    minInstancesPerNode=1,
    impurity = 'gini',
    maxBins=32,
    seed=None
)

# Fit classifier object to training data

dtree_model = dtree.fit(training_data)

# Save model to given directory

dtree_model.save("models/dtree")

以上所有代码都没有任何错误。问题是,当我尝试加载此模型(在同一个或另一个pyspark应用程序上)时,使用:

from pyspark.ml.classification import DecisionTreeClassifier

imported_model = DecisionTreeClassifier()
imported_model.load("models/dtree")

我得到以下错误:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-4-b283bc2da75f> in <module>
      2 
      3 imported_model = DecisionTreeClassifier()
----> 4 imported_model.load("models/dtree")
      5 
      6 #lodel = DecisionTreeClassifier.load("models/dtree-test/")

~/.local/lib/python3.6/site-packages/pyspark/ml/util.py in load(cls, path)
    328     def load(cls, path):
    329         """Reads an ML instance from the input path, a shortcut of `read().load(path)`."""
--> 330         return cls.read().load(path)
    331 
    332 

~/.local/lib/python3.6/site-packages/pyspark/ml/util.py in load(self, path)
    278         if not isinstance(path, basestring):
    279             raise TypeError("path should be a basestring, got type %s" % type(path))
--> 280         java_obj = self._jread.load(path)
    281         if not hasattr(self._clazz, "_from_java"):
    282             raise NotImplementedError("This Java ML type cannot be loaded into Python currently: %r"

~/.local/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

~/.local/lib/python3.6/site-packages/pyspark/sql/utils.py in deco(*a,**kw)
    126     def deco(*a,**kw):
    127         try:
--> 128             return f(*a,**kw)
    129         except py4j.protocol.Py4JJavaError as e:
    130             converted = convert_exception(e.java_exception)

~/.local/lib/python3.6/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o39.load.
: java.lang.UnsupportedOperationException: empty collection
    at org.apache.spark.rdd.RDD.$anonfun$first$1(RDD.scala:1439)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
    at org.apache.spark.rdd.RDD.first(RDD.scala:1437)
    at org.apache.spark.ml.util.DefaultParamsReader$.loadMetadata(ReadWrite.scala:587)
    at org.apache.spark.ml.util.DefaultParamsReader.load(ReadWrite.scala:465)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

我选择这种方法是因为使用 Pipeline 对象。你知道发生了什么事吗?
更新
我已经意识到这个错误只有在我使用我的spark集群时才会发生(一个主机,两个工人使用spark的独立集群管理器)。如果我这样设置spark会话(其中主进程设置为本地进程):

spark = SparkSession\
    .builder\
    .config(conf=conf)\
    .appName("MachineLearningTesting")\
    .master("local[*]")\
    .getOrCreate()

我不明白上面的错误。
另外,我正在使用spark3.0.0,是不是spark3中的模型导入和导出仍然有bug?

xmd2e60i

xmd2e60i1#

有两个问题:
必须在群集中的所有节点之间启用ssh身份验证通信。尽管spark集群中的所有节点都在同一个网络中,但只有主节点对工作节点进行了ssh认证,反之亦然。
模型必须对群集中的所有节点可用。这听起来可能非常明显,但我认为模型文件只需要对主节点可用,然后主节点将其扩散到工作节点。换句话说,当您这样加载模型时:

from pyspark.ml.classification import DecisionTreeClassifier

imported_model = DecisionTreeClassifier()
imported_model.load("models/dtree")

文件 /absoloute_path/models/dtree 必须存在于群集中的每台计算机上。这让我明白,在生产环境中,模型可能是通过外部共享文件系统访问的。
这两个步骤解决了我将pyspark模型加载到集群上运行的spark应用程序中的问题。

相关问题