java—如何通过执行内部连接并将其引入配置单元来从hbase表中检索数据

j8ag8udp  于 2021-06-10  发布在  Hbase
关注(0)|答案(1)|浏览(287)

我有两个hbase表' hbaseTable ', ' hbaseTable1 '和配置单元表' hiveTable '我的查询看起来像:

'insert overwrite hiveTable select col1, h2.col2, col3 from hbaseTable h1,hbaseTable2 h2 where h1.col=h2.col2';

我需要在hbase中进行内部连接并将数据带到配置单元。我们使用的是hive和java,它的性能非常差。所以计划用spark改变方法。i、 e,spark with java如何使用spark从java代码连接到hbase。
现在,我的spark代码应该在hbase中执行一个连接,并通过上面的查询将数据引入hive。
请提供样本代码。

az31mfrm

az31mfrm1#

如果您使用spark加载hbase数据,那么为什么要在配置单元中加载它呢?您可以使用类似于hive的sparksql,从而使用sql。您完全可以不使用配置单元来查询数据。例如:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import scala.Tuple2;
import java.util.Arrays;

public class SparkHbaseHive {
    public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        conf.set(TableInputFormat.INPUT_TABLE, "test");
        JavaSparkContext jsc = new JavaSparkContext(new SparkConf().setAppName("Spark-Hbase").setMaster("local[3]"));
        JavaPairRDD<ImmutableBytesWritable, Result> source = jsc
                .newAPIHadoopRDD(conf, TableInputFormat.class,
                        ImmutableBytesWritable.class, Result.class);
        SQLContext sqlContext = new SQLContext(jsc);
        JavaRDD<Table1Bean> rowJavaRDD = 

source.map((Function<Tuple2<ImmutableBytesWritable, Result>, Table1Bean>) object -> {
            Table1Bean table1Bean = new Table1Bean();
            table1Bean.setRowKey(Bytes.toString(object._1().get()));

table1Bean.setColumn1(Bytes.toString(object._2().getValue(Bytes.toBytes("colfam1"), Bytes.toBytes("col1"))));
            return table1Bean;
    });
        DataFrame df = sqlContext.createDataFrame(rowJavaRDD, Table1Bean.class);

        //similarly create df2
        //use df.join() and then register as joinedtable or register two tables and join
        //execute sql queries

        //Example of sql query on df
        df.registerTempTable("table1");
        Arrays.stream(sqlContext.sql("select * from table1").collect()).forEach(row -> System.out.println(row.getString(0) + "," + row.getString(1)));

    }
}
public class Table1Bean {
    private String rowKey;
    private String column1;

    public String getRowKey() {
        return rowKey;
    }

    public void setRowKey(String rowKey) {
        this.rowKey = rowKey;
    }

    public String getColumn1() {
        return column1;
    }

    public void setColumn1(String column1) {
        this.column1 = column1;
    }
}

如果出于某些原因需要使用配置单元,请使用hivecontext从配置单元读取数据,并使用saveastable持久化数据。如果有疑问,请告诉我。

相关问题