我的应用程序中有一个方法,它从hbase访问数据。它使用scan方法来查询hbase。我想编写单元测试用例来测试这个函数。所以我想模拟hbase调用。怎么做?我用mockit来嘲弄。
kkih6yb81#
在java中,您可以使用 HBaseTestingUtility 这样地:
HBaseTestingUtility
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); @BeforeClass public static void setUpBeforeClass() throws Exception { TEST_UTIL.getConfiguration().setBoolean("hbase.table.sanity.checks", false); TEST_UTIL.startMiniCluster(); } @AfterClass public static void tearDownAfterClass() throws Exception { TEST_UTIL.shutdownMiniCluster(); EnvironmentEdgeManager.reset(); }
此外,您可能需要thrift server来使用某些客户端库:
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); ThriftServer thriftServer; Thread thriftServerThread; @BeforeClass public static void setUpBeforeClass() throws Exception { TEST_UTIL.getConfiguration().setBoolean("hbase.table.sanity.checks", false); TEST_UTIL.startMiniCluster(); List<String> args = new ArrayList<>(); port = HBaseTestingUtility.randomFreePort(); args.add("-" + ThriftServer.PORT_OPTION); args.add(String.valueOf(port)); args.add("-infoport"); int infoPort = HBaseTestingUtility.randomFreePort(); args.add(String.valueOf(infoPort)); args.add("start"); thriftServer = new ThriftServer(TEST_UTIL.getConfiguration()); thriftServerThread = new Thread(new Runnable() { @Override public void run() { thriftServer.doMain(args.toArray(new String[args.size()])); } }); thriftServerThread.setDaemon(true) thriftServerThread.start(); } @AfterClass public static void tearDownAfterClass() throws Exception { TEST_UTIL.shutdownMiniCluster(); EnvironmentEdgeManager.reset(); }
通过py4j使用pyspark:
def setUp(self): super(StreamingTest, self).setUp() # --- hbase configuration --- hbase_testing_utility_clz = self.sparkStreamingContext._jvm.java.lang.Thread.currentThread().getContextClassLoader() \ .loadClass('org.apache.hadoop.hbase.HBaseTestingUtility') self._hbaseTestingUtility = hbase_testing_utility_clz.newInstance() self._hbaseTestingUtility.startMiniCluster() def tearDown(self): if self._hbaseTestingUtility is not None: self._hbaseTestingUtility.shutdownMiniCluster()
如果需要使用thrift服务器(例如 happybase 客户端库):
happybase
def setUp(self): super(StreamingTest, self).setUp() # --- hbase configuration --- hbase_testing_utility_clz = self.sparkStreamingContext._jvm.java.lang.Thread.currentThread().getContextClassLoader() \ .loadClass('org.apache.hadoop.hbase.HBaseTestingUtility') self._hbaseTestingUtility = hbase_testing_utility_clz.newInstance() self._hbaseTestingUtility.getConfiguration().setBoolean("hbase.table.sanity.checks", False) # for thrift self._hbaseTestingUtility.startMiniCluster() # --- thrift server configuration --- thrift_server_clz = self.sparkStreamingContext._jvm.java.lang.Thread.currentThread().getContextClassLoader() \ .loadClass('org.apache.hadoop.hbase.thrift.ThriftServer') # make thrift server instance cArgs = self.sparkStreamingContext.sparkContext._gateway.new_array(self.sparkStreamingContext._jvm.java.lang.Class, 1) cArgs[0] = self._hbaseTestingUtility.getConfiguration().getClass() iArgs = self.sparkStreamingContext.sparkContext._gateway.new_array(self.sparkStreamingContext._jvm.java.lang.Object, 1) iArgs[0] = self._hbaseTestingUtility.getConfiguration() self._thriftServer = thrift_server_clz\ .getDeclaredConstructor(cArgs)\ .newInstance(iArgs) # prepare server start arguments tArgs = self.sparkStreamingContext.sparkContext._gateway.new_array(self.sparkStreamingContext._jvm.java.lang.String, 5) port = self._hbaseTestingUtility.randomFreePort() self.thrift_port = port tArgs[0] = "-port" tArgs[1] = str(port) tArgs[2] = "-infoport" info_port = self._hbaseTestingUtility.randomFreePort() tArgs[3] = str(info_port) tArgs[4] = "start" mArgs = self.sparkStreamingContext.sparkContext._gateway.new_array(self.sparkStreamingContext._jvm.java.lang.Class, 1) mArgs[0] = tArgs.getClass() method = thrift_server_clz.getDeclaredMethod('doMain', mArgs) method.setAccessible(True) args = self.sparkStreamingContext.sparkContext._gateway.new_array(self.sparkStreamingContext._jvm.java.lang.Object, 1) # start server in separate thread args[0] = tArgs self.thrift_server_thread = threading.Thread(target=method.invoke, args=[self._thriftServer, args]) self.thrift_server_thread.setDaemon(True) self.thrift_server_thread.start()
当然,hbase和thriftjar应该通过spark提交:
--jars path/to/jar1.jar,path/to/jar2.jar, --conf spark.driver.userClassPathFirst=true
w51jfk4q2#
如果您使用的是mockito,那么可以存根类,使它们返回您想要的内容。假设你有一门课叫 HBaseHelper 以及一个名为 getData() 在使用扫描仪从hbase检索数据的类中。现在假设您有另一个方法 useData() 在另一类中:
HBaseHelper
getData()
useData()
public String useData() { String data = hbaseHelper.getData(); // ... Do things with data return data; }
如果您使用的是mockito,那么可以在测试中有效地执行类似的操作,以返回虚拟“数据”,并测试使用此数据的方法:
import org.mockito.Mock; import org.mockito.Mockito.when; @Mock HBaseHelper hbaseHelper; @Test public void testFoo() { when(hbaseHelper.getData()).thenReturn("hello world"); assertThat(useData()).equals("hello world"); }
2条答案
按热度按时间kkih6yb81#
在java中,您可以使用
HBaseTestingUtility
这样地:此外,您可能需要thrift server来使用某些客户端库:
通过py4j使用pyspark:
如果需要使用thrift服务器(例如
happybase
客户端库):当然,hbase和thriftjar应该通过spark提交:
w51jfk4q2#
如果您使用的是mockito,那么可以存根类,使它们返回您想要的内容。
假设你有一门课叫
HBaseHelper
以及一个名为getData()
在使用扫描仪从hbase检索数据的类中。现在假设您有另一个方法useData()
在另一类中:如果您使用的是mockito,那么可以在测试中有效地执行类似的操作,以返回虚拟“数据”,并测试使用此数据的方法: