我使用带有2个工作进程的独立集群。使用spark kafka cassandra hdfs流
val stream = kafkaUtils.createDirectStream...
stream.map(rec => Row(rec.offset, rev.value)).saveToCassandra(...)
stream.map(_.value).foreachRDD(rdd => {saving to HDFS})
我发送给Kafka大约40000消息/秒的第一件事是savetocassandra工作缓慢,因为如果我评论 stream.saveToCassandra
它工作得又好又快。在spark驱动程序ui中,我看到5mb的输出大约需要20秒。我试着调整spark cassandra选项,但也需要最少14秒。
第二个是我提到的,我的一个工人什么也不做,我看到这样的情况:
10:05:33 INFO remove RDD#
等等。
但如果我阻止另一个工人,它就会开始工作。
我不使用spark submit,只是 startSpark extends App {
以及孔代码,然后从
scala -cp "spark libs:kafka:startSpark.jar" startSpark
我使用的工人 ssc.sparkContext.addJars(pathToNeedableJars)
我怎样才能促进写Cassandra和如何让我的工人一起工作?
1条答案
按热度按时间rt4zxlrg1#
我真的读不好官方的sparkKafka集成指南,这个问题,我用我的主题1分区
Kafka分区与Spark分区的1:1对应关系