spring集成|当通过sftp入站适配器连接时,看到线程长时间处于可运行状态

zmeyuzjn  于 2021-07-24  发布在  Java
关注(0)|答案(1)|浏览(318)

我们使用spring集成动态sftp流来接收sftp文件。java配置流如下所示

from(Sftp.inboundAdapter(cachingSessionFactory, (a, b) -> Long.valueOf(a.lastModified())
            .compareTo(b.lastModified()))//
            .preserveTimestamp(true)//
            .remoteDirectory(job.getRemoteDirectory())//
            .deleteRemoteFiles(job.getDeleteRemoteFiles())//
            .filter(this.compositeRemoteFilter(job))//
            .autoCreateLocalDirectory(true)//
            .preserveTimestamp(true)//
            .maxFetchSize(maxMessagesPerPoll)
            .localFilter(new LocalFileFilter(job))//
            .localDirectory(localDirectory)),
            e -> e.id("testComponent")
                  .autoStartup(false)//
                  .poller(Pollers.cron(job.getPollingFreq(), job.timeZone())//
                        .maxMessagesPerPoll(maxMessagesPerPoll)
                        .receiveTimeout(1000L)    
                        .handle(UploadHandler)

缓存会话工厂是我们通过使用委托动态获得的。它的大部分工作正常,但有时在运行了几天之后,我们观察到一些线程卡在runnable中。我们的假设是,如果jsch会话以任何方式被卡住,它最终应该会退出,因为我们在会话工厂级别和轮询器级别都有超时。
线程的转储如下所示

java.io.FileInputStream.readBytes(Native Method)java.io.FileInputStream.read(FileInputStream.java:255)
java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
java.io.BufferedInputStream.read(BufferedInputStream.java:265)
com.jcraft.jsch.IO.getByte(IO.java:73)
com.jcraft.jsch.Session.connect(Session.java:263)
com.jcraft.jsch.Session.connect(Session.java:183)
org.springframework.integration.sftp.session.SftpSession.connect(SftpSession.java:268)
org.springframework.integration.sftp.session.DefaultSftpSessionFactory.getSession(DefaultSftpSessionFactory.java:390)
custom.adapters.session.LogEnabledSftpSessionFactory.getSession(LogEnabledSftpSessionFactory.java:44)
custom.adapters.session.LogEnabledSftpSessionFactory.getSession(LogEnabledSftpSessionFactory.java:15)
org.springframework.integration.file.remote.session.CachingSessionFactory$1.createForPool(CachingSessionFactory.java:84)
org.springframework.integration.file.remote.session.CachingSessionFactory$1.createForPool(CachingSessionFactory.java:81)
org.springframework.integration.util.SimplePool.doGetItem(SimplePool.java:195)
org.springframework.integration.util.SimplePool.getItem(SimplePool.java:176)
org.springframework.integration.file.remote.session.CachingSessionFactory.getSession(CachingSessionFactory.java:135)
custom.integration.DelegatingLocatorBasedSessionFactory.getSession(DelegatingLocatorBasedSessionFactory.java:80)
custom.DelegatingLocatorBasedSessionFactory.getSession(DelegatingLocatorBasedSessionFactory.java:67)
org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:432)
org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:308)
org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.doReceive(AbstractInboundFileSynchronizingMessageSource.java:258)
org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.doReceive(AbstractInboundFileSynchronizingMessageSource.java:64)
org.springframework.integration.endpoint.AbstractFetchLimitingMessageSource.doReceive(AbstractFetchLimitingMessageSource.java:45)
org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:160)org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:250)
org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:360)
org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$1934/1648215776.call(Unknown Source)
org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:329)
org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$1(AbstractPollingEndpoint.java:277)
org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$2062/2127922639.run(Unknown Source)
org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
org.springframework.integration.util.ErrorHandlingTaskExecutor$$Lambda$2063/1949167295.run(Unknown Source)
org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$2(AbstractPollingEndpoint.java:274)
org.springframework.integration.endpoint.AbstractPollingEndpoint$$Lambda$1935/1382748208.run(Unknown Source)org.springframework.cloud.sleuth.instrument.async.TraceRunnable.run(TraceRunnable.java:67)
org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
java.util.concurrent.FutureTask.run(FutureTask.java:266)
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)

请帮助,如果我们在这里遗漏了什么,或者如果有一些配置,我们可以做的si方面,以解决这个问题。si版本5.1.13
另一个线程的堆转储跟踪

"Name","Retained Size","Shallow Size","Level"
"java.lang.Thread [Thread, Stack Local] ""my-taskScheduler-42"" tid=348 [RUNNABLE]","54768","120","1"
"contextClassLoader  org.springframework.boot.loader.LaunchedURLClassLoader [Stack Local]","10324089","80","2"
"<local variable>  com.jcraft.jsch.Session [Stack Local]","21232","256","2"
"threadLocals  java.lang.ThreadLocal$ThreadLocalMap","15896","24","2"
"<local variable>  java.lang.UNIXProcess$ProcessPipeInputStream [Monitor Used, Stack Local]","8264","40","2"
"<local variable>  org.springframework.integration.sftp.inbound.SftpInboundFileSynchronizingMessageSource [Stack Local]","4784","96","2"
"<local variable>  org.springframework.integration.endpoint.SourcePollingChannelAdapter [Stack Local]","2608","176","2"
"<local variable>  java.util.concurrent.ScheduledThreadPoolExecutor [Stack Local]","2392","80","2"
"<local variable>  org.springframework.integration.sftp.inbound.SftpInboundFileSynchronizer [Stack Local]","2168","64","2"
"<local variable>  org.springframework.integration.file.remote.RemoteFileTemplate [Stack Local]","824","64","2"
"<local variable>  org.springframework.integration.util.SimplePool [Stack Local]","744","56","2"
"group  java.lang.ThreadGroup","656","48","2"
"<local variable>  custom.adapters.session.LogEnabledSftpSessionFactory [Stack Local]","512","120","2"
"<local variable>  org.springframework.scheduling.concurrent.ReschedulingRunnable [Stack Local]","232","48","2"
"inheritableThreadLocals  java.lang.ThreadLocal$ThreadLocalMap","104","24","2"
"inheritedAccessControlContext  java.security.AccessControlContext","88","40","2"
"name  java.lang.String ""my-taskScheduler-42""","80","24","2"
"<local variable>  java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask [Stack Local]","72","72","2"
"<local variable>  org.springframework.integration.sftp.session.SftpSession [Stack Local]","56","32","2"
"<local variable>  java.util.concurrent.ThreadPoolExecutor$Worker [Stack Local]","48","48","2"
"target  java.util.concurrent.ThreadPoolExecutor$Worker [Stack Local]","48","48","2"
"<local variable>  org.springframework.integration.util.ErrorHandlingTaskExecutor [Stack Local]","40","24","2"
"<local variable>  org.springframework.integration.sftp.session.JSchSessionWrapper [Stack Local]","40","24","2"
"<local variable>  java.io.FileDescriptor [JNI Local]","32","32","2"
"<local variable>  org.springframework.integration.file.remote.session.CachingSessionFactory [Stack Local]","32","32","2"
"pool  org.springframework.integration.util.SimplePool [Stack Local]","744","56","3"
"sessionFactory  custom.adapters.session.LogEnabledSftpSessionFactory [Stack Local]","512","120","3"
"jsch  com.jcraft.jsch.JSch","736","32","4"
"proxy  custom.adapters.session.SftpProxyCommand","328","32","4"
"sessionConfig  java.util.Properties size = 2","176","48","4"
"sharedSessionLock  java.util.concurrent.locks.ReentrantReadWriteLock","120","24","4"
"host  java.lang.String ""sftp.server""","80","24","4"
"host  java.lang.String ""sftp.server""","80","24","4"
"<class>  custom.adapters.session.LogEnabledSftpSessionFactory org.springframework.boot.loader.LaunchedURLClassLoader","64","64","4"
"password  java.lang.String ""@#@#@#@#""","64","24","4"
"user  java.lang.String ""user""","64","24","4"
"user  java.lang.String ""user""","64","24","4"
"enableDaemonThread  java.lang.Boolean = false","16","16","4"
"serverAliveCountMax  java.lang.Integer = 4  0x00000004","16","16","4"
"serverAliveInterval  java.lang.Integer = 240,000  0x0003A980","16","16","4"
"timeout  java.lang.Integer = 120,000  0x0001D4C0","16","16","4"
"userInfoWrapper  org.springframework.integration.sftp.session.DefaultSftpSessionFactory$UserInfoWrapper","16","16","4"
"allowUnknownKeys = boolean false","","1","4"
"isSharedSession = boolean false","","1","4"
"port = int 22  0x00000016","","4","4"
"port = int 22  0x00000016","","4","4"
"<class>  org.springframework.integration.file.remote.session.CachingSessionFactory org.springframework.boot.loader.LaunchedURLClassLoader","96","72","3"
"<loader>  org.springframework.boot.loader.LaunchedURLClassLoader [Stack Local]","10324089","80","4"
"<protection domain>  java.security.ProtectionDomain","400","40","4"
"logger  org.apache.commons.logging.LogAdapter$Slf4jLocationAwareLog","24","24","4"
"isSharedSessionCapable = boolean true","","1","3"
"sharedSessionEpoch = long 0","","8","3"
"testSession = boolean true","","1","3"
"<local variable>  java.util.concurrent.Executors$RunnableAdapter [Stack Local]","24","24","2"
"<local variable>  java.util.Date [Stack Local] = 2021-01-19 20:30:17.000","24","24","2"
"blockerLock  java.lang.Object","16","16","2"
"daemon = boolean false","","1","2"
"eetop = long 28,082,176  0x0000000001AC8000","","8","2"
"nativeParkEventPointer = long 140,660,716,930,496  0x00007FEE201105C0","","8","2"
"priority = int 5  0x00000005","","4","2"
"single_step = boolean false","","1","2"
"stackSize = long 0","","8","2"
"stillborn = boolean false","","1","2"
"threadLocalRandomProbe = int -884,406,543  0xCB4906F1","","4","2"
"threadLocalRandomSecondarySeed = int 0","","4","2"
"threadLocalRandomSeed = long -7,128,783,728,802,150,278  0x9D1178F7F429C87A","","8","2"
"threadStatus = int 5  0x00000005","","4","2"
"tid = long 348  0x000000000000015C","","8","2"

隧道代理自定义代码

public class SftpProxyCommand implements Proxy
{

   String command; 
   Process p = null; 
   InputStream in = null; 
   OutputStream out = null; 
   public SftpProxyCommand(String appUser, String privateKeyLocation, String jumpHost)
   {
      this.command = on(" ").join("ssh -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no -i",
            privateKeyLocation, "-l", appUser, jumpHost, "nc %h %p");
   } 
   public void connect(SocketFactory socket_factory, String host, int port, int timeout) throws Exception
   {

      String _command = command.replace("%h", host);
      _command = _command.replace("%p", new Integer(port).toString());
      p = Runtime.getRuntime().exec(_command);
      LOG.debug("Sftp Command : {}", _command);
      in = p.getInputStream();
      out = p.getOutputStream();
   }

   public Socket getSocket()
   {
      return null;
   } 
   public InputStream getInputStream()
   {
      return in;
   }

   public OutputStream getOutputStream()
   {

      return out;
   }

   public void close()
   {
      try
      {
         if (p != null)
         {
            p.getErrorStream().close();
            p.getOutputStream().close();
            p.getInputStream().close();
            p.destroy();
            p = null;
         }
      }
      catch (IOException e)
      {
         LOG.error("Issue in closing sftp command", e);
      }
   }

}

相关问题