我在scala中有一个spark应用程序,它每10秒从Kafka那里获取一次记录,并将它们保存为文件。这是sbt项目,我用它运行我的应用程序 sbt run
命令。在我将应用程序部署到tomcat上之前,一切正常。我用这个插件成功地生成了war文件,但是当我的应用程序部署在tomcat上时,它看起来没有任何作用。
这是我的密码:
object SparkConsumer {
def main (args: Array[String]) {
val conf = new SparkConf().setMaster("local[*]").setAppName("KafkaReceiver")
val ssc = new StreamingContext(conf, Seconds(10))
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"group.id" -> "group_id",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("mytopic")
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
stream.map(record => (record.key, record.value)).print
val arr = new ArrayBuffer[String]();
val lines = stream.map(record => (record.key, record.value));
stream.foreachRDD { rdd =>
if (rdd.count() > 0 ) {
val date = System.currentTimeMillis()
rdd.saveAsTextFile ("/tmp/sparkout/mytopic/" + date.toString)
rdd.foreach { record => println("t=" + record.topic + " m=" + record.toString()) }
}
println("Stream had " + rdd.count() + " messages")
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd.foreachPartition { iter =>
val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
println(o)
}
}
stream.saveAsTextFiles("/tmp/output")
ssc.start()
ssc.awaitTermination()
}
}
奇怪的是,应用程序在运行时完全正常 sbt run
命令。它正确读取Kafka的记录,并将其保存为所需目录中的文件。我不知道发生了什么事。我尝试使用启用日志记录 log4j
但在tomcat上它甚至什么都不记录。我一直在寻找答案,但没有找到解决办法。
总而言之
我的scalaspark应用程序(是sbt项目)应该每隔10秒从kafka读取一次记录并将它们保存为文件。当通过 sbt run
命令,但部署在tomcat上时没有。
其他信息:
斯卡拉2.12
汤姆猫7
收缩压0.13.15
要求更多
问:有什么问题?
1条答案
按热度按时间l3zydbqr1#
热释光;dr独立应用程序
SparkConsumer
在tomcat上表现得很好,tomcat本身也是如此。读到这个问题我感到非常惊讶,因为您的代码不是我期望在tomcat上使用的代码。对不起的。
tomcat是一个servlet容器,因此在web应用程序中需要servlet。
即使您成功地创建了一个war并将其部署到tomcat,您也没有从这个web应用程序中“触发”任何东西来启动spark流应用程序(其中的代码)
main
方法)。spark流应用程序在使用
sbt run
因为这是我们的目标sbt run
,即在sbt管理的项目中执行独立应用程序。如果您的sbt项目中只有一个独立的应用程序,
sbt run
设法找到了SparkConsumer
并执行main
输入方法。这并不奇怪。但是,它不会在tomcat上工作。您必须将应用程序公开为post或get端点,并使用http客户机(curl、wget或httpie等浏览器或命令行工具)来执行它。
spark不支持scala 2.12,所以…您是如何将scala版本与spark一起使用的?!不可能的!