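I am trying to run a SQL query against some data I generated in Databricks, but the query fails whenever it touches the column produced by the library I use for sentiment analysis.
Here is the query:
%sql select sentiment, count(*) as Demo from tweets_parsed group by sentiment
Here is the earlier code block that uses the Python library: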
from pyspark.sql.functions import col
from pyspark.sql.types import StringType
from pyspark.sql.udf import UserDefinedFunction

def get_sentiment(text):
    # Imported inside the function, so the import runs on the executor
    from textblob import TextBlob
    tweet = TextBlob(text)
    if tweet.sentiment.polarity < 0:
        sentiment = "negative"
    elif tweet.sentiment.polarity == 0:
        sentiment = "neutral"
    else:
        sentiment = "positive"
    return sentiment

# Define the UDF
getSentiment = UserDefinedFunction(lambda x: get_sentiment(x), StringType())
# Apply the UDF using withColumn
tweets = tweets.withColumn('sentiment', getSentiment(col("tweet")))
This is the error I get when I try to run the SQL query:
Error in SQL statement: SparkException: Job aborted due to stage failure: Task 3 in stage 3761.0 failed 4 times, most recent failure: Lost task 3.3 in stage 3761.0 (TID 5402, 10.81.233.238, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/databricks/spark/python/pyspark/worker.py", line 480, in main
process()
File "/databricks/spark/python/pyspark/worker.py", line 472, in process
serializer.dump_stream(out_iter, outfile)
File "/databricks/spark/python/pyspark/serializers.py", line 460, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
File "/databricks/spark/python/pyspark/serializers.py", line 150, in dump_stream
for obj in iterator:
File "/databricks/spark/python/pyspark/serializers.py", line 449, in _batched
for item in iterator:
File "<string>", line 1, in <lambda>
File "/databricks/spark/python/pyspark/worker.py", line 87, in <lambda>
return lambda *a: f(*a)
File "/databricks/spark/python/pyspark/util.py", line 99, in wrapper
return f(*args,**kwargs)
File "/databricks/spark/python/pyspark/worker.py", line 79, in <lambda>
return lambda *a: g(f(*a))
File "<command-3730768793397314>", line 13, in <lambda>
File "<command-3730768793397314>", line 2, in get_sentiment
ModuleNotFoundError: No module named 'textblob'
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:540)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:494)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.agg_doAggregateWithKeys_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:640)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:140)
at org.apache.spark.scheduler.Task.run(Task.scala:113)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$13.apply(Executor.scala:537)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:543)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
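The textblob library is already installed, yet the query fails with ModuleNotFoundError as soon as the UDF that imports it is executed.
Does anyone know what the problem is?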
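The traceback shows the import failing inside pyspark/worker.py on an executor (10.81.233.238), so one thing worth checking is whether textblob is importable on the worker nodes and not only on the driver. That the library was installed on the driver alone is an assumption, not something confirmed above. A minimal diagnostic sketch follows; module_status is a hypothetical helper name, and the commented-out executor check assumes the SparkContext named sc that Databricks notebooks normally provide.

```python
import importlib.util
import socket

def module_status(module_name):
    """Report whether module_name can be found on the host running this call."""
    # find_spec returns None when a top-level module cannot be located
    found = importlib.util.find_spec(module_name) is not None
    return f"{socket.gethostname()}: {'found' if found else 'missing'}"

# Check on the driver
print(module_status("textblob"))

# Hypothetical check on the executors (assumes a SparkContext named `sc`):
# sc.parallelize(range(8), 8).map(lambda _: module_status("textblob")).distinct().collect()
```

If the driver reports "found" while any executor reports "missing", the library was probably installed in a way that only reaches the driver, and attaching it to the cluster as a cluster library would be the next thing to try.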