我的应用程序监听 kafka 主题,并将数据转储到 cassandra。线程同时也会从 mongo 读取数据。kafka 主题上的滞后(lag)越来越大。我看到大多数线程在加载某些类时被阻塞。我把我的 thread_dump 贴在下面。
"KafkaConsumer-49" prio=10 tid=0x00007f1178fdd000 nid=0x78e0 waiting for monitor entry [0x00007f1155fb5000]
java.lang.Thread.State: BLOCKED (on object monitor)
at java.lang.ClassLoader.loadClass(ClassLoader.java:403)
- waiting to lock <0x00000006c0655b58> (a java.lang.Object)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
at org.springframework.util.ClassUtils.forName(ClassUtils.java:258)
at org.springframework.data.convert.SimpleTypeInformationMapper.resolveTypeFrom(SimpleTypeInformationMapper.java:56)
at org.springframework.data.convert.DefaultTypeMapper.readType(DefaultTypeMapper.java:103)
at org.springframework.data.convert.DefaultTypeMapper.getDefaultedTypeToBeUsed(DefaultTypeMapper.java:144)
at org.springframework.data.convert.DefaultTypeMapper.readType(DefaultTypeMapper.java:121)
at org.springframework.data.mongodb.core.convert.MappingMongoConverter.read(MappingMongoConverter.java:186)
at org.springframework.data.mongodb.core.convert.MappingMongoConverter.read(MappingMongoConverter.java:176)
at org.springframework.data.mongodb.core.convert.MappingMongoConverter.read(MappingMongoConverter.java:172)
at org.springframework.data.mongodb.core.convert.MappingMongoConverter.read(MappingMongoConverter.java:75)
at org.springframework.data.mongodb.core.MongoTemplate$ReadDbObjectCallback.doWith(MongoTemplate.java:1840)
at org.springframework.data.mongodb.core.MongoTemplate.executeFindMultiInternal(MongoTemplate.java:1536)
at org.springframework.data.mongodb.core.MongoTemplate.doFind(MongoTemplate.java:1336)
at org.springframework.data.mongodb.core.MongoTemplate.doFind(MongoTemplate.java:1322)
at org.springframework.data.mongodb.core.MongoTemplate.find(MongoTemplate.java:495)
at org.springframework.data.mongodb.core.MongoTemplate.find(MongoTemplate.java:486)
at com.snapdeal.coms.timemachine.mao.TimeMachineMao.getVendorProductsForUploadId(TimeMachineMao.java:32)
at com.snapdeal.coms.timemachine.service.TimeMachineService.getVendorProductsForUploadIdAndSupc(TimeMachineService.java:35)
at com.snapdeal.coms.timemachine.event.SupcUploadIdStateUpdateEventHandler.handleEvent(SupcUploadIdStateUpdateEventHandler.java:40)
"KafkaConsumer-48" prio=10 tid=0x00007f1178fdb000 nid=0x78df waiting for monitor entry [0x00007f11560b6000]
java.lang.Thread.State: BLOCKED (on object monitor)
at java.lang.ClassLoader.loadClass(ClassLoader.java:403)
- waiting to lock <0x00000006c0655b58> (a java.lang.Object)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
at org.springframework.util.ClassUtils.forName(ClassUtils.java:258)
at org.springframework.data.convert.SimpleTypeInformationMapper.resolveTypeFrom(SimpleTypeInformationMapper.java:56)
at org.springframework.data.convert.DefaultTypeMapper.readType(DefaultTypeMapper.java:103)
at org.springframework.data.convert.DefaultTypeMapper.getDefaultedTypeToBeUsed(DefaultTypeMapper.java:144)
at org.springframework.data.convert.DefaultTypeMapper.readType(DefaultTypeMapper.java:121)
at org.springframework.data.mongodb.core.convert.MappingMongoConverter.read(MappingMongoConverter.java:186)
at org.springframework.data.mongodb.core.convert.MappingMongoConverter.read(MappingMongoConverter.java:176)
at org.springframework.data.mongodb.core.convert.MappingMongoConverter.read(MappingMongoConverter.java:172)
at org.springframework.data.mongodb.core.convert.MappingMongoConverter.read(MappingMongoConverter.java:75)
at org.springframework.data.mongodb.core.MongoTemplate$ReadDbObjectCallback.doWith(MongoTemplate.java:1840)
at org.springframework.data.mongodb.core.MongoTemplate.executeFindMultiInternal(MongoTemplate.java:1536)
at org.springframework.data.mongodb.core.MongoTemplate.doFind(MongoTemplate.java:1336)
at org.springframework.data.mongodb.core.MongoTemplate.doFind(MongoTemplate.java:1322)
at org.springframework.data.mongodb.core.MongoTemplate.find(MongoTemplate.java:495)
at org.springframework.data.mongodb.core.MongoTemplate.find(MongoTemplate.java:486)
at com.snapdeal.coms.timemachine.mao.TimeMachineMao.getVendorProductsForUploadId(TimeMachineMao.java:32)
at com.snapdeal.coms.timemachine.service.TimeMachineService.getVendorProductsForUploadIdAndSupc(TimeMachineService.java:35)
at com.snapdeal.coms.timemachine.event.SupcUploadIdStateUpdateEventHandler.handleEvent(SupcUploadIdStateUpdateEventHandler.java:40)
at com.snapdeal.coms.timemachine.TimeMachine.onEvent(TimeMachine.java:109)
"KafkaConsumer-47" prio=10 tid=0x00007f1178fd9800 nid=0x78de waiting for monitor entry [0x00007f11561b7000]
java.lang.Thread.State: BLOCKED (on object monitor)
at java.lang.ClassLoader.loadClass(ClassLoader.java:403)
- waiting to lock <0x00000006c0655b58> (a java.lang.Object)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
at org.springframework.util.ClassUtils.forName(ClassUtils.java:258)
at org.springframework.data.convert.SimpleTypeInformationMapper.resolveTypeFrom(SimpleTypeInformationMapper.java:56)
at org.springframework.data.convert.DefaultTypeMapper.readType(DefaultTypeMapper.java:103)
at org.springframework.data.convert.DefaultTypeMapper.getDefaultedTypeToBeUsed(DefaultTypeMapper.java:144)
at org.springframework.data.convert.DefaultTypeMapper.readType(DefaultTypeMapper.java:121)
at org.springframework.data.mongodb.core.convert.MappingMongoConverter.read(MappingMongoConverter.java:186)
at org.springframework.data.mongodb.core.convert.MappingMongoConverter.read(MappingMongoConverter.java:176)
at org.springframework.data.mongodb.core.convert.MappingMongoConverter.read(MappingMongoConverter.java:172)
at org.springframework.data.mongodb.core.convert.MappingMongoConverter.read(MappingMongoConverter.java:75)
at org.springframework.data.mongodb.core.MongoTemplate$ReadDbObjectCallback.doWith(MongoTemplate.java:1840)
at org.springframework.data.mongodb.core.MongoTemplate.executeFindMultiInternal(MongoTemplate.java:1536)
at org.springframework.data.mongodb.core.MongoTemplate.doFind(MongoTemplate.java:1336)
at org.springframework.data.mongodb.core.MongoTemplate.doFind(MongoTemplate.java:1322)
at org.springframework.data.mongodb.core.MongoTemplate.find(MongoTemplate.java:495)
at org.springframework.data.mongodb.core.MongoTemplate.find(MongoTemplate.java:486)
"KafkaConsumer-46" prio=10 tid=0x00007f1178fd8000 nid=0x78dd waiting for monitor entry [0x00007f11562b8000]
java.lang.Thread.State: BLOCKED (on object monitor)
at java.lang.ClassLoader.loadClass(ClassLoader.java:403)
- waiting to lock <0x00000006c0655b58> (a java.lang.Object)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
at org.springframework.util.ClassUtils.forName(ClassUtils.java:258)
at org.springframework.data.convert.SimpleTypeInformationMapper.resolveTypeFrom(SimpleTypeInformationMapper.java:56)
at org.springframework.data.convert.DefaultTypeMapper.readType(DefaultTypeMapper.java:103)
at org.springframework.data.convert.DefaultTypeMapper.getDefaultedTypeToBeUsed(DefaultTypeMapper.java:144)
at org.springframework.data.convert.DefaultTypeMapper.readType(DefaultTypeMapper.java:121)
at org.springframework.data.mongodb.core.convert.MappingMongoConverter.read(MappingMongoConverter.java:186)
at org.springframework.data.mongodb.core.convert.MappingMongoConverter.read(MappingMongoConverter.java:176)
at org.springframework.data.mongodb.core.convert.MappingMongoConverter.read(MappingMongoConverter.java:172)
at org.springframework.data.mongodb.core.convert.MappingMongoConverter.read(MappingMongoConverter.java:75)
at org.springframework.data.mongodb.core.MongoTemplate$ReadDbObjectCallback.doWith(MongoTemplate.java:1840)
at org.springframework.data.mongodb.core.MongoTemplate.executeFindMultiInternal(MongoTemplate.java:1536)
at org.springframework.data.mongodb.core.MongoTemplate.doFind(MongoTemplate.java:1336)
at org.springframework.data.mongodb.core.MongoTemplate.doFind(MongoTemplate.java:1322)
at org.springframework.data.mongodb.core.MongoTemplate.find(MongoTemplate.java:495)
at org.springframework.data.mongodb.core.MongoTemplate.find(MongoTemplate.java:486)
at com.snapdeal.coms.timemachine.mao.TimeMachineMao.getVendorProductsForUploadId(TimeMachineMao.java:32)
at com.snapdeal.coms.timemachine.service.TimeMachineService.getVendorProductsForUploadIdAndSupc(TimeMachineService.java:35)
at com.snapdeal.coms.timemachine.event.SupcUploadIdStateUpdateEventHandler.handleEvent(SupcUploadIdStateUpdateEventHandler.java:40)
我不知道为什么所有的线程都被阻塞了。我认为类只会加载一次,之后就不需要再获取任何锁。
2条答案
按热度按时间qxsslcnc1#
你试过用 ConsumerOffsetChecker 看看你的消费者是否还活着吗?您可以在 $KAFKA_ROOT_DIR/ 文件夹内运行该工具。
以下是他们常见问题(FAQ)页面上的一些说明:
如果消费者偏移量(consumer offset)在一段时间后没有移动,那么消费者很可能已经停止。如果消费者偏移量正在移动,但消费者滞后(日志末尾与消费者偏移量之间的差)正在增加,则说明消费者比生产者慢。如果消费者速度慢,典型的解决方案是增加消费者的并行度。这可能需要增加主题的分区数。
上面的 FAQ 页面也解释了你的消费者被阻塞的可能原因,也许值得一看。
cbwuti442#
问题出在从 mongo 获取数据上。数据量很大,但没有实现分页,也没有为该请求设置套接字超时,因此线程被阻塞。