I am working on a scenario where I want to access HBase from Spark SQL and read it as a DataFrame. Below is the code.
import java.io.IOException;
import java.util.HashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import com.google.protobuf.ServiceException;
public class hbasesql {
    public static void main(String args[]) {
        SparkConf conf = new SparkConf().setAppName("HbaseStreamTest").setMaster("yarn-client");
        conf.set("spark.executor.extraClassPath", "/etc/hbase/conf/hbase-site.xml");
        conf.set("hbase.zookeeper.quorum", "=servername:2181");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        Configuration hconf = HBaseConfiguration.create();
        hconf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"));
        hconf.addResource(new Path("/etc/hbase/conf/core-site.xml"));

        final JavaSparkContext context = new JavaSparkContext(conf);
        SQLContext sqlContext = new org.apache.spark.sql.SQLContext(context);

        try {
            HBaseAdmin.checkHBaseAvailable(hconf);
            System.out.println("HBase is running"); // This works
        } catch (ServiceException e) {
            System.out.println("HBase is not running");
            e.printStackTrace();
        } catch (MasterNotRunningException e) {
            e.printStackTrace();
        } catch (ZooKeeperConnectionException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

        String sqlMapping = "KEY_FIELD STRING :key" + ", sql_firstname STRING c1:FIRSTNAME" + ","
                + "sql_lastname STRING c1:LASTNAME";

        HashMap<String, String> colMap = new HashMap<String, String>();
        colMap.put("hbase.columns.mapping", sqlMapping);
        colMap.put("hbase.table", "testtable");

        DataFrame df = sqlContext.read().format("org.apache.hadoop.hbase.spark").options(colMap).load();
        // DataFrame df = sqlContext.load("org.apache.hadoop.hbase.spark", colMap);

        // This is useful when issuing SQL text queries directly against the
        // sqlContext object.
        df.registerTempTable("temp_emp");
        DataFrame result = sqlContext.sql("SELECT count(*) from temp_emp");

        System.out.println("df " + df);
        System.out.println("result " + result);
        df.show();
    }
}
But when I run this program, I get the following exception.
17/02/22 00:44:28 INFO cluster.YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.8
17/02/22 00:44:28 INFO storage.BlockManagerMasterEndpoint: Registering block manager servername:32831 with 2.1 GB RAM, BlockManagerId(2, servername, 32831)
Exception in thread "main" java.lang.NullPointerException
at org.apache.hadoop.hbase.spark.HBaseRelation.<init>(DefaultSource.scala:175)
at org.apache.hadoop.hbase.spark.DefaultSource.createRelation(DefaultSource.scala:78)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)
at hbasesql.main(hbasesql.java:65)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
17/02/22 00:44:28 INFO spark.SparkContext: Invoking stop() from shutdown hook
My guess is that the SQLContext created from the JavaSparkContext is not picking up the hbase-site.xml configuration. Or is something else going wrong?
Any help is much appreciated.
Thanks in advance :)
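In case it is relevant, here is a minimal sketch of what I suspect might be missing, based on my (unverified) reading of the hbase-spark DefaultSource: it appears to reuse the most recently created HBaseContext, so one may need to be constructed from the HBase Configuration before sqlContext.read() is called. The JavaHBaseContext usage below is an assumption on my part; the snippet would go inside main() after hconf, context, sqlContext and colMap are set up.

import org.apache.hadoop.hbase.spark.JavaHBaseContext;

// Assumption: constructing the HBase context up front registers it with the
// hbase-spark connector, so the subsequent read() no longer sees a null
// HBase configuration. I have not confirmed this fixes the NPE.
JavaHBaseContext hbaseContext = new JavaHBaseContext(context, hconf);

DataFrame df = sqlContext.read()
        .format("org.apache.hadoop.hbase.spark")
        .options(colMap)
        .load();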