python—如何提高向redis写入sparkDataframe的速度?

bvjxkvbb  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(536)

我正在开发一个基于flask的图书推荐api,发现要管理多个请求,我需要预先计算相似度矩阵,并将其存储在某个地方以备将来查询。这个矩阵是使用pyspark创建的,基于150万个数据库条目,包括图书id、名称和元数据,结果可以用这个模式来描述( i 以及 j 是图书索引, dot 元数据的相似性):

StructType(List(StructField(i,IntegerType,true),StructField(j,IntegerType,true),StructField(dot,DoubleType,true)))

最初,我打算使用spark redis连接器将其存储在redis上。但是,以下命令的运行速度似乎非常慢(即使初始图书数据库查询大小限制为非常适中的40k批处理):

similarities.write.format("org.apache.spark.sql.redis").option("table", "similarities").option("key.column", "i").save()

spark将最初的任务分为9个阶段,其中3个阶段大约需要6个小时才能完成。奇怪的是,spark执行器的存储内存使用率非常低,只有20kb左右。spark应用程序ui描述了一个典型的阶段活动阶段:

org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
py4j.Gateway.invoke(Gateway.java:282)
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
py4j.commands.CallCommand.execute(CallCommand.java:79)
py4j.GatewayConnection.run(GatewayConnection.java:238)
java.lang.Thread.run(Thread.java:748)

有没有可能加快这一进程?我的spark会话是这样设置的:

SUBMIT_ARGS = "  --driver-memory 2G --executor-memory 2G --executor-cores 4 --packages mysql:mysql-connector-java:5.1.39 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
conf = SparkConf().set("spark.jars", "spark-redis/target/spark-redis_2.11-2.4.3-SNAPSHOT-jar-with-dependencies.jar").set("spark.executor.memory", "4g")
sc = SparkContext('local','example', conf=conf) 
sql_sc = SQLContext(sc)
qojgxg4l

qojgxg4l1#

你可以试着用 Append 保存模式以避免检查表中是否已存在数据:

similarities.write.format("org.apache.spark.sql.redis").option("table", "similarities").mode('append').option("key.column", "i").save()

另外,你可能想要改变

sc = SparkContext('local','example', conf=conf)

sc = SparkContext('local[*]','example', conf=conf)

使用机器上的所有内核。
顺便问一下,使用正确吗 i 作为redis的钥匙?不应该两者兼而有之吗 i 以及 j ?

相关问题