hadoop—如何从hbase读取记录,然后将其存储到spark rdd(弹性分布式数据集)中;读取一条rdd记录,然后写入hbase?

pcww981p  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(295)

因此,我想编写一个代码,从hadoop hbase读取一条记录,然后将其存储到sparkrdd(弹性分布式数据集)中;读取一条rdd记录,然后写入hbase。我对这两者都不了解,我需要使用aws云或hadoop虚拟机。有人请引导我从头开始。

xpszyzbs

xpszyzbs1#

请使用scala中的基本代码,我们正在使用scala读取hbase中的数据。类似地,您可以编写一个表创建来将数据写入hbase

import org.apache.hadoop.hbase.client.{HBaseAdmin, Result}
import org.apache.hadoop.hbase.{ HBaseConfiguration, HTableDescriptor }
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable

import org.apache.spark._

object HBaseApp {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("HBaseApp").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    val conf = HBaseConfiguration.create()
    val tableName = "table1"

    System.setProperty("user.name", "hdfs")
    System.setProperty("HADOOP_USER_NAME", "hdfs")
    conf.set("hbase.master", "localhost:60000")
    conf.setInt("timeout", 100000)
    conf.set("hbase.zookeeper.quorum", "localhost")
    conf.set("zookeeper.znode.parent", "/hbase-unsecure")
    conf.set(TableInputFormat.INPUT_TABLE, tableName)

    val admin = new HBaseAdmin(conf)
    if (!admin.isTableAvailable(tableName)) {
      val tableDesc = new HTableDescriptor(tableName)
      admin.createTable(tableDesc)
    }

    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
    println("Number of Records found : " + hBaseRDD.count())
    sc.stop()
  }
}

相关问题