I can construct an HBase RDD from an HBase table. After that, I try to convert it into a DataFrame of a Scala case class, but I get an exception during the Bytes.toInt conversion. Any help is appreciated.
Scala case class:
case class UserProfile(
  User_Id: String, Card_Account_Number: Long, First_name: String, Last_name: String,
  email: String, gender: String, ip_address: String, user_name: String,
  address: String, phone: String,
  No_Transactions_in_24_hrs: Int, No_IPs_In_24_hrs: Int,
  TotalAmount_spent_in_24_hrs: Float, AvgAmount_spent_in_24_hrs: Float,
  Total_No_Transactions: Int, Amount_spent_so_far: Float)
// Function to parse the input
object UserProfile extends Serializable {
  def parseUserProfile(result: Result): UserProfile = {
    val p0 = Bytes.toString(result.getValue(User_PersonalProfileBytes, Bytes.toBytes("User_Id")))
    val p1 = Bytes.toLong(result.getValue(User_PersonalProfileBytes, Bytes.toBytes("Card_Account_Number")))
    val p2 = Bytes.toString(result.getValue(User_PersonalProfileBytes, Bytes.toBytes("First_name")))
    val p3 = Bytes.toString(result.getValue(User_PersonalProfileBytes, Bytes.toBytes("Last_name")))
    val p4 = Bytes.toString(result.getValue(User_PersonalProfileBytes, Bytes.toBytes("email")))
    val p5 = Bytes.toString(result.getValue(User_PersonalProfileBytes, Bytes.toBytes("gender")))
    val p6 = Bytes.toString(result.getValue(User_PersonalProfileBytes, Bytes.toBytes("ip_address")))
    val p7 = Bytes.toString(result.getValue(User_PersonalProfileBytes, Bytes.toBytes("user_name")))
    val p8 = Bytes.toString(result.getValue(User_PersonalProfileBytes, Bytes.toBytes("address")))
    val p9 = Bytes.toString(result.getValue(User_PersonalProfileBytes, Bytes.toBytes("phone")))
    val p10 = Bytes.toInt(result.getValue(User_TransactionHistoryBytes, Bytes.toBytes("No_Transactions_in_24_hrs")))
    val p11 = Bytes.toInt(result.getValue(User_TransactionHistoryBytes, Bytes.toBytes("No_Ips_In_24_hrs")))
    val p12 = Bytes.toFloat(result.getValue(User_TransactionHistoryBytes, Bytes.toBytes("TotalAmount_spent_in_24_hrs")))
    val p13 = Bytes.toFloat(result.getValue(User_TransactionHistoryBytes, Bytes.toBytes("AvgAmount_spent_in_24_hrs")))
    val p14 = Bytes.toInt(result.getValue(User_TransactionHistoryBytes, Bytes.toBytes("Total_No_Transactions")))
    val p15 = Bytes.toFloat(result.getValue(User_TransactionHistoryBytes, Bytes.toBytes("Amount_spent_so_far")))
    UserProfile(p0, p1, p2, p3, p4, p5, p6, p7, p8, p9, p10, p11, p12, p13, p14, p15)
  }
}
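For context, Bytes.toInt throws a NullPointerException when it is given a null array, which is what result.getValue returns when the cell is missing for that row. A minimal null-safe sketch of the numeric conversions (the helper names and the zero defaults are my own assumption, not part of the original code) could look like:

// Hypothetical null-safe helpers: getValue returns null for absent cells,
// so fall back to 0 / 0.0f instead of letting Bytes.toInt/toFloat throw.
def safeInt(result: Result, family: Array[Byte], qualifier: String): Int = {
  val bytes = result.getValue(family, Bytes.toBytes(qualifier))
  if (bytes == null) 0 else Bytes.toInt(bytes)
}
def safeFloat(result: Result, family: Array[Byte], qualifier: String): Float = {
  val bytes = result.getValue(family, Bytes.toBytes(qualifier))
  if (bytes == null) 0.0f else Bytes.toFloat(bytes)
}
// e.g. val p10 = safeInt(result, User_TransactionHistoryBytes, "No_Transactions_in_24_hrs")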
**Spark-HBase code:**
val sc = new SparkContext(sparkConf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

val sparkConf1 = HBaseConfiguration.create()
val tableName = "UserProfile"
sparkConf1.set(TableInputFormat.INPUT_TABLE, tableName)
sparkConf1.set("hbase.zookeeper.property.clientPort", "2182")
sparkConf1.set(TableInputFormat.SCAN_COLUMNS,
  "User_PersonalProfile", "User_TransactionHistory")

val hBaseRDD = sc.newAPIHadoopRDD(sparkConf1, classOf[TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])

println("Number of Records found : " + hBaseRDD.count())
val count = hBaseRDD.count
val resultRDD = hBaseRDD.map(tuple => tuple._2)
println(resultRDD)

val profileRdd = resultRDD.map(UserProfile.parseUserProfile)
val userProfileDF = profileRdd.toDF()
userProfileDF.printSchema()
userProfileDF.show()
userProfileDF.registerTempTable("UserProfileRow")
sc.stop()
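For reference, a small debug check (not part of the original program, and it would have to run before sc.stop()) that inspects the first Result and prints whether each numeric cell is null or how many bytes it holds. Bytes.toInt and Bytes.toFloat each need a non-null 4-byte array, so any "null" printed here would match the exception below:

// Hypothetical check: look at the raw bytes of the numeric cells in one row
val firstResult = hBaseRDD.take(1).headOption.map(_._2)
firstResult.foreach { r =>
  Seq("No_Transactions_in_24_hrs", "No_Ips_In_24_hrs", "TotalAmount_spent_in_24_hrs",
      "AvgAmount_spent_in_24_hrs", "Total_No_Transactions", "Amount_spent_so_far").foreach { q =>
    val v = r.getValue(User_TransactionHistoryBytes, Bytes.toBytes(q))
    println(q + " -> " + (if (v == null) "null" else v.length + " bytes"))
  }
}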
Exception thrown:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost): java.lang.NullPointerException
at org.apache.hadoop.hbase.util.Bytes.toInt(Bytes.java:801)
at org.apache.hadoop.hbase.util.Bytes.toInt(Bytes.java:778)
at com.research.spark.PaymentProcessor$UserProfile$.parseUserProfile(PaymentProcessor.scala:75)
at com.research.spark.PaymentProcessor$$anonfun$5.apply(PaymentProcessor.scala:193)
at com.research.spark.PaymentProcessor$$anonfun$5.apply(PaymentProcessor.scala:193)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212)
at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)
at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499)
at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2086)
at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1498)
at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1505)
at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1375)
at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1374)
at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2099)
at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1374)
at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1456)
at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:170)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:350)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:311)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:319)
at com.research.spark.PaymentProcessor$.main(PaymentProcessor.scala:197)
at com.research.spark.PaymentProcessor.main(PaymentProcessor.scala)
Caused by: java.lang.NullPointerException
at org.apache.hadoop.hbase.util.Bytes.toInt(Bytes.java:801)
at org.apache.hadoop.hbase.util.Bytes.toInt(Bytes.java:778)
at com.research.spark.PaymentProcessor$UserProfile$.parseUserProfile(PaymentProcessor.scala:75)
at com.research.spark.PaymentProcessor$$anonfun$5.apply(PaymentProcessor.scala:193)
at com.research.spark.PaymentProcessor$$anonfun$5.apply(PaymentProcessor.scala:193)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)