如何在jupyter笔记本中使用pyspark运行fpgrowth算法

fjaof16o  于 2021-05-29  发布在  Spark
关注(0)|答案(0)|浏览(653)

我想跑 FPGrowth 基于遗传算法的关联规则挖掘算法 pyspark python3(windows 10)中的包下面是我编写的代码:

from pyspark.sql.functions import col, size
import pyspark.sql.functions as func
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.ml.fpm import FPGrowth
from pyspark.sql import Row
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark import SparkConf

conf = SparkConf().setAppName("App")
conf = (conf.setMaster('local[*]').set('spark.executor.memory', '100G')
        .set('spark.driver.memory', '400G')
        .set('spark.driver.maxResultSize', '200G'))

sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)
R = Row('ID', 'items')

# Convering it to spark data frame.

df=spark.createDataFrame([R(i, x) for i, x in enumerate(data_final)])

# Fitting the FP Growth algorithm

fpGrowth = FPGrowth(itemsCol="items",minSupport=0.05, minConfidence=0.05)

model = fpGrowth.fit(df.select('items'))

# Filtering Rules based on items present in antecedent

ar=model.associationRules.where(func.size(func.col('antecedent')) == 2).where(func.size(func.col('consequent')) == 1)

我得到的错误是:

Exception                                 Traceback (most recent call last)
<ipython-input-40-78368ad0fa3a> in <module>
      5         .set('spark.driver.maxResultSize', '200G'))
      6 
----> 7 sc = SparkContext.getOrCreate(conf=conf)

c:\users\*\pyspark\context.py in getOrCreate(cls, conf)
    365         with SparkContext._lock:
    366             if SparkContext._active_spark_context is None:
--> 367                 SparkContext(conf=conf or SparkConf())
    368             return SparkContext._active_spark_context
    369 

c:\users\*pyspark\context.py in __init__(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer, conf, gateway, jsc, profiler_cls)
    131                     " note this option will be removed in Spark 3.0")
    132 
--> 133         SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
    134         try:
    135             self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer,

c:\users\*pyspark\context.py in _ensure_initialized(cls, instance, gateway, conf)
    314         with SparkContext._lock:
    315             if not SparkContext._gateway:
--> 316                 SparkContext._gateway = gateway or launch_gateway(conf)
    317                 SparkContext._jvm = SparkContext._gateway.jvm
    318 

c:\users\*pyspark\java_gateway.py in launch_gateway(conf)
     44     :return: a JVM gateway
     45     """
---> 46     return _launch_gateway(conf)
     47 
     48 

c:\users\*pyspark\java_gateway.py in _launch_gateway(conf, insecure)
    106 
    107             if not os.path.isfile(conn_info_file):
--> 108                 raise Exception("Java gateway process exited before sending its port number")
    109 
    110             with open(conn_info_file, "rb") as info:

Exception: Java gateway process exited before sending its port number

如何解决?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题