我的目标是运行一个spark流示例集群,用一些简单的算法来处理数据。例如,首先是
def updateFunction(newValues, runningCount):
if runningCount is None:
runningCount = 0
return sum(newValues, runningCount)
# Produces two tuples from each one, and sums its values. Then orders the result.
digest = ks.flatMap(producePerAorB)\
.updateStateByKey(updateFunction)\
.transform(lambda rdd: rdd.sortBy(lambda x: x[1], ascending=False))
当我把Kafka插入星星之火时,我面临很多性能问题。
我的设置说明:
我使用的是一个由6个健壮的awsm4.xlarge组成的集群,所以硬件应该不是问题
Kafka主题中的24个分区。尝试了1,6,12和48,似乎是最好的。数据是通过简单的管道文件到控制台生产者产生的。
以Yarn为主。将它调整到每种可能的设置:容器越来越大,容器越来越少,容器越来越小。
皮斯帕克。Spark1.5.2。一切都安排好了。
我试过直接流媒体和接收器。第一种方法应该更容易调优,但事实证明它完全忽略了并行性,这与文档所说的相反。现在随着接收器的方法,它似乎表现得更好。然而,几分钟后,我开始得到大量的yarnhistoryservice:丢弃事件消息,这可能意味着我正在丢失数据。在互联网上关于它的信息不多,我已经潜入spark的代码试图破译它。显然是Yarn的监控服务,但它只是看起来不对劲,他们开始堆积,数据没有得到很好的刷新,所以它可能是错误的。
我正在为spark/kafka使用这些设置:---conf spark.streaming.backpressure.enabled=true--conf spark.streaming.kafka.maxrate=5000\它们似乎什么都不做。
我尝试了不同的代码和最简单的操作,但仍然得到相同的错误。
看看我的表现,尤其是那些错误,我想我做错了什么。我有点没主意了,在推测了一些没有多大帮助的复杂场景之后。我希望Spark和Kafka更容易设置!
如有任何提示,我们将不胜感激。谢谢
暂无答案!
目前还没有任何答案,快来回答吧!