I have some code that reads an HBase table, formats the rows, and converts them into a DataFrame:
import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor}
import org.apache.hadoop.hbase.client.{HBaseAdmin, HTable, Put}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
val tableName = "my_table"
val conf = HBaseConfiguration.create()
// Add local HBase conf
conf.addResource(new Path("file:///opt/cloudera/parcels/CDH-5.7.1-1.cdh5.7.1.p0.11/etc/hbase/conf.dist/hbase-site.xml"))
conf.set(TableInputFormat.INPUT_TABLE, tableName)
// Sanity-check that the table exists
val admin = new HBaseAdmin(conf)
admin.isTableAvailable(tableName)
// Read the table as an RDD of (row key, Result) pairs
val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])
case class MyClass(srcid: Long, srcLat: Double, srcLong: Double, dstid: Long, dstLat: Double, dstLong: Double, time: Int, duration: Int)
// a.list() returns the row's cells in lexicographic qualifier order, so the
// positions below are tied to how the columns are named (see the
// qualifier-based sketch after this block for a less fragile alternative)
val parsed = hBaseRDD.map { case (_, a) =>
  val iter = a.list().iterator()
  ( Bytes.toString(a.getRow()).toLong,
    Bytes.toString(iter.next().getValue()).toDouble,
    Bytes.toString(iter.next().getValue()).toDouble,
    Bytes.toString(iter.next().getValue()).toLong,
    Bytes.toString(iter.next().getValue()).toDouble,
    Bytes.toString(iter.next().getValue()).toDouble,
    Bytes.toString(iter.next().getValue()).toInt,
    Bytes.toString(iter.next().getValue())
  )
}.map { s =>
  // Strip the ISO-8601 "T" separator and the "+03:00" offset before parsing
  val time = s._8.replaceAll("T", "").replaceAll("\\+03:00", "")
  val format = new java.text.SimpleDateFormat("yyyy-MM-ddHH:mm:ss.SSS")
  val date = format.parse(time)
  MyClass(s._1, s._5, s._6, s._4, s._2, s._3, date.getHours(), s._7)
}.toDF()
parsed.registerTempTable("my_table")
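As an aside, here is a minimal sketch of the lookup-by-qualifier alternative mentioned in the comment above, assuming a single column family; the family name "cf" and the qualifier names are placeholders for whatever the real table uses:

// Hypothetical names -- substitute the actual family/qualifiers of my_table
val cf = Bytes.toBytes("cf")
val byName = hBaseRDD.map { case (_, result) =>
  // Fetch a cell's value by family + qualifier instead of iterator position
  def col(q: String): String =
    Bytes.toString(result.getValue(cf, Bytes.toBytes(q)))
  ( Bytes.toString(result.getRow).toLong,  // row key
    col("src_lat").toDouble,
    col("src_long").toDouble,
    col("dst_id").toLong,
    col("dst_lat").toDouble,
    col("dst_long").toDouble )
}

This keeps the parse correct even if more columns are later added to the family.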
This code runs fine in the spark-shell. But I want to use it in a Zeppelin notebook; I expected it to work in a Zeppelin paragraph as well. However, when I run it there, it fails on the import statement with the following error:
<console>:28: error: object hbase is not a member of package org.apache.hadoop
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
Do I need to add a dependency to use HBase with Spark in Zeppelin? If so, how do I do that?
1 Answer
Add a dependency on HBase as described in the documentation: http://zeppelin.apache.org/docs/0.6.0/manual/dependencymanagement.html
You need:
org.apache.hbase:hbase:1.2.3
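For example, using the %dep interpreter approach from that page, run this in its own paragraph before the Spark interpreter starts (restart the interpreter first if it is already running), with the artifact coordinate above:

%dep
z.reset()                               // clear previously loaded artifacts
z.load("org.apache.hbase:hbase:1.2.3")  // put the HBase classes on the classpath

Alternatively, the same page describes adding the artifact under the Spark interpreter's dependency settings in the Zeppelin UI, which persists across interpreter restarts.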
You may also be interested in the Zeppelin HBase interpreter, which can run HBase queries directly from Zeppelin. That said, this is not possible
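For reference, once that interpreter is configured (see the Zeppelin docs), an HBase paragraph runs ordinary hbase shell commands; the table name here is just the one from the question:

%hbase
scan 'my_table', {LIMIT => 5}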