hbase应用程序:通过模拟hbase进行单元测试

pod7payv  于 2021-06-09  发布在  Hbase
关注(0)|答案(2)|浏览(649)

我的应用程序中有一个方法,它从hbase访问数据。它使用scan方法来查询hbase。我想编写单元测试用例来测试这个函数。所以我想模拟hbase调用。怎么做?我用mockit来嘲弄。

kkih6yb8

kkih6yb81#

在java中,您可以使用 HBaseTestingUtility 这样地:

private static final HBaseTestingUtility TEST_UTIL =
  new HBaseTestingUtility();

@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setBoolean("hbase.table.sanity.checks", false);
TEST_UTIL.startMiniCluster();
}

@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
EnvironmentEdgeManager.reset();
}

此外,您可能需要thrift server来使用某些客户端库:

private static final HBaseTestingUtility TEST_UTIL =
  new HBaseTestingUtility();

ThriftServer thriftServer;
Thread thriftServerThread;

@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setBoolean("hbase.table.sanity.checks", false);
TEST_UTIL.startMiniCluster();

List<String> args = new ArrayList<>();
port = HBaseTestingUtility.randomFreePort();
args.add("-" + ThriftServer.PORT_OPTION);
args.add(String.valueOf(port));
args.add("-infoport");
int infoPort = HBaseTestingUtility.randomFreePort();
args.add(String.valueOf(infoPort));
args.add("start");

thriftServer = new ThriftServer(TEST_UTIL.getConfiguration());

thriftServerThread = new Thread(new Runnable() {
  @Override
  public void run() {
    thriftServer.doMain(args.toArray(new String[args.size()]));
  }
});
thriftServerThread.setDaemon(true)
thriftServerThread.start();
}

@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
EnvironmentEdgeManager.reset();
}

通过py4j使用pyspark:

def setUp(self):
    super(StreamingTest, self).setUp()

    # --- hbase configuration ---
    hbase_testing_utility_clz = self.sparkStreamingContext._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
        .loadClass('org.apache.hadoop.hbase.HBaseTestingUtility')
    self._hbaseTestingUtility = hbase_testing_utility_clz.newInstance()

    self._hbaseTestingUtility.startMiniCluster()

def tearDown(self):
    if self._hbaseTestingUtility is not None:
        self._hbaseTestingUtility.shutdownMiniCluster()

如果需要使用thrift服务器(例如 happybase 客户端库):

def setUp(self):
    super(StreamingTest, self).setUp()

    # --- hbase configuration ---
    hbase_testing_utility_clz = self.sparkStreamingContext._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
        .loadClass('org.apache.hadoop.hbase.HBaseTestingUtility')
    self._hbaseTestingUtility = hbase_testing_utility_clz.newInstance()
    self._hbaseTestingUtility.getConfiguration().setBoolean("hbase.table.sanity.checks", False)  # for thrift
    self._hbaseTestingUtility.startMiniCluster()

    # --- thrift server configuration ---
    thrift_server_clz = self.sparkStreamingContext._jvm.java.lang.Thread.currentThread().getContextClassLoader() \
        .loadClass('org.apache.hadoop.hbase.thrift.ThriftServer')

    # make thrift server instance
    cArgs = self.sparkStreamingContext.sparkContext._gateway.new_array(self.sparkStreamingContext._jvm.java.lang.Class, 1)
    cArgs[0] = self._hbaseTestingUtility.getConfiguration().getClass()
    iArgs = self.sparkStreamingContext.sparkContext._gateway.new_array(self.sparkStreamingContext._jvm.java.lang.Object, 1)
    iArgs[0] = self._hbaseTestingUtility.getConfiguration()

    self._thriftServer = thrift_server_clz\
        .getDeclaredConstructor(cArgs)\
        .newInstance(iArgs)

    # prepare server start arguments
    tArgs = self.sparkStreamingContext.sparkContext._gateway.new_array(self.sparkStreamingContext._jvm.java.lang.String, 5)
    port = self._hbaseTestingUtility.randomFreePort()
    self.thrift_port = port
    tArgs[0] = "-port"
    tArgs[1] = str(port)
    tArgs[2] = "-infoport"
    info_port = self._hbaseTestingUtility.randomFreePort()
    tArgs[3] = str(info_port)
    tArgs[4] = "start"

    mArgs = self.sparkStreamingContext.sparkContext._gateway.new_array(self.sparkStreamingContext._jvm.java.lang.Class, 1)
    mArgs[0] = tArgs.getClass()
    method = thrift_server_clz.getDeclaredMethod('doMain', mArgs)
    method.setAccessible(True)

    args = self.sparkStreamingContext.sparkContext._gateway.new_array(self.sparkStreamingContext._jvm.java.lang.Object, 1)

    # start server in separate thread
    args[0] = tArgs
    self.thrift_server_thread = threading.Thread(target=method.invoke, args=[self._thriftServer, args])
    self.thrift_server_thread.setDaemon(True)
    self.thrift_server_thread.start()

当然,hbase和thriftjar应该通过spark提交:

--jars path/to/jar1.jar,path/to/jar2.jar, --conf spark.driver.userClassPathFirst=true
w51jfk4q

w51jfk4q2#

如果您使用的是mockito,那么可以存根类,使它们返回您想要的内容。
假设你有一门课叫 HBaseHelper 以及一个名为 getData() 在使用扫描仪从hbase检索数据的类中。现在假设您有另一个方法 useData() 在另一类中:

public String useData() {
  String data = hbaseHelper.getData();

  // ... Do things with data
  return data;
}

如果您使用的是mockito,那么可以在测试中有效地执行类似的操作,以返回虚拟“数据”,并测试使用此数据的方法:

import org.mockito.Mock;
import org.mockito.Mockito.when;

@Mock
HBaseHelper hbaseHelper;

@Test
public void testFoo() {
  when(hbaseHelper.getData()).thenReturn("hello world");

  assertThat(useData()).equals("hello world");
}

相关问题