hbase spark连接选项[java]

l7wslrjt  于 2021-05-29  发布在  Hadoop
关注(0)|答案(2)|浏览(376)

我正在尝试从运行在yarn上的javaspark应用程序访问hbase,但我需要一些关于如何做到这一点的线索。我已经在网上搜索过了,但找不到确切的答案。它们在这里:
基本问题:如何与spark的hbase交互?我是否需要在每个worker上设置一个hbase连接(可能通过mappartition来保存一些连接),或者我可以在创建之后从驱动程序共享它?
hbase表对象是否可以由驱动程序示例化并发送给worker,以便对其执行put和get操作?
(与上一个相关)在spark中使用单put/get操作与hbase交互是一种好的做法吗?有别的选择吗?
谢谢你的回答

gxwragnw

gxwragnw1#

您可以使用两个选项:
在每个分区(mappartition或foreachpartition内)上设置hbase连接。如果希望每个执行器有一个连接,则必须在mappartition/foreachpartition中实现一种单例连接对象/池,并在该执行器执行的所有任务之间共享它(请注意,某些hbase客户端api不是线程安全的,最后完成的任务必须关闭连接,客户端缓冲区可能会快速增长)。
使用spark hbase connector(shc):您可以在非常原始的级别(put、get、scan、delete、mutuation等)与hbase交互,而不仅仅是通过结构化的Dataframe

mkh04yzy

mkh04yzy2#

问题1:您可以使用hortonworks spark hbase连接器(在3个可用的连接器中,它支持spark 2.x)
这将简化上面的q2和q3。您将能够以rdd的形式从hbase加载数据,然后按照您喜欢的方式对其进行操作(转换为dataframe并在此处进行操作,或者转换为tmp表im内存并在顶部写入sql查询等)
按照上面链接上的设置,您可以。。
要加载表:

def catalog = s"""{
        |"table":{"namespace":"default", "name":"table1"},
        |"rowkey":"key",
        |"columns":{
          |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
          |"col1":{"cf":"cf1", "col":"col1", "type":"boolean"},
          |"col2":{"cf":"cf2", "col":"col2", "type":"double"},
          |"col3":{"cf":"cf3", "col":"col3", "type":"float"},
          |"col4":{"cf":"cf4", "col":"col4", "type":"int"},
          |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},
          |"col6":{"cf":"cf6", "col":"col6", "type":"smallint"},
          |"col7":{"cf":"cf7", "col":"col7", "type":"string"},
          |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}
        |}
      |}""".stripMargin

  sqlContext
  .read
  .options(Map(HBaseTableCatalog.tableCatalog->catalog))
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .load()
}

要编写表格:

sc.parallelize(data).toDF.write.options(
  Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> "5"))
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .save()

相关问题