spark:使用scala的hbase批量加载

dgtucam1  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(352)

我们有一个10万条记录的文本文件,我们需要逐行读取文件并将其值插入hbase。文件以“|”分隔。
文本文件示例:

SLNO|Name|City|Pincode
    1|ABC|Pune|400104
    2|BMN|Delhi|100065

每列都有不同的列族。我们正尝试使用hbase批量加载在sparkscala中实现这一点。我们发现了这个链接,建议批量装载:http://www.openkb.info/2015/01/how-to-use-scala-on-spark-to-load-data.html
使用以下语法插入到单列族中。

conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
val job = Job.getInstance(conf)
job.setMapOutputKeyClass (classOf[ImmutableBytesWritable])
job.setMapOutputValueClass (classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad (job, table)

// Generate 10 sample data:
val num = sc.parallelize(1 to 10)
val rdd = num.map(x=>{
    val kv: KeyValue = new KeyValue(Bytes.toBytes(x), "cf".getBytes(), 
"c1".getBytes(), "value_xxx".getBytes() )
    (new ImmutableBytesWritable(Bytes.toBytes(x)), kv)
})

// Directly bulk load to Hbase/MapRDB tables.
rdd.saveAsNewAPIHadoopFile("/tmp/xxxx19", classOf[ImmutableBytesWritable], 
classOf[KeyValue], classOf[HFileOutputFormat], job.getConfiguration())

有谁能给我们提供关于多柱族大批量荷载插入的建议。

uqzxnwby

uqzxnwby1#

查看rdd.saveasnewapihadoopdataset,将数据插入hbase表。

def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().appName("sparkToHive").enableHiveSupport().getOrCreate()
    import spark.implicits._

    val config = HBaseConfiguration.create()
    config.set("hbase.zookeeper.quorum", "ip's")
    config.set("hbase.zookeeper.property.clientPort","2181")
    config.set(TableInputFormat.INPUT_TABLE, "tableName")

    val newAPIJobConfiguration1 = Job.getInstance(config)
    newAPIJobConfiguration1.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, "tableName")
    newAPIJobConfiguration1.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    val df: DataFrame  = Seq(("foo", "1", "foo1"), ("bar", "2", "bar1")).toDF("key", "value1", "value2")

    val hbasePuts= df.rdd.map((row: Row) => {
      val  put = new Put(Bytes.toBytes(row.getString(0)))
      put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("value1"), Bytes.toBytes(row.getString(1)))
      put.addColumn(Bytes.toBytes("cf2"), Bytes.toBytes("value2"), Bytes.toBytes(row.getString(2)))
      (new ImmutableBytesWritable(), put)
    })

    hbasePuts.saveAsNewAPIHadoopDataset(newAPIJobConfiguration1.getConfiguration())
    }

裁判:https://sparkkb.wordpress.com/2015/05/04/save-javardd-to-hbase-using-saveasnewapihadoopdataset-spark-api-java-coding/

相关问题