Writing a Spark RDD to an HBase table using Scala

zengzsys · posted 2021-05-27 in Spark

I am trying to write a Spark RDD to an HBase table using Scala (which I have not used before). The complete code is as follows:

import org.apache.hadoop.hbase.client.{HBaseAdmin, Result}
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable    
import scala.collection.JavaConverters._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark._
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.rdd.PairRDDFunctions
import org.apache.spark.SparkContext._
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.client._

object HBaseWrite {
   def main(args: Array[String]) {
     val sparkConf = new SparkConf().setAppName("HBaseWrite").setMaster("local").set("spark.driver.allowMultipleContexts","true").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
     val sc = new SparkContext(sparkConf)
     val conf = HBaseConfiguration.create()
     val outputTable = "tablename"

     System.setProperty("user.name", "hdfs")
     System.setProperty("HADOOP_USER_NAME", "hdfs")
     conf.set("hbase.master", "localhost:60000")
     conf.setInt("timeout", 120000)
     conf.set("hbase.zookeeper.quorum", "localhost")
     conf.set("zookeeper.znode.parent", "/hbase-unsecure")
     conf.setInt("hbase.client.scanner.caching", 10000)
     sparkConf.registerKryoClasses(Array(classOf[org.apache.hadoop.hbase.client.Result]))
     val jobConfig: JobConf = new JobConf(conf,this.getClass)
     jobConfig.setOutputFormat(classOf[TableOutputFormat])
     jobConfig.set(TableOutputFormat.OUTPUT_TABLE,outputTable)
     val x = 12
     val y = 15
     val z = 25
     var newarray = Array(x,y,z)
     val newrddtohbase = sc.parallelize(newarray)
     def convert(a:Int) : Tuple2[ImmutableBytesWritable,Put] = {
          val p = new Put(Bytes.toBytes(a))
          p.add(Bytes.toBytes("columnfamily"),
          Bytes.toBytes("col_1"), Bytes.toBytes(a))
          new Tuple2[ImmutableBytesWritable,Put](new ImmutableBytesWritable(a.toString.getBytes()), p);
     }
     new PairRDDFunctions(newrddtohbase.map(convert)).saveAsHadoopDataset(jobConfig)
     sc.stop()
   }
}

The error I get after running HBaseWrite.main(Array()) is:

org.apache.spark.SparkException: Task not serializable

How should I go about getting this to work?


Answer 1 — bxjv4tth

For example, the function below takes an Int as a parameter and returns a Double:

var toDouble: (Int) => Double = a => {
    a.toDouble
}

You can call toDouble(2) and it returns 2.0. In the same way, your convert method can be turned into a function literal, like this:

val convert: (Int) => Tuple2[ImmutableBytesWritable, Put] = a => {
  val p = new Put(Bytes.toBytes(a))
  p.add(Bytes.toBytes("columnfamily"), Bytes.toBytes("col_1"), Bytes.toBytes(a))
  new Tuple2[ImmutableBytesWritable, Put](new ImmutableBytesWritable(a.toString.getBytes()), p)
}
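
With convert written as a function value like this, it can be passed straight to map in place of the locally defined method. A minimal sketch of the call site, assuming the same newrddtohbase and jobConfig built in the question's main:

// The function value references only Bytes, Put and ImmutableBytesWritable,
// so the closure Spark serializes carries nothing from main's local scope.
new PairRDDFunctions(newrddtohbase.map(convert)).saveAsHadoopDataset(jobConfig)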

Answer 2 — euoag5mw

The mistake you are making here is defining convert inside main. If you write the code this way, it should work:

object HBaseWrite {
       def main(args: Array[String]) {
         val sparkConf = new SparkConf().setAppName("HBaseWrite").setMaster("local").set("spark.driver.allowMultipleContexts","true").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
         val sc = new SparkContext(sparkConf)
         val conf = HBaseConfiguration.create()
         val outputTable = "tablename"

         System.setProperty("user.name", "hdfs")
         System.setProperty("HADOOP_USER_NAME", "hdfs")
         conf.set("hbase.master", "localhost:60000")
         conf.setInt("timeout", 120000)
         conf.set("hbase.zookeeper.quorum", "localhost")
         conf.set("zookeeper.znode.parent", "/hbase-unsecure")
         conf.setInt("hbase.client.scanner.caching", 10000)
         sparkConf.registerKryoClasses(Array(classOf[org.apache.hadoop.hbase.client.Result]))
         val jobConfig: JobConf = new JobConf(conf,this.getClass)
         jobConfig.setOutputFormat(classOf[TableOutputFormat])
         jobConfig.set(TableOutputFormat.OUTPUT_TABLE,outputTable)
         val x = 12
         val y = 15
         val z = 25
         var newarray = Array(x,y,z)
         val newrddtohbase = sc.parallelize(newarray)
         val convertFunc = convert _
         new PairRDDFunctions(newrddtohbase.map(convertFunc)).saveAsHadoopDataset(jobConfig)
         sc.stop()
       }
       def convert(a:Int) : Tuple2[ImmutableBytesWritable,Put] = {
              val p = new Put(Bytes.toBytes(a))
              p.add(Bytes.toBytes("columnfamily"),
              Bytes.toBytes("col_1"), Bytes.toBytes(a))
              new Tuple2[ImmutableBytesWritable,Put](new ImmutableBytesWritable(a.toString.getBytes()), p);
         }
    }

P.S.: The code is not tested, but it should work!
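
For completeness (this is an illustration of the same idea, not part of the answer above): the conversion can also be written inline as an anonymous function, so the only things the closure refers to are its own parameter and the HBase helper classes. A sketch, assuming the same jobConfig, newrddtohbase, table and column family names as in the question:

// Illustrative only: the lambda uses just its parameter and static helpers,
// so the serialized closure holds no references into main's local scope.
val puts = newrddtohbase.map { a =>
  val p = new Put(Bytes.toBytes(a))
  p.add(Bytes.toBytes("columnfamily"), Bytes.toBytes("col_1"), Bytes.toBytes(a))
  (new ImmutableBytesWritable(Bytes.toBytes(a)), p)
}
// saveAsHadoopDataset is available on the (key, value) RDD through the
// implicit conversion pulled in by import org.apache.spark.SparkContext._
puts.saveAsHadoopDataset(jobConfig)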
