pyspark.ml - n-grams + CountVectorizer - sorting output by count weights

juzqafwq posted on 2021-07-24 in Hive

I want to run an n-gram CountVectorizer and sort the output by count weights. I took the following approach:
Step A: Run the n-gram CountVectorizer at the Spark layer.
Step B: Save the pyspark.ml DataFrame to a Hive table.
Problem faced: Java heap space error in Step B.
Queries:
A. Before performing Step B, instead of writing all of the columns below into one Hive table, would it be a good idea to write only a few columns at a time, say (1_links, 1_counts), (2_links, 2_counts) and (3_links, 3_counts), into separate tables? That way less data would be written to each Hive table. The pyspark.sql.dataframe.DataFrame has the following columns: id, urllinks, 1_links, 2_links, 3_links, 1_counts, 2_counts, 3_counts.
B. Columns 1_counts, 2_counts and 3_counts: can these be converted from sparse to dense vectors, keeping only the indices whose counts exceed a threshold (say 3)? That would reduce the data written to the Hive table and lower the chance of a Java heap space error. (A sketch of one possible approach appears after the submit settings below.)
If these approaches are reasonable, please let me know how to implement them; I have not been able to do so in a way that avoids the Java heap space error.
C. How can I get the vocabulary together with the counts into the same Hive table? (A sketch of one possible approach appears after the sample run further down.)
D. How should I handle the Java heap space error, and which parameters do I need to look into? The following settings were used when launching the notebook (see also the note after the stack trace below):

PYSPARK_SUBMIT_ARGS='--master yarn --deploy-mode client --num-executors 24
  --executor-memory 50g --driver-memory 200g --default-parallelism 2500
  --driver-maxResultSize 100g --executor-cores 10
  --conf spark.yarn.executor.memoryOverhead=100g'
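
For query B, a minimal sketch of one possible approach (not from the original post), assuming the transformed DataFrame b produced by the code further down; the threshold of 3 and the output table name are illustrative:

# Hedged sketch for query B: keep only the (index, count) pairs whose count
# exceeds a threshold, instead of writing the full count vectors.
from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType, DoubleType

entry_type = ArrayType(StructType([
    StructField("index", IntegerType()),
    StructField("count", DoubleType()),
]))

def top_entries(threshold=3.0):
    def extract(v):
        # v is a SparseVector; keep only the entries above the threshold
        return [(int(i), float(c)) for i, c in zip(v.indices, v.values) if c > threshold]
    return udf(extract, entry_type)

slim = b.select(
    "id",
    top_entries()(col("1_counts")).alias("1_counts_top"),
    top_entries()(col("2_counts")).alias("2_counts_top"),
    top_entries()(col("3_counts")).alias("3_counts_top"),
)
slim.write.mode("overwrite").saveAsTable("output_counts_top")

This writes plain arrays of (index, count) structs instead of vector columns, which may also be easier to query from Hive.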

Please also advise if there are any other approaches I could take.
Note: 1. The data volume is very large (millions of clickstream rows per hour), so it is not possible to pull the data to local disk and run CountVectorizer in scikit-learn.
I wrote the following code.
The Jupyter notebook was launched with the following settings:

`PYSPARK_DRIVER_PYTHON="jupyter" PYSPARK_DRIVER_PYTHON_OPTS="notebook --port=XXXX --no-browser --ip= --NotebookApp.token= " PYSPARK_PYTHON="/opt/anaconda/anaconda4/bin/python" PYSPARK_SUBMIT_ARGS='--master yarn --deploy-mode client --num-executors 24 --executor-memory 50g --driver-memory 200g --default-parallelism 2500 --driver-maxResultSize 100g --executor-cores 10 --conf spark.yarn.executor.memoryOverhead=100g' PYSPARK_DRIVER_PYTHON="jupyter" pyspark`

The code is as follows:


# Import

from pyspark.sql import SQLContext ,HiveContext ,Row
from pyspark.sql import functions as F
import pandas as pd
from time import time
import math

from pyspark.sql.types import ArrayType, StringType, MapType
from pyspark.sql.functions import udf

from pyspark.sql.types import ArrayType, StringType, MapType, IntegerType
from pyspark.sql.functions import udf
from collections import Counter

from pyspark.ml.feature import NGram

from pyspark.ml.feature import NGram, CountVectorizer, VectorAssembler
from pyspark.ml import Pipeline

from pyspark.mllib.linalg import SparseVector, DenseVector

# Setting Spark Context

sqlContext = HiveContext(sc)

# Creating the pyspark ml dataframe

df = sqlContext.sql("SELECT id, collect_set(urllink) as urllinks FROM clik_stream \
where click_year='2018' and click_month='02' and click_day='02' GROUP BY id")

def build_ngrams_part(inputCol="urllinks", n=3):

    ngrams = [
        NGram(n=i, inputCol="urllinks", outputCol="{0}_links".format(i))
        for i in range(1, n + 1)
    ]

    vectorizers = [
        CountVectorizer(inputCol="{0}_links".format(i),
                        outputCol="{0}_counts".format(i))
        for i in range(1, n + 1)
    ]

    # assembler = [VectorAssembler(
    #     inputCols=["{0}_counts".format(i) for i in range(1, n + 1)],
    #     outputCol="features"
    # )]

    # return Pipeline(stages=ngrams + DenseVector(SparseVector(vectorizers).toArray()))
    return Pipeline(stages=ngrams + vectorizers)

a = build_ngrams_part().fit(df)
b = a.transform(df)

b.write.saveAsTable("output")

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-6-a6183cfa83e6> in <module>()
----> 1 b.write.saveAsTable("output")

/opt/cloudera/parcels/SPARK2-2.2.0.cloudera2-1.cdh5.12.0.p0.232957/lib/spark2/python/pyspark/sql/readwriter.py in saveAsTable(self, name, format, mode, partitionBy,**options)
    631         if format is not None:
    632             self.format(format)
--> 633         self._jwrite.saveAsTable(name)
    634 
    635     @since(1.4)

/opt/cloudera/parcels/SPARK2-2.2.0.cloudera2-1.cdh5.12.0.p0.232957/lib/spark2/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:

/opt/cloudera/parcels/SPARK2-2.2.0.cloudera2-1.cdh5.12.0.p0.232957/lib/spark2/python/pyspark/sql/utils.py in deco(*a,**kw)
     61     def deco(*a,**kw):
     62         try:
---> 63             return f(*a,**kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/opt/cloudera/parcels/SPARK2-2.2.0.cloudera2-1.cdh5.12.0.p0.232957/lib/spark2/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(

Py4JJavaError: An error occurred while calling o191.saveAsTable.
: org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:215)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:173)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:173)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:173)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:145)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
    at org.apache.spark.sql.execution.datasources.DataSource.writeInFileFormat(DataSource.scala:438)
    at org.apache.spark.sql.execution.datasources.DataSource.writeAndRead(DataSource.scala:454)
    at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.saveDataIntoTable(createDataSourceTables.scala:198)
    at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.run(createDataSourceTables.scala:158)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:610)
    at org.apache.spark.sql.DataFrameWriter.createTable(DataFrameWriter.scala:420)
    at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:399)
    at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:354)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3236)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
    at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2289)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:841)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:840)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:840)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:389)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:180)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:173)
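
A note on query D (not part of the original post): the OutOfMemoryError above is thrown on the driver while Spark serializes a task closure, and with millions of clickstream URLs per hour the fitted CountVectorizer vocabularies held on the driver are likely enormous. One possible mitigation is to cap them with CountVectorizer's vocabSize and minDF parameters; the concrete values below are illustrative assumptions only.

# Hedged sketch for query D: cap the vocabulary so the fitted models (and the
# written vectors) stay small. vocabSize and minDF are standard CountVectorizer
# parameters; the values here are illustrative.
vectorizers = [
    CountVectorizer(inputCol="{0}_links".format(i),
                    outputCol="{0}_counts".format(i),
                    vocabSize=50000,   # illustrative cap per n-gram order
                    minDF=5.0)         # illustrative: drop n-grams seen in fewer than 5 sessions
    for i in range(1, 4)
]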


I ran the code on a sample dataset (to check the logic); it works as expected.

df_test = spark.createDataFrame([
  (1, ["a", "b", "c", "d"]),
  (2, ["d", "e", "d"]),
  (3, ["e", "f", "e"]),    
], ("id", "urllinks"))

a = build_ngrams_part().fit(df_test)
b = a.transform(df_test)
b.show(3)

stages = a.stages
from pyspark.ml.feature import CountVectorizerModel

vectorizers = [s for s in stages if isinstance(s, CountVectorizerModel)]
[v.vocabulary for v in vectorizers]

+---+------------+------------+---------------+--------------+--------------------+--------------------+-------------------+
| id|    urllinks|     1_links|        2_links|       3_links|            1_counts|            2_counts|           3_counts|
+---+------------+------------+---------------+--------------+--------------------+--------------------+-------------------+
|  1|[a, b, c, d]|[a, b, c, d]|[a b, b c, c d]|[a b c, b c d]|(6,[1,2,3,4],[1.0...|(7,[0,2,3],[1.0,1...|(4,[0,2],[1.0,1.0])|
|  2|   [d, e, d]|   [d, e, d]|     [d e, e d]|       [d e d]| (6,[0,1],[1.0,2.0])| (7,[1,4],[1.0,1.0])|      (4,[3],[1.0])|
|  3|   [e, f, e]|   [e, f, e]|     [e f, f e]|       [e f e]| (6,[0,5],[2.0,1.0])| (7,[5,6],[1.0,1.0])|      (4,[1],[1.0])|
+---+------------+------------+---------------+--------------+--------------------+--------------------+-------------------+

b.write.saveAsTable("sample.output")
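
For query C and the sort-by-count goal in the title, a minimal sketch (assuming the fitted pipeline a and transformed DataFrame b from above): sum each term's counts across rows, attach the vocabulary term, sort by total count and write one table per n-gram order. The aggregation step and the table names are assumptions, not from the original post.

# Hedged sketch for query C: one table of (ngram, total_count) per n-gram
# order, sorted by total count.
from pyspark.sql import Row
from pyspark.ml.feature import CountVectorizerModel

cv_models = [s for s in a.stages if isinstance(s, CountVectorizerModel)]

for n, cv in enumerate(cv_models, start=1):
    vocab = cv.vocabulary
    counts_col = "{0}_counts".format(n)
    totals = (b.select(counts_col).rdd
                .flatMap(lambda row: zip(row[0].indices, row[0].values))
                .reduceByKey(lambda x, y: x + y)
                .map(lambda kv: Row(ngram=vocab[int(kv[0])], total_count=float(kv[1]))))
    (sqlContext.createDataFrame(totals)
        .orderBy("total_count", ascending=False)
        .write.mode("overwrite")
        .saveAsTable("vocab_counts_{0}gram".format(n)))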

Please help with the queries above.


ws51t4hk1#

I have reduced the amount of data written to the Hive table. The error is resolved.
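
The answer does not show code; as a rough illustration of what reducing the written data could look like (query A), here is a minimal sketch, assuming the transformed DataFrame b from the question and illustrative table names:

# Hedged sketch for query A: write each n-gram order's columns to its own
# table instead of saving the full DataFrame at once. Table names are
# illustrative assumptions.
for n in range(1, 4):
    (b.select("id", "{0}_links".format(n), "{0}_counts".format(n))
       .write.mode("overwrite")
       .saveAsTable("output_{0}gram".format(n)))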
