我重写了这段代码:
import org.apache.spark.sql.SparkSession
object SimpleApp {
def main(args: Array[String]) {
val logFile = "file:///root/spark/README.md"
val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
val logData = spark.read.textFile(logFile).cache()
val numAs = logData.filter(line => line.contains("a")).count()
val numBs = logData.filter(line => line.contains("b")).count()
println(s"Lines with a: $numAs, Lines with b: $numBs")
spark.stop()
}
}
对此:
import org.apache.livy._
import org.apache.spark.sql.SparkSession
class Test extends Job[Int]{
override def call(jc: JobContext): Int = {
val spark = jc.sparkSession()
val logFile = "file:///root/spark/README.md"
val logData = spark.read.textFile(logFile).cache()
val numAs = logData.filter(line => line.contains("a")).count()
val numBs = logData.filter(line => line.contains("b")).count()
println(s"Lines with a: $numAs, Lines with b: $numBs")
1 //Return value
}
}
但是当用sbt编译它时,val spark没有正确识别,我收到错误“value read is not a member of nothing”
另外,在注解spark相关代码之后,当我尝试使用/batches运行结果jar文件时,我收到错误“java.lang.nosuchmethodexception:test.main([ljava.lang.string;)”
请问任何人都可以用正确的spark scala代码重写方式吗?
1条答案
按热度按时间disho6za1#
使用livy不需要重写spark应用程序。相反,您可以使用它的rest接口在具有运行livy服务器的集群上提交作业、检索日志、获取作业状态等。
作为一个实际的例子,下面是在aws上运行应用程序的说明。
设置:
使用aws emr创建一个spark集群,该集群包含spark、livy和您的应用程序所需的任何其他预装应用程序。
将jar上传到awss3。
确保连接到集群的安全组有一个入站规则,该规则将端口8998(livy的端口)上的ip列为白名单。
确保集群可以访问s3以获取jar。
现在,您可以使用curl(或任何等效工具)发出post请求来提交您的申请: