How do I write to HBase from Azure Databricks?

lnvxswe2 posted on 2021-06-09 in HBase

I'm trying to build a lambda architecture with Kafka, Spark, and HBase. I'm on the Azure cloud, with the components running on the following platforms:

1. Kafka (0.10) - HDInsight
2. Spark (2.4.3) - Databricks
3. HBase (1.2) - HDInsight
All three components sit in the same VNet, so connectivity is not an issue. I'm using Spark Structured Streaming and have successfully connected to Kafka as the source.
Now, since Spark has no native support for connecting to HBase, I'm using the Spark Hortonworks Connector (SHC) to write the data out, and I do the per-batch write to HBase inside the foreachBatch API that Spark provides from 2.4 onwards.
The code is as follows:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.Dataset
import org.apache.spark.SparkConf
import org.apache.spark.sql.DataFrame

//code dependencies
import com.JEM.conf.HbaseTableConf
import com.JEM.constant.HbaseConstant

//'Spark hortonworks connector' dependency
import org.apache.spark.sql.execution.datasources.hbase._

//---------Variables--------------//
val kafkaBroker = "valid kafka broker"
val topic = "valid kafka topic"
val kafkaCheckpointLocation = "/checkpointDir"

//---------Code--------------//
import spark.sqlContext.implicits._

// Read from Kafka as the streaming source
val kafkaIpStream = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", kafkaBroker)
  .option("subscribe", topic)
  .option("checkpointLocation", kafkaCheckpointLocation)
  .option("startingOffsets", "earliest")
  .load()

// Reshape each record to (rowkey, ts, val) and write every micro-batch
// to HBase through SHC inside foreachBatch
val streamToBeSentToHbase = kafkaIpStream.selectExpr("cast (key as String)", "cast (value as String)")
  .withColumn("ts", split($"key", "/")(1))
  .selectExpr("key as rowkey", "ts", "value as val")
  .writeStream
  .option("failOnDataLoss", false)
  .outputMode(OutputMode.Update())
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF
      .write
      .options(Map(
        HBaseTableCatalog.tableCatalog -> HbaseTableConf.getRawtableConf(HbaseConstant.hbaseRawTable),
        HBaseTableCatalog.newTable -> "5"))
      .format("org.apache.spark.sql.execution.datasources.hbase")
      .save()
  }.start()
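For reference, HbaseTableConf.getRawtableConf is my own helper that builds the SHC catalog JSON. Roughly, it looks like the sketch below (the namespace and column family here are placeholders, not my exact config):

// Sketch of the SHC catalog JSON my helper returns; SHC uses this catalog
// to map DataFrame columns (rowkey, ts, val) onto HBase cells.
def getRawtableConf(tableName: String): String = s"""{
  |"table":{"namespace":"default", "name":"$tableName"},
  |"rowkey":"rowkey",
  |"columns":{
  |  "rowkey":{"cf":"rowkey", "col":"rowkey", "type":"string"},
  |  "ts":{"cf":"d", "col":"ts", "type":"string"},
  |  "val":{"cf":"d", "col":"val", "type":"string"}
  |}
  |}""".stripMargin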

From the logs I can see that the code fetches data successfully, but when it tries to write to HBase I get the following exception:

19/10/09 12:42:48 ERROR MicroBatchExecution: Query [id = 1a54283d-ab8a-4bf4-af65-63becc166328, runId = 670f90de-8ca5-41d7-91a9-e8d36dfeef66] terminated with error
java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/NamespaceNotFoundException
    at org.apache.spark.sql.execution.datasources.hbase.DefaultSource.createRelation(HBaseRelation.scala:59)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:72)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:88)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:146)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:134)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$5.apply(SparkPlan.scala:187)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:183)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:134)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:116)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:116)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:710)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:710)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:111)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:240)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:97)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:170)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:710)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:306)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:292)
    at com.JEM.Importer.StreamingHbaseImporter$.$anonfun$main$1(StreamingHbaseImporter.scala:57)
    at com.JEM.Importer.StreamingHbaseImporter$.$anonfun$main$1$adapted(StreamingHbaseImporter.scala:53)
    at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:36)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:568)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:111)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:240)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:97)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:170)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:566)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:251)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:61)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:565)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:207)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:175)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:175)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:251)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:61)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:175)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:169)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:296)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.NamespaceNotFoundException
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 44 more
19/10/09 12:42:48 INFO SparkContext: Invoking stop() from shutdown hook
19/10/09 12:42:48 INFO AbstractConnector: Stopped Spark@42f85fa4{HTTP/1.1,[http/1.1]}{172.20.170.72:47611}
19/10/09 12:42:48 INFO SparkUI: Stopped Spark web UI at http://172.20.170.72:47611
19/10/09 12:42:48 INFO StandaloneSchedulerBackend: Shutting down all executors
19/10/09 12:42:48 INFO CoarseGrainedSchedulerBackend$DriverEndpoint: Asking each executor to shut down
19/10/09 12:42:48 INFO SQLAppStatusListener: Execution ID: 1 Total Executor Run Time: 0
19/10/09 12:42:49 INFO SQLAppStatusListener: Execution ID: 0 Total Executor Run Time: 0
19/10/09 12:42:49 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/10/09 12:42:49 INFO MemoryStore: MemoryStore cleared
19/10/09 12:42:49 INFO BlockManager: BlockManager stopped
19/10/09 12:42:49 INFO BlockManagerMaster: BlockManagerMaster stopped
19/10/09 12:42:49 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/10/09 12:42:49 INFO SparkContext: Successfully stopped SparkContext
19/10/09 12:42:49 INFO ShutdownHookManager: Shutdown hook called
19/10/09 12:42:49 INFO ShutdownHookManager: Deleting directory /local_disk0/tmp/temporaryReader-79d0f9b8-c380-4141-9ac2-46c257c6c854
19/10/09 12:42:49 INFO ShutdownHookManager: Deleting directory /local_disk0/tmp/temporary-d00d2f73-96e3-4a18-9d5c-a9ff76a871bb
19/10/09 12:42:49 INFO ShutdownHookManager: Deleting directory /local_disk0/tmp/spark-b92c0171-286b-4863-9fac-16f4ac379da8
19/10/09 12:42:49 INFO ShutdownHookManager: Deleting directory /local_disk0/tmp/spark-ef92ca6b-2e7d-4917-b407-4426ad088cee
19/10/09 12:42:49 INFO ShutdownHookManager: Deleting directory /local_disk0/spark-9b138379-fa3a-49db-95dc-436cd7040a95
19/10/09 12:42:49 INFO ShutdownHookManager: Deleting directory /local_disk0/tmp/spark-2666c4ff-30a2-4161-868e-137af5fa3787
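My reading of the error: org.apache.hadoop.hbase.NamespaceNotFoundException ships (as far as I can tell) in the hbase-client jar, so the NoClassDefFoundError means the HBase client classes are not on the driver classpath at the moment SHC creates the relation. A quick sanity check I run from a notebook cell (just a sketch):

// Throws ClassNotFoundException if hbase-client is missing
// from the driver classpath
Class.forName("org.apache.hadoop.hbase.NamespaceNotFoundException")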

There are three ways to run a Spark job on Databricks, and I have tried all of them:
Notebook: I installed the SHC jar on the cluster as a library and packaged hbase-site.xml into my job jar, which is also installed on the cluster. I pasted the main-class code into a notebook; when I run it, the SHC dependencies load, but I get the error above.
Jar: very similar to the notebook approach, except that I point the job at the main class and the jar instead of a notebook. Same error.
spark-submit: I created an uber jar containing all the dependencies (including SHC), uploaded hbase-site.xml to a DBFS path, and supplied both in the submit parameters, as shown below:

[
"--class","com.JEM.Importer.StreamingHbaseImporter","dbfs:/FileStore/JarPath/jarfile.jar",
"--packages","com.hortonworks:shc-core:1.1.1-2.1-s_2.11",
"--repositories","http://repo.hortonworks.com/content/groups/public/",
"--files","dbfs:/PathToSiteFile/hbase_site.xml"
]
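For context, the assembly build for that uber jar looks roughly like this (a sketch; the HBase artifact versions are my assumption, picked to match the 1.2 cluster):

// build.sbt (sketch): the key point is that the HBase client jars must
// actually end up inside the assembly alongside shc-core.
resolvers += "Hortonworks" at "http://repo.hortonworks.com/content/groups/public/"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.4.3" % "provided",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.3",
  "com.hortonworks" % "shc-core" % "1.1.1-2.1-s_2.11",
  "org.apache.hbase" % "hbase-client" % "1.2.6",  // assumed 1.2.x to match the cluster
  "org.apache.hbase" % "hbase-common" % "1.2.6"
)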

But I still get the same error. Can anyone help? Thanks.

No answers yet.

