我正在尝试使用hortonworks spark on hbase连接器从spark读取kerberized hbase示例中的数据。我的集群配置基本上是这样的:我将spark作业从客户机提交到远程spark独立集群,而该作业正试图从单独的hbase集群读取数据。
如果我直接在我的客户机上运行master=local[*]的spark来绕过独立集群,我可以访问远程hbase集群,只要我首先从客户机启动它就没有问题。但是,当我将我的主服务器设置为远程集群,并且所有其他配置都相同时,我在 org.apache.hadoop.hbase.security.UserProvider.instantiate(UserProvider.java:43)
(下面是完整的堆栈跟踪)
有没有人完成了类似的架构,也许可以提供一些见解?尽管日志中没有提到任何身份验证问题,但我有一种预感,在从非kerberized spark集群访问hbase时,我可能遇到了身份验证问题。
完整堆栈跟踪:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0: java.lang.NullPointerException
at org.apache.hadoop.hbase.security.UserProvider.instantiate(UserProvider.java:43)
at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:214)
at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:119)
at org.apache.spark.sql.execution.datasources.hbase.TableResource.init(HBaseResources.scala:125)
at org.apache.spark.sql.execution.datasources.hbase.ReferencedResource$class.liftedTree1$1(HBaseResources.scala:57)
at org.apache.spark.sql.execution.datasources.hbase.ReferencedResource$class.acquire(HBaseResources.scala:54)
at org.apache.spark.sql.execution.datasources.hbase.TableResource.acquire(HBaseResources.scala:120)
at org.apache.spark.sql.execution.datasources.hbase.ReferencedResource$class.releaseOnException(HBaseResources.scala:74)
at org.apache.spark.sql.execution.datasources.hbase.TableResource.releaseOnException(HBaseResources.scala:120)
at org.apache.spark.sql.execution.datasources.hbase.TableResource.getScanner(HBaseResources.scala:144)
at org.apache.spark.sql.execution.datasources.hbase.HBaseTableScanRDD$$anonfun$7.apply(HBaseTableScan.scala:267)
at org.apache.spark.sql.execution.datasources.hbase.HBaseTableScanRDD$$anonfun$7.apply(HBaseTableScan.scala:266)
at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212)
at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)
at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499)
at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2086)
at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1498)
at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1505)
at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1375)
at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1374)
at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2099)
at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1374)
at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1456)
at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:170)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at org.apache.hadoop.hbase.security.UserProvider.instantiate(UserProvider.java:43)
at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:214)
at org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:119)
at org.apache.spark.sql.execution.datasources.hbase.TableResource.init(HBaseResources.scala:125)
at org.apache.spark.sql.execution.datasources.hbase.ReferencedResource$class.liftedTree1$1(HBaseResources.scala:57)
at org.apache.spark.sql.execution.datasources.hbase.ReferencedResource$class.acquire(HBaseResources.scala:54)
at org.apache.spark.sql.execution.datasources.hbase.TableResource.acquire(HBaseResources.scala:120)
at org.apache.spark.sql.execution.datasources.hbase.ReferencedResource$class.releaseOnException(HBaseResources.scala:74)
at org.apache.spark.sql.execution.datasources.hbase.TableResource.releaseOnException(HBaseResources.scala:120)
at org.apache.spark.sql.execution.datasources.hbase.TableResource.getScanner(HBaseResources.scala:144)
at org.apache.spark.sql.execution.datasources.hbase.HBaseTableScanRDD$$anonfun$7.apply(HBaseTableScan.scala:267)
at org.apache.spark.sql.execution.datasources.hbase.HBaseTableScanRDD$$anonfun$7.apply(HBaseTableScan.scala:266)
at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
2条答案
按热度按时间jq6vz3qz1#
出现配置问题,hbase.client.userprovider.class配置不可用。您需要确保hbase libs和conf文件位于spark执行器的路径上。
zvokhttg2#
我偶然发现了这个症状(但根本原因可能不一样),并找到了一个非常肮脏的解决办法,你可能不想尝试。
$$上下文$$ cloudera发行版,hbase 1.2.0-cdh5.7.0
$$发行#1$$ apache/hortonworks发行版中的一些代码清理还没有移植到cloudera发行版,例如。
java.lang.NoSuchMethodError: org.apache.hadoop.hbase.client.Scan.setCaching(I)Lorg/apache/hadoop/hbase/client/Scan;
$$解决方法#1$$从horton repo下载hbase客户机jar,特别是“client”、“common”和“protocol”,适用于版本1.1.2(这是spark hbase模块的pom中显示的依赖关系)。
添加这些jar(和目录
/etc/hbase/conf/
)至spark.driver.extraClassPath
以及spark hbase jar。通过命令行选项将这些jar发送给执行者
--jars
以及spark hbase jar(别忘了目录
/etc/hbase/conf/
在spark.executor.extraClassPath
如果conf出现在所有的yarn节点上;否则,请找到一种方法将xml传送到其容器类路径中的目录中)$$发行#2$$ 不知何故,在yarn模式下,spark执行器不能正确生成传递给方法的hbase配置
org.apache.hadoop.hbase.security.UserProvider.instantiate(Configuration)
以及org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(Configuration, boolean, ExecutorService, User)
,因此java.lang.NullPointerException
$$解决方法#2$$从github分支1.1下载这两个类的hbase源代码
修补代码以确保
conf
参数为null时,将用调用org.apache.hadoop.hbase.HBaseConfiguration.create()
编译这两个类,并替换原来的.class
在适当的jar中的可执行文件与您的补丁版本修补spark-hbase插件当然更有意义(参见那篇文章中ray3888的评论),但是scala让我恶心,所以我坚持使用纯“旧java”。