spark从ammonite脚本运行时找不到“spark version info.properties”

zlhcx6iw  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(478)

我有一个ammonite脚本,它创建了一个spark上下文:


# !/usr/local/bin/amm

import ammonite.ops._

import $ivy.`org.apache.spark:spark-core_2.11:2.0.1`

import org.apache.spark.{SparkConf, SparkContext}

@main
def main(): Unit = {
  val sc =  new SparkContext(new SparkConf().setMaster("local[2]").setAppName("Demo"))
}

当我运行这个脚本时,它会抛出一个错误:

Exception in thread "main" java.lang.ExceptionInInitializerError
Caused by: org.apache.spark.SparkException: Error while locating file spark-version-info.properties
...
Caused by: java.lang.NullPointerException
    at java.util.Properties$LineReader.readLine(Properties.java:434)
    at java.util.Properties.load0(Properties.java:353)

脚本不是从spark安装目录运行的,也不知道它或打包此版本信息的资源—它只知道ivy依赖项。因此,问题可能是这些资源信息不在ivy依赖中的类路径上。我看过其他spark“独立脚本”,所以我希望我也能在这里这样做。
我翻了一番,想弄明白发生了什么事。我希望我能在运行时以编程的方式将一些构建信息破解到系统属性中。
异常源来自spark库中的package.scala。相关的代码位是

val resourceStream = Thread.currentThread().getContextClassLoader.
  getResourceAsStream("spark-version-info.properties")

try {
  val unknownProp = "<unknown>"
  val props = new Properties()
  props.load(resourceStream) <--- causing a NPE?
  (
    props.getProperty("version", unknownProp),
    // Load some other properties
  )
} catch {
  case npe: NullPointerException =>
    throw new SparkException("Error while locating file spark-version-info.properties", npe)

似乎隐含的假设是 props.load 如果在资源中找不到版本信息,npe将失败(读者不太清楚!)
npe本身看起来像是来自 java.util.Properties.java :

class LineReader {
    public LineReader(InputStream inStream) {
        this.inStream = inStream;
        inByteBuf = new byte[8192];
    }

    ...
    InputStream inStream;
    Reader reader;

    int readLine() throws IOException {
      ...
      inLimit = (inStream==null)?reader.read(inCharBuf)
                                :inStream.read(inByteBuf);

这个 LineReader 是用空值构造的 InputStream 类内部解释为 reader 是非空的,应该改为使用-但是 null . (这种东西真的在标准图书馆里吗?看起来很不安全……)
从看 bin/spark-shell 它补充道,这是Spark带来的 -Dscala.usejavacp=true 当它启动时 spark-submit . 这是正确的方向吗?
谢谢你的帮助!

pgx2nnw8

pgx2nnw81#

以下内容似乎适用于2.11和1.0.1版本,但不是实验性的。
在spark 2.2上可以得到更好的实现


# !/usr/local/bin/amm

import ammonite.ops._
import $ivy.`org.apache.spark:spark-core_2.11:2.2.0` 
import $ivy.`org.apache.spark:spark-sql_2.11:2.2.0`
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql._ 
import org.apache.spark.sql.SparkSession

@main
def main(): Unit = {
    val sc =  new SparkContext(new SparkConf().setMaster("local[2]").setAppName("Demo"))
}

或更详细的答案:

@main
def main(): Unit = {
    val spark = SparkSession.builder()
      .appName("testings")
      .master("local")
      .config("configuration key", "configuration value")
      .getOrCreate 
  val sqlContext = spark.sqlContext
  val tdf2 = spark.read.option("delimiter", "|").option("header", true).csv("./tst.dat")
  tdf2.show()
}

相关问题