PySpark pytest error: sc = SparkContext(conf=conf)

9udxz4iz, posted 2021-07-12 in Spark

I am running PySpark tests with pytest, using a word count example. I have three files:
File 1: conftest.py :

import logging
import pytest
import pyspark
from pyspark import SparkConf, SparkContext

def quiet_py4j():
    # Reduce py4j log noise during test runs
    logger = logging.getLogger('py4j')
    logger.setLevel(logging.WARN)

@pytest.fixture(scope="session")
def spark_context(request):
    conf = (SparkConf().setMaster("local[*]").setAppName("pytest-pyspark-testing"))
    sc = SparkContext(conf=conf)
    # Register the finalizer only after the context has been created
    request.addfinalizer(lambda: sc.stop())
    quiet_py4j()
    return sc

File 2: wordcount.py :

def do_word_counts(lines):
    counts = (lines.flatMap(lambda x: x.split())
              .map(lambda x: (x, 1))
              .reduceByKey(lambda x, y: x + y))

    results = {word: count for word, count in counts.collect()}
    return results

File 3: wordcount_test.py :

import pytest
import wordcount

pytestmark = pytest.mark.usefixtures("spark_context")

def test_do_word_counts(spark_context):
    test_input = [' hello spark ', ' hello again spark spark']

    input_rdd = spark_context.parallelize(test_input, 1)
    results = wordcount.do_word_counts(input_rdd)

    expected_results = {'hello': 2, 'spark': 3, 'again': 1}
    assert results == expected_results
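
As a sanity check that does not depend on Spark, the expected result for this input can be reproduced in plain Python. This is only a sketch: expected_word_counts is a hypothetical helper name, and it uses collections.Counter in place of the flatMap / map / reduceByKey pipeline.

```python
from collections import Counter


def expected_word_counts(lines):
    """Count whitespace-separated words across all lines, without Spark."""
    counts = Counter()
    for line in lines:
        # str.split() with no arguments drops leading/trailing whitespace,
        # matching the lambda x: x.split() used in do_word_counts
        counts.update(line.split())
    return dict(counts)


test_input = [' hello spark ', ' hello again spark spark']
print(expected_word_counts(test_input))
```

If this agrees with expected_results, the test data itself is consistent and the failure comes from creating the SparkContext rather than from the word count logic.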

After running the command pytest wordcount_test.py , the following error occurs:

request = <SubRequest 'spark_context' for <Function test_do_word_counts>>

@pytest.fixture(scope="session")
def spark_context(request):
    conf = (SparkConf().setMaster("local[*]").setAppName("pytest-pyspark-testing"))
    #request.addfinalizer(lambda: sc.stop())

>   sc = SparkContext(conf=conf)

    conftest.py:15: 
    _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
   _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
    ../../anaconda3/lib/python3.8/site-packages/pyspark/context.py:146: in __init__
    self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
    ../../anaconda3/lib/python3.8/site-packages/pyspark/context.py:226: in _do_init
    str(self._jvm.PythonUtils.getPythonAuthSocketTimeout(self._jsc))
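
The traceback stops inside getPythonAuthSocketTimeout, before the final exception line. One possible cause (an assumption; the output above does not confirm it) is that the pip-installed pyspark package and the Spark installation used for the JVM are different releases. A minimal sketch for comparing the two follows. major_minor, versions_compatible and report_python_side are hypothetical helper names, and the JVM-side version string has to be supplied by hand, for example from the output of spark-submit --version.

```python
import os
import re


def major_minor(version):
    """Return (major, minor) parsed from a version string such as '3.1.2'."""
    match = re.match(r"(\d+)\.(\d+)", version)
    if match is None:
        raise ValueError(f"unrecognized version string: {version!r}")
    return int(match.group(1)), int(match.group(2))


def versions_compatible(python_side, jvm_side):
    """True when both version strings share the same major.minor release."""
    return major_minor(python_side) == major_minor(jvm_side)


def report_python_side():
    """Print the pyspark package version and SPARK_HOME (requires pyspark)."""
    import pyspark  # imported here so the helpers above work without it
    print("pyspark package:", pyspark.__version__)
    print("SPARK_HOME:", os.environ.get("SPARK_HOME", "<not set>"))
```

Calling report_python_side() in the same environment that runs pytest shows which package version and which SPARK_HOME the fixture picks up; versions_compatible can then be used to compare that version with the one reported by the Spark installation.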

Any ideas on how to fix this? Thanks.
