我正在尝试使用spark 1.5.1(scala 2.10.2)从hdfs(spark avro 1.7.7)读取一些.avro文件,以便对它们进行一些计算。
现在,假设我已经彻底搜索了web以找到解决方案(到目前为止,最好的链接是建议使用genericrecord的链接,而这个链接报告了相同的问题,而这个链接对我不起作用,因为它提供的代码与我使用的代码几乎相同),我在这里问,因为可能有人也有同样的想法。代码如下:
import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper}
import org.apache.hadoop.io.NullWritable
import org.apache.spark.{SparkConf, SparkContext}
object SparkPOC {
def main(args: Array[String]): Unit ={
val conf = new SparkConf()
.setAppName("SparkPOC")
.set("spark.master", "local[4]")
val sc = new SparkContext(conf)
val path = args(0)
val profiles = sc.hadoopFile(
path,
classOf[AvroInputFormat[MyRecord]],
classOf[AvroWrapper[MyRecord]],
classOf[NullWritable]
)
val timeStamps = profiles.map{ p => p._1.datum.getTimeStamp().toString}
timeStamps.foreach(print)
}
我得到以下信息:
java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to packagename.MyRecord
at packagename.SparkPOC$$anonfun$1.apply(SparkPOC.scala:24)
at packagename.SparkPOC$$anonfun$1.apply(SparkPOC.scala:24)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:890)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:890)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1848)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1848)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
有人有线索吗?我也在考虑使用spark avro的可能性,但是它们不支持同时读取多个文件(而.hadoopfile支持通配符)。否则,我似乎不得不使用genericord并使用.get方法,从而失去了编码模式(myrecord)的优势。
提前谢谢。
2条答案
按热度按时间tktrz96b1#
我通常把它作为genericord读入,并根据需要显式地转换
k7fdbhmy2#
在我设置kryoserializer和spark.kryo.registator类之后,问题就解决了,如下所示:
avrokryoregistator就是这样的。