如何通过sparksql上下文连接hbase?

vq8itlhq  于 2021-06-10  发布在  Hbase
关注(0)|答案(0)|浏览(307)

我正在研究一个场景,在这个场景中,我希望从sparksql访问hbase并将其作为Dataframe读取。下面是代码。

import java.io.IOException;
import java.util.HashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import com.google.protobuf.ServiceException;

public class hbasesql {

public static void main(String args[])
{

SparkConf conf = new SparkConf().setAppName("HbaseStreamTest").setMaster("yarn-client");
conf.set("spark.executor.extraClassPath", "/etc/hbase/conf/hbase-site.xml");
conf.set("hbase.zookeeper.quorum", "=servername:2181");
conf.set("hbase.zookeeper.property.clientPort", "2181");

Configuration hconf = HBaseConfiguration.create();
hconf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"));
hconf.addResource(new Path("/etc/hbase/conf/core-site.xml"));

final  JavaSparkContext context =   new JavaSparkContext(conf);

SQLContext sqlContext = new org.apache.spark.sql.SQLContext(context);

try {
        HBaseAdmin.checkHBaseAvailable(hconf);
        System.out.println("HBase is running"); //This works
    } catch (ServiceException e) {
        System.out.println("HBase is not running");
        e.printStackTrace();
    } catch (MasterNotRunningException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    } catch (ZooKeeperConnectionException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }

    String sqlMapping = "KEY_FIELD STRING :key" + ", sql_firstname STRING c1:FIRSTNAME" + ","
            + "sql_lastname STRING c1:LASTNAME" ;

    HashMap<String, String> colMap = new HashMap<String, String>();
    colMap.put("hbase.columns.mapping", sqlMapping);
    colMap.put("hbase.table", "testtable");

    // DataFrame dfJail =
    DataFrame df = sqlContext.read().format("org.apache.hadoop.hbase.spark").options(colMap).load();
    //DataFrame df = sqlContext.load("org.apache.hadoop.hbase.spark", colMap);

    // This is useful when issuing SQL text queries directly against the
    // sqlContext object.
    df.registerTempTable("temp_emp");

    DataFrame result = sqlContext.sql("SELECT count(*) from temp_emp");
    System.out.println("df  " + df);
    System.out.println("result " + result);

df.show();  
}   
}

但在运行相同的程序时,我得到了以下异常。

17/02/22 00:44:28 INFO cluster.YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.8
17/02/22 00:44:28 INFO storage.BlockManagerMasterEndpoint: Registering block manager servername:32831 with 2.1 GB RAM, BlockManagerId(2, servername, 32831)
Exception in thread "main" java.lang.NullPointerException
    at org.apache.hadoop.hbase.spark.HBaseRelation.<init>(DefaultSource.scala:175)
    at org.apache.hadoop.hbase.spark.DefaultSource.createRelation(DefaultSource.scala:78)
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
    at hbasesql.main(hbasesql.java:65)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
17/02/22 00:44:28 INFO spark.SparkContext: Invoking stop() from shutdown hook

我猜sparkjavacontext中的sqlcontext无法将hbase-site.xml配置加载到其中。还是有其他问题?
非常感谢您的帮助。
提前感谢:)

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题