为什么spark流应用程序在使用sbt运行时运行良好,但在tomcat(作为web应用程序)上却不运行?

irlmq6kh  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(529)

我在scala中有一个spark应用程序,它每10秒从Kafka那里获取一次记录,并将它们保存为文件。这是sbt项目,我用它运行我的应用程序 sbt run 命令。在我将应用程序部署到tomcat上之前,一切正常。我用这个插件成功地生成了war文件,但是当我的应用程序部署在tomcat上时,它看起来没有任何作用。
这是我的密码:

object SparkConsumer {
  def main (args: Array[String]) {

    val conf = new SparkConf().setMaster("local[*]").setAppName("KafkaReceiver")
    val ssc = new StreamingContext(conf, Seconds(10))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "group_id",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array("mytopic")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    stream.map(record => (record.key, record.value)).print

    val arr = new ArrayBuffer[String]();

    val lines = stream.map(record => (record.key, record.value));

    stream.foreachRDD { rdd =>

      if (rdd.count() > 0 ) {
            val date = System.currentTimeMillis()
            rdd.saveAsTextFile ("/tmp/sparkout/mytopic/" + date.toString)
            rdd.foreach { record => println("t=" + record.topic + " m=" + record.toString()) }
      }

      println("Stream had " + rdd.count() + " messages")

      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        rdd.foreachPartition { iter =>
        val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
        println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
        println(o)
      }
    }

    stream.saveAsTextFiles("/tmp/output")

    ssc.start()
    ssc.awaitTermination()
  }
}

奇怪的是,应用程序在运行时完全正常 sbt run 命令。它正确读取Kafka的记录,并将其保存为所需目录中的文件。我不知道发生了什么事。我尝试使用启用日志记录 log4j 但在tomcat上它甚至什么都不记录。我一直在寻找答案,但没有找到解决办法。
总而言之
我的scalaspark应用程序(是sbt项目)应该每隔10秒从kafka读取一次记录并将它们保存为文件。当通过 sbt run 命令,但部署在tomcat上时没有。
其他信息:
斯卡拉2.12
汤姆猫7
收缩压0.13.15
要求更多
问:有什么问题?

l3zydbqr

l3zydbqr1#

热释光;dr独立应用程序 SparkConsumer 在tomcat上表现得很好,tomcat本身也是如此。
读到这个问题我感到非常惊讶,因为您的代码不是我期望在tomcat上使用的代码。对不起的。
tomcat是一个servlet容器,因此在web应用程序中需要servlet。
即使您成功地创建了一个war并将其部署到tomcat,您也没有从这个web应用程序中“触发”任何东西来启动spark流应用程序(其中的代码) main 方法)。
spark流应用程序在使用 sbt run 因为这是我们的目标 sbt run ,即在sbt管理的项目中执行独立应用程序。
如果您的sbt项目中只有一个独立的应用程序, sbt run 设法找到了 SparkConsumer 并执行 main 输入方法。这并不奇怪。
但是,它不会在tomcat上工作。您必须将应用程序公开为post或get端点,并使用http客户机(curl、wget或httpie等浏览器或命令行工具)来执行它。
spark不支持scala 2.12,所以…您是如何将scala版本与spark一起使用的?!不可能的!

相关问题