How to use HBase tables in Spark

enyaitl3  posted on 2021-05-27 in Hadoop

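I have emp and department tables in HBase. How can I join these tables and store the result back in HBase? How can this be done with Spark and Scala? Any references would be appreciated.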

7y4bm7vi  #1

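HBase is a key-value store. In general, stores of this kind do not support join operations natively.
The only option is to read both HBase tables (using a scan) and then perform the join in Spark.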
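As a rough illustration of that approach, here is a minimal Scala sketch that scans both tables through `TableInputFormat`, joins them in Spark, and writes the result back with `TableOutputFormat`. The schema is an assumption, not something stated in the question: a single column family `cf`, emp rows carrying `cf:name` and `cf:dept_id`, department rows keyed by department id and carrying `cf:dept_name`, and an already existing output table `emp_department`. Depending on the HBase version, `TableInputFormat` and `TableOutputFormat` ship in the `hbase-server` (1.x) or `hbase-mapreduce` (2.x) artifact.

```scala
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object EmpDepartmentJoin {
  // Assumed column family; adjust to the real schema.
  private val Cf = Bytes.toBytes("cf")

  // Full scan of one HBase table as an RDD of (row key, Result).
  def scanTable(sc: SparkContext, table: String): RDD[(ImmutableBytesWritable, Result)] = {
    val conf = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, table)
    sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])
  }

  // Read one cell as a String; None if the row has no such column.
  def cell(r: Result, qualifier: String): Option[String] =
    Option(r.getValue(Cf, Bytes.toBytes(qualifier))).map(b => Bytes.toString(b))

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("emp-department-join"))

    // Extract plain Strings before the shuffle: Result and
    // ImmutableBytesWritable are not java.io.Serializable.
    // emp is keyed by department id so it can be joined with department.
    val empByDept = scanTable(sc, "emp").flatMap { case (_, r) =>
      for (deptId <- cell(r, "dept_id"); name <- cell(r, "name"))
        yield (deptId, (Bytes.toString(r.getRow), name))
    }
    val deptById = scanTable(sc, "department").flatMap { case (_, r) =>
      cell(r, "dept_name").map(deptName => (Bytes.toString(r.getRow), deptName))
    }

    // Inner join on department id, then build one Put per employee row.
    val puts = empByDept.join(deptById).map {
      case (deptId, ((empId, name), deptName)) =>
        val put = new Put(Bytes.toBytes(empId))
        put.addColumn(Cf, Bytes.toBytes("name"), Bytes.toBytes(name))
        put.addColumn(Cf, Bytes.toBytes("dept_id"), Bytes.toBytes(deptId))
        put.addColumn(Cf, Bytes.toBytes("dept_name"), Bytes.toBytes(deptName))
        (new ImmutableBytesWritable(put.getRow), put)
    }

    // Write the joined rows to the (assumed, pre-created) output table.
    val job = Job.getInstance(HBaseConfiguration.create())
    job.getConfiguration.set(TableOutputFormat.OUTPUT_TABLE, "emp_department")
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    puts.saveAsNewAPIHadoopDataset(job.getConfiguration)

    sc.stop()
  }
}
```

This would typically be packaged and launched with `spark-submit`, with `hbase-site.xml` on the classpath so that `HBaseConfiguration.create()` can find the cluster. Rows missing any of the assumed columns are skipped rather than written with empty values.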
Take a look at the linked Java example.
The same code as in the link is reproduced below, just in case.

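// Imports needed by this fragment
import com.google.cloud.bigtable.hbase.BigtableConfiguration;
import java.io.IOException;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

// TABLE_NAME, COLUMN_FAMILY_NAME, COLUMN_NAME, GREETINGS and print(...) are
// defined elsewhere in the enclosing class and are not shown here.
// The method header was cut off in the excerpt; the name below is assumed.
private static void doHelloWorld(String projectId, String instanceId) {

// [START bigtable_hw_connect]
// Create the connection; try-with-resources makes sure it gets closed
try (Connection connection = BigtableConfiguration.connect(projectId, instanceId)) {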

  // The admin API lets us create, manage and delete tables
  Admin admin = connection.getAdmin();
  // [END bigtable_hw_connect]

  // [START bigtable_hw_create_table]
  // Create a table with a single column family
  HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(TABLE_NAME));
  descriptor.addFamily(new HColumnDescriptor(COLUMN_FAMILY_NAME));

  print("Create table " + descriptor.getNameAsString());
  admin.createTable(descriptor);
  // [END bigtable_hw_create_table]

  // [START bigtable_hw_write_rows]
  // Retrieve the table we just created so we can do some reads and writes
  Table table = connection.getTable(TableName.valueOf(TABLE_NAME));

  // Write some rows to the table
  print("Write some greetings to the table");
  for (int i = 0; i < GREETINGS.length; i++) {
    // Each row has a unique row key.
    //
    // Note: This example uses sequential numeric IDs for simplicity, but
    // this can result in poor performance in a production application.
    // Since rows are stored in sorted order by key, sequential keys can
    // result in poor distribution of operations across nodes.
    //
    // For more information about how to design a Bigtable schema for the
    // best performance, see the documentation:
    //
    //     https://cloud.google.com/bigtable/docs/schema-design
    String rowKey = "greeting" + i;

    // Put a single row into the table. We could also pass a list of Puts to write a batch.
    Put put = new Put(Bytes.toBytes(rowKey));
    put.addColumn(COLUMN_FAMILY_NAME, COLUMN_NAME, Bytes.toBytes(GREETINGS[i]));
    table.put(put);
  }
  // [END bigtable_hw_write_rows]

  // [START bigtable_hw_get_by_key]
  // Get the first greeting by row key
  String rowKey = "greeting0";
  Result getResult = table.get(new Get(Bytes.toBytes(rowKey)));
  String greeting = Bytes.toString(getResult.getValue(COLUMN_FAMILY_NAME, COLUMN_NAME));
  System.out.println("Get a single greeting by row key");
  System.out.printf("\t%s = %s\n", rowKey, greeting);
  // [END bigtable_hw_get_by_key]

  // [START bigtable_hw_scan_all]
  // Now scan across all rows.
  Scan scan = new Scan();

  print("Scan for all greetings:");
  ResultScanner scanner = table.getScanner(scan);
  for (Result row : scanner) {
    byte[] valueBytes = row.getValue(COLUMN_FAMILY_NAME, COLUMN_NAME);
    System.out.println('\t' + Bytes.toString(valueBytes));
  }
  // [END bigtable_hw_scan_all]

  // [START bigtable_hw_delete_table]
  // Clean up by disabling and then deleting the table
  print("Delete the table");
  admin.disableTable(table.getName());
  admin.deleteTable(table.getName());
  // [END bigtable_hw_delete_table]

} catch (IOException e) {
  System.err.println("Exception while running HelloWorld: " + e.getMessage());
  e.printStackTrace();
  System.exit(1);
}

System.exit(0);
}
