我们用jupyter pyspark
. 我们尝试进行logistic回归分析,未发现任何错误。当我们试着去适应自己的生活时,错误突然出现了 dataframe
回归模型。
为什么会出现这种错误?这是因为内存问题吗?我们使用的数据集的大小约为2GB,我们将虚拟机用于pyspark。错误消息粘贴在下面。
有人能告诉我为什么会这样,我需要做些什么吗?
Py4JJavaError Traceback (most recent call last)
<ipython-input-24-62d5302d4c2b> in <module>
----> 1 lrModel = lr.fit(df_dec)
/opt/spark/spark-2.3.2-bin-hadoop2.7/python/pyspark/ml/base.py in fit(self, dataset, params)
130 return self.copy(params)._fit(dataset)
131 else:
--> 132 return self._fit(dataset)
133 else:
134 raise ValueError("Params must be either a param map or a list/tuple of param maps, "
/opt/spark/spark-2.3.2-bin-hadoop2.7/python/pyspark/ml/wrapper.py in _fit(self, dataset)
286
287 def _fit(self, dataset):
--> 288 java_model = self._fit_java(dataset)
289 model = self._create_model(java_model)
290 return self._copyValues(model)
/opt/spark/spark-2.3.2-bin-hadoop2.7/python/pyspark/ml/wrapper.py in _fit_java(self, dataset)
283 """
284 self._transfer_params_to_java()
--> 285 return self._java_obj.fit(dataset._jdf)
286
287 def _fit(self, dataset):
/opt/spark/spark-2.3.2-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
/opt/spark/spark-2.3.2-bin-hadoop2.7/python/pyspark/sql/utils.py in deco(*a,**kw)
61 def deco(*a,**kw):
62 try:
---> 63 return f(*a,**kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/opt/spark/spark-2.3.2-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o226.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 5, localhost, executor driver): scala.MatchError: [null,1.0,[1.0,3.0,1.0,1.0]] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
at org.apache.spark.ml.classification.LogisticRegression$$anonfun$15.apply(LogisticRegression.scala:496)
at org.apache.spark.ml.classification.LogisticRegression$$anonfun$15.apply(LogisticRegression.scala:496)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:217)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1094)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1085)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1020)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1085)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:811)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1651)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1639)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1638)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1638)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1872)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1821)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1810)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2131)
at org.apache.spark.rdd.RDD$$anonfun$fold$1.apply(RDD.scala:1098)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.fold(RDD.scala:1092)
at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1161)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1137)
at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:518)
at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:488)
at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:278)
at org.apache.spark.ml.Predictor.fit(Predictor.scala:118)
at org.apache.spark.ml.Predictor.fit(Predictor.scala:82)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: scala.MatchError: [null,1.0,[1.0,3.0,1.0,1.0]] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
at org.apache.spark.ml.classification.LogisticRegression$$anonfun$15.apply(LogisticRegression.scala:496)
at org.apache.spark.ml.classification.LogisticRegression$$anonfun$15.apply(LogisticRegression.scala:496)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:217)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1094)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1085)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1020)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1085)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:811)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
完整代码如下:
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.classification import (RandomForestClassifier,GBTClassifier, DecisionTreeClassifier)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler
import matplotlib.pyplot as plt
from pyspark.sql.functions import *
from pyspark.ml import Pipeline
spark = SparkSession.builder.appName("decision_tree_classifier").getOrCreate()
df = spark.read.csv("hdfs:/user/Final2.csv", header= True)
df.printSchema()
from pyspark.sql.types import IntegerType
df = df.withColumn("detail_age", df["detail_age"].cast(IntegerType()))
df = df.withColumn("activity_code", df["activity_code"].cast(IntegerType()))
df = df.withColumn("resident_status", df["resident_status"].cast(IntegerType()))
df = df.withColumn("age_recode_12", df["age_recode_12"].cast(IntegerType()))
df = df.withColumn("infant_age_recode_22", df["infant_age_recode_22"].cast(IntegerType()))
df = df.withColumn("place_of_death_and_decedents_status", df["place_of_death_and_decedents_status"].cast(IntegerType()))
df = df.withColumn("day_of_week_of_death", df["day_of_week_of_death"].cast(IntegerType()))
df = df.withColumn("manner_of_death", df["manner_of_death"].cast(IntegerType()))
df = df.withColumn("place_of_injury", df["place_of_injury"].cast(IntegerType()))
df = df.withColumn("race", df["race"].cast(IntegerType()))
df = df.withColumn("place_of_injury", df["place_of_injury"].cast(IntegerType()))
df = df.withColumn("race_recode_5", df["race_recode_5"].cast(IntegerType()))
df = df.withColumn("hispanic_originrace_recode", df["hispanic_originrace_recode"].cast(IntegerType()))
df = df.withColumn("education_2003_revision", df["education_2003_revision"].cast(IntegerType()))
df = df.withColumn("month_of_death", df["month_of_death"].cast(IntegerType()))
df.printSchema()
df.dtypes
df.take(5)
df = df.drop("_c0","X")
df.columns
df.printSchema()
from pyspark.sql.functions import col,expr,when
import numpy as np
import pandas as pd
df1 = df.withColumn("sex", when(df["sex"]=='M',1).otherwise(0))
df1.take(5)
df1.printSchema()
df_vec = VectorAssembler(inputCols=['race', 'resident_status', 'sex', 'month_of_death'], outputCol='features')
df_dec = df_vec.transform(df1)
train, test = df_dec.randomSplit([0.7,0.3])
lr = LogisticRegression(labelCol='manner_of_death')
lrModel = lr.fit(df_dec)
暂无答案!
目前还没有任何答案,快来回答吧!