Reading RDF in Apache Spark with Scala

nhn9ugyo, posted on 2021-05-29 in Hadoop

I am trying to read an RDF/XML file into Apache Spark (Scala 2.11, Apache Spark 1.4.1) using Apache Jena. I wrote this Scala snippet:

val factory = new RdfXmlReaderFactory()
HadoopRdfIORegistry.addReaderFactory(factory)
val conf = new Configuration()
conf.set("rdf.io.input.ignore-bad-tuples", "false")
val data = sc.newAPIHadoopFile(path,
    classOf[RdfXmlInputFormat],
    classOf[LongWritable], //position
    classOf[TripleWritable],   //value
    conf)
data.take(10).foreach(println)

But it throws an error:

INFO readers.AbstractLineBasedNodeTupleReader: Got split with start 0 and length 21765995 for file with total length of 21765995
15/07/23 01:52:42 ERROR readers.AbstractLineBasedNodeTupleReader: Error parsing whole file, aborting further parsing
org.apache.jena.riot.RiotException: Producer failed to ever call start(), declaring producer dead
        at org.apache.jena.riot.lang.PipedRDFIterator.hasNext(PipedRDFIterator.java:272)
        at org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileNodeTupleReader.nextKeyValue(AbstractWholeFileNodeTupleReader.java:242)
        at org.apache.jena.hadoop.rdf.io.input.readers.AbstractRdfReader.nextKeyValue(AbstractRdfReader.java:85)
        at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:143)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:350)
   ...
ERROR executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.io.IOException: Error parsing whole file at position 0, aborting further parsing
        at org.apache.jena.hadoop.rdf.io.input.readers.AbstractWholeFileNodeTupleReader.nextKeyValue(AbstractWholeFileNodeTupleReader.java:285)
        at org.apache.jena.hadoop.rdf.io.input.readers.AbstractRdfReader.nextKeyValue(AbstractRdfReader.java:85)
        at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:143)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:350)

The file itself is fine, because I can parse it locally. What am I missing?
Edit: some information to reproduce the behavior.
Imports:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.LongWritable
import org.apache.jena.hadoop.rdf.io.input.rdfxml.RdfXmlInputFormat
import org.apache.jena.hadoop.rdf.io.registry.HadoopRdfIORegistry
import org.apache.jena.hadoop.rdf.io.registry.readers.RdfXmlReaderFactory
import org.apache.jena.hadoop.rdf.types.QuadWritable
import org.apache.jena.hadoop.rdf.types.TripleWritable
import org.apache.spark.SparkContext

scalaVersion := "2.11.7"
Dependencies:

"org.apache.hadoop"             % "hadoop-common"      % "2.7.1",
"org.apache.hadoop"             % "hadoop-mapreduce-client-common" % "2.7.1",
"org.apache.hadoop"             % "hadoop-streaming"   % "2.7.1", 
"org.apache.spark"              % "spark-core_2.11"  % "1.4.1", 
"com.hp.hpl.jena"               % "jena"               % "2.6.4",
"org.apache.jena"               % "jena-elephas-io"    % "0.9.0",
"org.apache.jena"               % "jena-elephas-mapreduce" % "0.9.0"

I am using the RDF sample from here. It is freely available information about the John Peel sessions (more info about the dump).

olqngx59 (answer 1)

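Thanks, everyone, for the discussion in the comments. The problem was really tricky and not clear from the stack trace: the code needs one extra dependency, jena-core, to work, and this dependency must be packaged first.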

"org.apache.jena" % "jena-core" % "2.13.0"
"com.hp.hpl.jena" % "jena"      % "2.6.4"

I use this assembly merge strategy:

lazy val strategy = assemblyMergeStrategy in assembly <<= (assemblyMergeStrategy in assembly) { (old) => {
  case PathList("META-INF", xs @ _*) =>
    (xs map {_.toLowerCase}) match {
      case ("manifest.mf" :: Nil) | ("index.list" :: Nil) | ("dependencies" :: Nil) => MergeStrategy.discard
      case _ => MergeStrategy.discard
    }
  case x => MergeStrategy.first
}
}
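
The `<<=` operator used above belongs to older sbt releases. As a rough equivalent, assuming sbt 1.x with a recent sbt-assembly plugin enabled (neither is what the answer used), the same strategy could be sketched like this. Since both inner branches of the original discard, everything under META-INF is dropped here in a single case.

```scala
// build.sbt fragment (sketch, assuming sbt 1.x and the sbt-assembly plugin)
assembly / assemblyMergeStrategy := {
  // Drop everything under META-INF, as both inner cases of the original do
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  // For any other duplicate path, keep the first copy found on the classpath
  case _ => MergeStrategy.first
}
```

Note that discarding all of META-INF also removes META-INF/services entries, so anything normally discovered through java.util.ServiceLoader would need to be registered explicitly, as the question does with HadoopRdfIORegistry.addReaderFactory.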

ippsafx7 (answer 2)

So it seems that your problem comes down to managing your dependencies manually.
In my environment I simply pass the following to my Spark shell:

--packages org.apache.jena:jena-elephas-io:0.9.0

This will do all the dependency resolution for you.
If you are building an sbt project, then it should be sufficient to add the following to your build.sbt:

libraryDependencies += "org.apache.jena" % "jena-elephas-io" % "0.9.0"
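
With the dependencies resolved this way, a standalone version of the question's snippet might look like the sketch below. The object name ReadRdfXml and the use of the first command-line argument as the file path are hypothetical. The packages for RdfXmlInputFormat and TripleWritable are not shown in the question and are my assumption based on the Elephas module layout. Triples are turned into strings before take, because Hadoop Writable types are generally not java.io.Serializable and collecting them to the driver directly can fail.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.LongWritable
import org.apache.jena.hadoop.rdf.io.input.rdfxml.RdfXmlInputFormat
import org.apache.jena.hadoop.rdf.io.registry.HadoopRdfIORegistry
import org.apache.jena.hadoop.rdf.io.registry.readers.RdfXmlReaderFactory
import org.apache.jena.hadoop.rdf.types.TripleWritable
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical object name, for illustration only.
object ReadRdfXml {

  // Hadoop configuration carrying the same setting as in the question.
  def rdfConf(): Configuration = {
    val conf = new Configuration()
    conf.set("rdf.io.input.ignore-bad-tuples", "false")
    conf
  }

  def main(args: Array[String]): Unit = {
    // Assumption: the RDF/XML file path is passed as the first argument.
    val path = args(0)

    val sparkConf = new SparkConf()
      .setAppName("read-rdfxml")
      .setIfMissing("spark.master", "local[*]") // local fallback when no master is given

    val sc = new SparkContext(sparkConf)
    try {
      // Explicit registration, as in the question.
      HadoopRdfIORegistry.addReaderFactory(new RdfXmlReaderFactory())

      val data = sc.newAPIHadoopFile(
        path,
        classOf[RdfXmlInputFormat],
        classOf[LongWritable],   // key: position in the file
        classOf[TripleWritable], // value: one parsed triple
        rdfConf())

      // Convert each triple to a plain String before bringing it to the driver.
      val triples = data.map { case (_, triple) => triple.get().toString }
      triples.take(10).foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

In a spark-shell session started with the --packages option shown above, only the body of the try block is needed, since sc already exists there.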
