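I am new to HBase. My application runs multi-threaded Kafka consumers that read messages from Kafka. Each consumer reads a message, builds a Put from it, creates an HTable instance, and inserts the rows into HBase.
But there is a blocker for me: this flow hangs after running for some time. I have attached the relevant code below, please take a look.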
    public void doBatchInsertionForDeviceTable(List<Put> totalRows, Integer hbaseTableSuffix) {
        long start, end;
        HTable tableForDeviceTable = null;
        HBaseConfigurationCache hBaseConfigurationCache = null;
        String tableName = null;
        start = System.currentTimeMillis();
        try {
            hBaseConfigurationCache = CacheManager.getInstance().getCache(HBaseConfigurationCache.class);
            tableName = DataHandlerConstant.HBASE_DEVICE_TABLE + String.valueOf(hbaseTableSuffix);
            tableForDeviceTable = new HTable(hBaseConfigurationCache.getHBaseConfiguration(), tableName);
            LOG.info("Attempting HBase Insertion in Table (" + tableName + ") with rows (" + totalRows.size() + ")");
            try {
                putCountForDeviceStream.addAndGet(totalRows.size());
                LOG.info("Total put have been build till now is :("+putCountForDeviceStream.get()+") for device stream.");
                tableForDeviceTable.put(totalRows);
                LOG.info("Batch have been inserted successfully.");
            } catch (Exception ex) {
                LOG.error("Problem while inserting rows (" + totalRows.size() + ") ." + ex.getLocalizedMessage());
                ex.printStackTrace();
            } finally {
                tableForDeviceTable.flushCommits();
            }
        } catch (Exception ex) {
            LOG.error("Sorry,We are unable to perform insertion due to (" + ex.getMessage() + ")");
            ex.printStackTrace();
        }
        end = System.currentTimeMillis();
        LOG.info("Consume Time HBase Insertion in Table (" + tableName + ") with rows (" + totalRows.size() + ") is (" + (end - start) + ")");
    }
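One thing that stands out in the method above is that a new HTable is created on every call and never closed. The following is only a minimal sketch of the same put/flush flow with the table released via try-with-resources; it is not a diagnosis of the hang. To keep it runnable without a cluster, `BatchTable`, `InMemoryTable`, and `doBatchInsertion` are hypothetical stand-ins introduced here for illustration, not HBase APIs, and rows are plain strings instead of `Put` objects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BatchInsertSketch {

    /** Hypothetical stand-in for a table handle; NOT an HBase interface. */
    interface BatchTable extends AutoCloseable {
        void put(List<String> rows) throws Exception;
        void flushCommits() throws Exception;
        @Override
        void close() throws Exception;
    }

    /** In-memory fake: buffers rows, commits them on flush, records close(). */
    static class InMemoryTable implements BatchTable {
        final List<String> buffered = new ArrayList<>();
        final List<String> committed = new ArrayList<>();
        boolean closed = false;

        @Override
        public void put(List<String> rows) {
            buffered.addAll(rows);
        }

        @Override
        public void flushCommits() {
            committed.addAll(buffered);
            buffered.clear();
        }

        @Override
        public void close() {
            flushCommits(); // flush anything still buffered before releasing
            closed = true;
        }
    }

    /**
     * Same shape as the insertion method in the question, but the table is
     * always closed, whether the batch succeeds or fails.
     * Returns true if the batch was written.
     */
    static boolean doBatchInsertion(BatchTable table, List<String> rows) {
        long start = System.currentTimeMillis();
        try (BatchTable t = table) {
            t.put(rows);
            t.flushCommits();
            return true;
        } catch (Exception ex) {
            System.err.println("Insertion failed for " + rows.size() + " rows: " + ex.getMessage());
            return false;
        } finally {
            System.out.println("Insertion took " + (System.currentTimeMillis() - start) + " ms");
        }
    }

    public static void main(String[] args) {
        InMemoryTable table = new InMemoryTable();
        boolean ok = doBatchInsertion(table, Arrays.asList("row-1", "row-2"));
        System.out.println("ok=" + ok + ", committed=" + table.committed.size() + ", closed=" + table.closed);
    }
}
```

With a real HTable the same structure would rely on its `close()` method; whether that changes the behaviour seen in the thread dump is an open question.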
The thread dump is shown below:
       java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <0x00000005adbb0720> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:67)

       Locked ownable synchronizers:
        - None

    "metrics-meter-tick-thread-2" daemon prio=10 tid=0x00007fed950c1800 nid=0x3006 runnable [0x00007fedb4451000]
       java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <0x00000005ad4b4608> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1090)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:807)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)

       Locked ownable synchronizers:
        - None

    "metrics-meter-tick-thread-1" daemon prio=10 tid=0x00007fed950c8800 nid=0x3005 waiting on condition [0x00007fedb4653000]
       java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <0x00000005ad4b4608> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1085)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:807)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)

       Locked ownable synchronizers:
        - None

    "pool-42-thread-1" prio=10 tid=0x00007fed94b3c800 nid=0x2ffe in Object.wait() [0x00007fedb4955000]
       java.lang.Thread.State: TIMED_WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at org.apache.hadoop.hbase.client.AsyncProcess.waitForNextTaskDone(AsyncProcess.java:853)
        - locked <0x00000005af1d9880> (a java.util.concurrent.atomic.AtomicLong)
        at org.apache.hadoop.hbase.client.AsyncProcess.waitForMaximumCurrentTasks(AsyncProcess.java:879)
        at org.apache.hadoop.hbase.client.AsyncProcess.waitUntilDone(AsyncProcess.java:892)
        at org.apache.hadoop.hbase.client.HTable.backgroundFlushCommits(HTable.java:968)
        at org.apache.hadoop.hbase.client.HTable.flushCommits(HTable.java:1252)
        at com.snapdeal.datahandler.utils.hbase.HBaseUtils.doBatchInsertionForClickStream(HBaseUtils.java:290)
        at com.snapdeal.datahandler.hbase.queue.consumer.HBaseWorkerThread.performHBaseInsertion(HBaseWorkerThread.java:111)
        at com.snapdeal.datahandler.hbase.queue.consumer.HBaseWorkerThread.run(HBaseWorkerThread.java:65)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)

       Locked ownable synchronizers:
        - <0x00000005af1d8260> (a java.util.concurrent.ThreadPoolExecutor$Worker)

    "pool-41-thread-1" prio=10 tid=0x00007fed94b3a800 nid=0x2ffd in Object.wait() [0x00007fedb4a56000]
       java.lang.Thread.State: TIMED_WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at org.apache.hadoop.hbase.client.AsyncProcess.waitForNextTaskDone(AsyncProcess.java:853)
        - locked <0x00000005aedcf770> (a java.util.concurrent.atomic.AtomicLong)
        at org.apache.hadoop.hbase.client.AsyncProcess.waitForMaximumCurrentTasks(AsyncProcess.java:879)
        at org.apache.hadoop.hbase.client.AsyncProcess.waitUntilDone(AsyncProcess.java:892)
        at org.apache.hadoop.hbase.client.HTable.backgroundFlushCommits(HTable.java:968)
        at org.apache.hadoop.hbase.client.HTable.flushCommits(HTable.java:1252)
        at com.snapdeal.datahandler.utils.hbase.HBaseUtils.doBatchInsertionForClickStream(HBaseUtils.java:290)
        at com.snapdeal.datahandler.hbase.queue.consumer.HBaseWorkerThread.performHBaseInsertion(HBaseWorkerThread.java:111)
        at com.snapdeal.datahandler.hbase.queue.consumer.HBaseWorkerThread.run(HBaseWorkerThread.java:65)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)

       Locked ownable synchronizers:
        - <0x00000005aedcf1d8> (a java.util.concurrent.ThreadPoolExecutor$Worker)

    "pool-40-thread-1" prio=10 tid=0x00007fed94b38800 nid=0x2ffc in Object.wait() [0x00007fedb4b57000]
       java.lang.Thread.State: TIMED_WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at org.apache.hadoop.hbase.client.AsyncProcess.waitForNextTaskDone(AsyncProcess.java:853)
        - locked <0x00000005ad8c8c38> (a java.util.concurrent.atomic.AtomicLong)
        at org.apache.hadoop.hbase.client.AsyncProcess.waitForMaximumCurrentTasks(AsyncProcess.java:879)
        at org.apache.hadoop.hbase.client.AsyncProcess.waitUntilDone(AsyncProcess.java:892)
        at org.apache.hadoop.hbase.client.HTable.backgroundFlushCommits(HTable.java:968)
        at org.apache.hadoop.hbase.client.HTable.flushCommits(HTable.java:1252)
        at com.snapdeal.datahandler.utils.hbase.HBaseUtils.doBatchInsertionForClickStream(HBaseUtils.java:290)
        at com.snapdeal.datahandler.hbase.queue.consumer.HBaseWorkerThread.performHBaseInsertion(HBaseWorkerThread.java:111)
        at com.snapdeal.datahandler.hbase.queue.consumer.HBaseWorkerThread.run(HBaseWorkerThread.java:65)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)

       Locked ownable synchronizers:
        - <0x00000005ad8c7618> (a java.util.concurrent.ThreadPoolExecutor$Worker)

    "pool-39-thread-1" prio=10 tid=0x00007fed94b3600 nid=0x2ffb in Object.wait() [0x00007fedb4c58000]
       java.lang.Thread.State: TIMED_WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at org.apache.hadoop.hbase.client.AsyncProcess.waitForNextTaskDone(AsyncProcess.java:853)
        - locked <0x00000005ad8cf3b8> (a java.util.concurrent.atomic.AtomicLong)
        at org.apache.hadoop.hbase.client.AsyncProcess.waitForMaximumCurrentTasks(AsyncProcess.java:879)
        at org.apache.hadoop.hbase.client.AsyncProcess.waitUntilDone(AsyncProcess.java:892)
        at org.apache.hadoop.hbase.client.HTable.backgroundFlushCommits(HTable.java:968)
        at org.apache.hadoop.hbase.client.HTable.flushCommits(HTable.java:1252)
        at com.snapdeal.datahandler.utils.hbase.HBaseUtils.doBatchInsertionForClickStream(HBaseUtils.java:290)
        at com.snapdeal.datahandler.hbase.queue.consumer.HBaseWorkerThread.performHBaseInsertion(HBaseWorkerThread.java:111)
        at com.snapdeal.datahandler.hbase.queue.consumer.HBaseWorkerThread.run(HBaseWorkerThread.java:65)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)