I want to build an n-gram CountVectorizer and sort the output by the count weights. I took the following approach:
Step a. Build the n-gram CountVectorizer at the Spark layer.
Step b. Save the PySpark ML DataFrame to a Hive table.
Problem faced: a Java heap space error in step b.
Queries:
a. Before performing step b, would it be a good idea not to write all of the columns below into one Hive table, but only a few columns per table, say (1_links, 1_counts) in one table, and (2_links, 2_counts) and (3_links, 3_counts) in separate tables? That way less data is written to each Hive table. The pyspark.sql.dataframe.DataFrame has the following columns: id, urllinks, 1_links, 2_links, 3_links, 1_counts, 2_counts, 3_counts.
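For reference, a rough sketch of what query (a) could look like, assuming `b` is the transformed DataFrame produced by the pipeline further below and that the per-n table names are placeholders:

for n in range(1, 4):
    # Hypothetical per-n output tables; only three columns are written at a time.
    (b.select("id", "{0}_links".format(n), "{0}_counts".format(n))
       .write
       .mode("overwrite")
       .saveAsTable("output_{0}gram".format(n)))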
b. Can the columns 1_counts, 2_counts and 3_counts be converted from sparse vectors to dense vectors, keeping only the indices whose count exceeds a threshold (say 3)? That would reduce the data written to the Hive table and lower the chance of a Java heap space error (see the sketch below).
If these approaches are good, please let me know how to perform them; I have not been able to do this in a way that avoids the Java heap space error.
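A minimal sketch for query (b), assuming the count columns hold pyspark.ml.linalg sparse vectors (as CountVectorizer produces) and that a threshold of 3 is wanted: keep only the entries above the threshold as an index-to-count map instead of the full vector.

from pyspark.sql.functions import udf
from pyspark.sql.types import MapType, IntegerType, DoubleType

def filter_counts(v, threshold=3.0):
    # Keep only the vector entries whose count exceeds the threshold.
    if v is None:
        return None
    return {int(i): float(c) for i, c in zip(v.indices, v.values) if c > threshold}

filter_counts_udf = udf(filter_counts, MapType(IntegerType(), DoubleType()))

# `b` is assumed to be the transformed DataFrame from the code below.
b_small = b.select(
    "id",
    filter_counts_udf("1_counts").alias("1_counts_filtered"),
    filter_counts_udf("2_counts").alias("2_counts_filtered"),
    filter_counts_udf("3_counts").alias("3_counts_filtered"))
b_small.write.mode("overwrite").saveAsTable("output_filtered")

A map avoids materialising dense vectors, which would otherwise be as long as the vocabulary for every row.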
c. How can the vocabulary be stored together with the counts in the same Hive table?
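One possible sketch for query (c), assuming `a` is the fitted PipelineModel from the code below and that the CountVectorizerModel stages appear in the same order as the 1_counts, 2_counts, 3_counts columns: pull the vocabulary out of each model and map vector indices back to terms.

from pyspark.sql.functions import udf
from pyspark.sql.types import MapType, StringType, DoubleType
from pyspark.ml.feature import CountVectorizerModel

vectorizers = [s for s in a.stages if isinstance(s, CountVectorizerModel)]

def terms_udf(vocab):
    # Build a UDF that turns a count vector into a term -> count map,
    # using the vocabulary of the corresponding CountVectorizerModel.
    def to_terms(v):
        if v is None:
            return None
        return {vocab[int(i)]: float(c) for i, c in zip(v.indices, v.values)}
    return udf(to_terms, MapType(StringType(), DoubleType()))

out = b.select(
    "id",
    *[terms_udf(m.vocabulary)("{0}_counts".format(i + 1)).alias("{0}_terms".format(i + 1))
      for i, m in enumerate(vectorizers)])
out.write.mode("overwrite").saveAsTable("output_with_vocab")

Note that each UDF closes over the full vocabulary list; for a very large vocabulary, exploding the counts and joining against a separate vocabulary table is likely to scale better.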
d. How should the Java heap space error be handled, and which parameters do I need to look into? I used the following settings while launching the notebook (a sketch re-spelling them as --conf properties follows the settings):
PYSPARK_SUBMIT_ARGS='--master yarn --deploy-mode client --num-executors 24
--executor-memory 50g --driver-memory 200g --default-parallelism 2500
--driver-maxResultSize 100g --executor-cores 10
--conf spark.yarn.executor.memoryOverhead=100g'
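For query (d), the settings that usually matter for a driver-side OutOfMemoryError during saveAsTable are spark.driver.memory, spark.driver.maxResultSize, spark.executor.memory together with spark.yarn.executor.memoryOverhead, and spark.default.parallelism. As far as I know, spark-submit has no --default-parallelism or --driver-maxResultSize flags, so a hedged sketch of the same values spelled as --conf properties (values copied unchanged, property names assumed for Spark 2.2 on YARN) would be:

PYSPARK_SUBMIT_ARGS='--master yarn --deploy-mode client --num-executors 24
--executor-cores 10 --executor-memory 50g --driver-memory 200g
--conf spark.default.parallelism=2500
--conf spark.driver.maxResultSize=100g
--conf spark.yarn.executor.memoryOverhead=100g'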
Please also advise whether there is any other approach I could take.
Note: 1. The data volume is very large (the clickstream runs to millions of rows per hour), so it is not feasible to pull the data to local disk and run CountVectorizer in scikit-learn.
I wrote the following code.
Launched the Jupyter notebook with the following settings:
`PYSPARK_DRIVER_PYTHON="jupyter" PYSPARK_DRIVER_PYTHON_OPTS="notebook --port=XXXX --no-browser --ip= --NotebookApp.token= " PYSPARK_PYTHON="/opt/anaconda/anaconda4/bin/python" PYSPARK_SUBMIT_ARGS='--master yarn --deploy-mode client --num-executors 24 --executor-memory 50g --driver-memory 200g --default-parallelism 2500 --driver-maxResultSize 100g --executor-cores 10 --conf spark.yarn.executor.memoryOverhead=100g' PYSPARK_DRIVER_PYTHON="jupyter" pyspark`
The code is as follows:
# Imports
from pyspark.sql import SQLContext, HiveContext, Row
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType, MapType, IntegerType
import pandas as pd
from time import time
import math
from collections import Counter
from pyspark.ml.feature import NGram, CountVectorizer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.mllib.linalg import SparseVector, DenseVector
# Setting Spark Context
sqlContext = HiveContext(sc)
# Creating the pyspark ml dataframe
df = sqlContext.sql("SELECT id, collect_set(urllink) as urllinks FROM clik_stream \
where click_year='2018' and click_month='02' and click_day='02' GROUP BY id")
def build_ngrams_part(inputCol="urllinks", n=3):
    # One NGram transformer per n, producing columns 1_links .. {n}_links
    ngrams = [
        NGram(n=i, inputCol=inputCol, outputCol="{0}_links".format(i))
        for i in range(1, n + 1)
    ]
    # One CountVectorizer per n-gram column, producing 1_counts .. {n}_counts
    vectorizers = [
        CountVectorizer(inputCol="{0}_links".format(i),
                        outputCol="{0}_counts".format(i))
        for i in range(1, n + 1)
    ]
    # assembler = [VectorAssembler(
    #     inputCols=["{0}_counts".format(i) for i in range(1, n + 1)],
    #     outputCol="features"
    # )]
    # return Pipeline(stages=ngrams +
    #                 DenseVector(SparseVector(vectorizers).toArray()))
    return Pipeline(stages=ngrams + vectorizers)
a = build_ngrams_part().fit(df)
b = a.transform(df)
b.write.saveAsTable("output")
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-6-a6183cfa83e6> in <module>()
----> 1 b.write.saveAsTable("output")
/opt/cloudera/parcels/SPARK2-2.2.0.cloudera2-1.cdh5.12.0.p0.232957/lib/spark2/python/pyspark/sql/readwriter.py in saveAsTable(self, name, format, mode, partitionBy,**options)
631 if format is not None:
632 self.format(format)
--> 633 self._jwrite.saveAsTable(name)
634
635 @since(1.4)
/opt/cloudera/parcels/SPARK2-2.2.0.cloudera2-1.cdh5.12.0.p0.232957/lib/spark2/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:
/opt/cloudera/parcels/SPARK2-2.2.0.cloudera2-1.cdh5.12.0.p0.232957/lib/spark2/python/pyspark/sql/utils.py in deco(*a,**kw)
61 def deco(*a,**kw):
62 try:
---> 63 return f(*a,**kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/opt/cloudera/parcels/SPARK2-2.2.0.cloudera2-1.cdh5.12.0.p0.232957/lib/spark2/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
317 raise Py4JJavaError(
318 "An error occurred while calling {0}{1}{2}.\n".
--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(
Py4JJavaError: An error occurred while calling o191.saveAsTable.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:215)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:173)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:173)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:173)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:145)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.execution.datasources.DataSource.writeInFileFormat(DataSource.scala:438)
at org.apache.spark.sql.execution.datasources.DataSource.writeAndRead(DataSource.scala:454)
at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.saveDataIntoTable(createDataSourceTables.scala:198)
at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.run(createDataSourceTables.scala:158)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:610)
at org.apache.spark.sql.DataFrameWriter.createTable(DataFrameWriter.scala:420)
at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:399)
at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:354)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3236)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:118)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2289)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:841)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:840)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:840)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:389)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:180)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:173)
I ran the code on a sample dataset (to check the logic), and it works as expected.
df_test = spark.createDataFrame([
    (1, ["a", "b", "c", "d"]),
    (2, ["d", "e", "d"]),
    (3, ["e", "f", "e"]),
], ("id", "urllinks"))
a = build_ngrams_part().fit(df_test)
b = a.transform(df_test)
b.show(3)
stages = a.stages
from pyspark.ml.feature import CountVectorizerModel
vectorizers = [s for s in stages if isinstance(s, CountVectorizerModel)]
[v.vocabulary for v in vectorizers]
+---+------------+------------+---------------+--------------+--------------------+--------------------+-------------------+
| id| urllinks| 1_links| 2_links| 3_links| 1_counts| 2_counts| 3_counts|
+---+------------+------------+---------------+--------------+--------------------+--------------------+-------------------+
| 1|[a, b, c, d]|[a, b, c, d]|[a b, b c, c d]|[a b c, b c d]|(6,[1,2,3,4],[1.0...|(7,[0,2,3],[1.0,1...|(4,[0,2],[1.0,1.0])|
| 2| [d, e, d]| [d, e, d]| [d e, e d]| [d e d]| (6,[0,1],[1.0,2.0])| (7,[1,4],[1.0,1.0])| (4,[3],[1.0])|
| 3| [e, f, e]| [e, f, e]| [e f, f e]| [e f e]| (6,[0,5],[2.0,1.0])| (7,[5,6],[1.0,1.0])| (4,[1],[1.0])|
+---+------------+------------+---------------+--------------+--------------------+--------------------+-------------------+
b.write.saveAsTable("sample.output")
Please help with the questions in the queries section above.
1 Answer
ws51t4hk1:
I reduced the amount of data being written to the Hive table, and the error is resolved.
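A minimal illustration of that kind of reduction, assuming the same pipeline output b as in the question and that only the count vectors are needed downstream: drop the intermediate n-gram text columns before writing.

# 1_links duplicates urllinks (see the sample output above), so the text
# columns can be left out of the write without losing the counts.
b.select("id", "1_counts", "2_counts", "3_counts") \
    .write.mode("overwrite").saveAsTable("output")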