How CachingConnectionFactory Caches Sessions

Key classes
JmsTemplate
SingleConnectionFactory
CachingConnectionFactory
SharedConnectionInvocationHandler
CachedSessionInvocationHandler
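How a cached Session is obtained

Whether you use SingleConnectionFactory or CachingConnectionFactory as your ConnectionFactory, the Connection is created by SingleConnectionFactory's createConnection(), because CachingConnectionFactory extends SingleConnectionFactory and inherits that method.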
SingleConnectionFactory.java

```java
@Override
public Connection createConnection() throws JMSException {
    return getSharedConnectionProxy(getConnection());
}

protected Connection getSharedConnectionProxy(Connection target) {
    // Collect the JMS interfaces that the proxy has to implement.
    List<Class<?>> classes = new ArrayList<>(3);
    classes.add(Connection.class);
    if (target instanceof QueueConnection) {
        classes.add(QueueConnection.class);
    }
    if (target instanceof TopicConnection) {
        classes.add(TopicConnection.class);
    }
    return (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(),
            ClassUtils.toClassArray(classes), new SharedConnectionInvocationHandler());
}
```
So Spring JMS wraps the connection in a proxy. Any later call on the Connection, such as createSession, goes through SharedConnectionInvocationHandler.
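The interception mechanism can be illustrated with a minimal JDK dynamic proxy sketch. It is not Spring code: DemoConnection, RealConnection and InterceptingHandler are hypothetical stand-ins, and the connection interface is reduced to two methods so that the example runs without a JMS provider.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

final class ProxySketch {

    // Hypothetical stand-in for javax.jms.Connection, reduced to two methods.
    interface DemoConnection {
        String createSession(int mode);
        String getClientId();
    }

    // Hypothetical "real" connection, as a JMS provider would supply it.
    static final class RealConnection implements DemoConnection {
        @Override
        public String createSession(int mode) {
            return "real-session-" + mode;
        }

        @Override
        public String getClientId() {
            return "client-1";
        }
    }

    // Intercepts createSession and delegates every other call to the target.
    static final class InterceptingHandler implements InvocationHandler {
        private final DemoConnection target;

        InterceptingHandler(DemoConnection target) {
            this.target = target;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if (method.getName().equals("createSession")) {
                // A caching factory could hand out a cached session at this point.
                return "intercepted-session-" + args[0];
            }
            try {
                return method.invoke(target, args);
            } catch (InvocationTargetException ex) {
                // Rethrow the original exception instead of the reflection wrapper.
                throw ex.getTargetException();
            }
        }
    }

    static DemoConnection wrap(DemoConnection target) {
        return (DemoConnection) Proxy.newProxyInstance(
                DemoConnection.class.getClassLoader(),
                new Class<?>[] {DemoConnection.class},
                new InterceptingHandler(target));
    }

    public static void main(String[] args) {
        DemoConnection con = wrap(new RealConnection());
        System.out.println(con.createSession(1)); // handled by the handler
        System.out.println(con.getClientId());    // delegated to RealConnection
    }
}
```

The caller only ever sees the DemoConnection interface, which is why the handler can change what createSession returns without the caller noticing.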
SingleConnectionFactory.java (SharedConnectionInvocationHandler)

```java
// Excerpt from SharedConnectionInvocationHandler.invoke(...)
else if (method.getName().equals("createSession") || method.getName().equals("createQueueSession") ||
        method.getName().equals("createTopicSession")) {
    // Work out the session mode from the arguments of the createSession variant.
    Integer mode = Session.AUTO_ACKNOWLEDGE;
    if (args != null) {
        if (args.length == 1) {
            mode = (Integer) args[0];
        } else if (args.length == 2) {
            boolean transacted = (Boolean) args[0];
            Integer ackMode = (Integer) args[1];
            mode = (transacted ? Session.SESSION_TRANSACTED : ackMode);
        }
    }
    // Subclasses may supply a (cached) Session here; null means "no special handling".
    Session session = getSession(getConnection(), mode);
    if (session != null) {
        if (!method.getReturnType().isInstance(session)) {
            String msg = "JMS Session does not implement specific domain: " + session;
            try {
                session.close();
            } catch (Throwable ex) {
                logger.trace("Failed to close newly obtained JMS Session", ex);
            }
            throw new javax.jms.IllegalStateException(msg);
        }
        return session;
    }
}
// Fall through: invoke the method on the real connection.
try {
    return method.invoke(getConnection(), args);
} catch (InvocationTargetException ex) {
    throw ex.getTargetException();
}
```
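The key here is the getSession(...) method. With SingleConnectionFactory it simply returns null, so the call falls through to the real connection. CachingConnectionFactory overrides it with the following logic.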
CachingConnectionFactory.java

```java
@Override
protected Session getSession(Connection con, Integer mode) throws JMSException {
    if (!this.active) {
        return null;
    }

    // One list of cached Sessions per session mode.
    LinkedList<Session> sessionList = this.cachedSessions.computeIfAbsent(mode, k -> new LinkedList<>());
    Session session = null;
    synchronized (sessionList) {
        if (!sessionList.isEmpty()) {
            session = sessionList.removeFirst();
        }
    }
    if (session != null) {
        if (logger.isTraceEnabled()) {
            logger.trace("Found cached JMS Session for mode " + mode + ": " +
                    (session instanceof SessionProxy ? ((SessionProxy) session).getTargetSession() : session));
        }
    } else {
        // Nothing cached: create a real Session and wrap it in a caching proxy.
        Session targetSession = createSession(con, mode);
        if (logger.isDebugEnabled()) {
            logger.debug("Registering cached JMS Session for mode " + mode + ": " + targetSession);
        }
        session = getCachedSessionProxy(targetSession, sessionList);
    }
    return session;
}
```
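In short, CachingConnectionFactory maintains a map whose key is the session mode and whose value is a LinkedList<Session>. Each time a Session is requested, it looks up the list for that mode. If the list is not empty, it removes the first Session with removeFirst() and returns it. Otherwise it calls createSession(con, mode) to create a new Session and wraps that Session in a proxy backed by CachedSessionInvocationHandler.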
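The lookup described above can be sketched without any JMS dependency. This is an illustration only, not the Spring implementation: DemoSession, release and createdCount are hypothetical names, and the sketch leaves out the proxying and the cache size limit.

```java
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class SessionCacheSketch {

    // Hypothetical stand-in for a JMS Session; it only remembers its mode.
    static final class DemoSession {
        final int mode;

        DemoSession(int mode) {
            this.mode = mode;
        }
    }

    // One list of idle sessions per session mode.
    private final Map<Integer, LinkedList<DemoSession>> cachedSessions = new ConcurrentHashMap<>();
    private final AtomicInteger created = new AtomicInteger();

    // Hands out an idle session for the mode, or creates one if none is idle.
    DemoSession getSession(int mode) {
        LinkedList<DemoSession> sessionList =
                cachedSessions.computeIfAbsent(mode, k -> new LinkedList<>());
        DemoSession session = null;
        synchronized (sessionList) {
            if (!sessionList.isEmpty()) {
                // Removing it ensures no other thread can obtain the same session.
                session = sessionList.removeFirst();
            }
        }
        if (session == null) {
            created.incrementAndGet();
            session = new DemoSession(mode);
        }
        return session;
    }

    // Puts a session back so that a later getSession call can reuse it.
    void release(DemoSession session) {
        LinkedList<DemoSession> sessionList =
                cachedSessions.computeIfAbsent(session.mode, k -> new LinkedList<>());
        synchronized (sessionList) {
            sessionList.addLast(session);
        }
    }

    int createdCount() {
        return created.get();
    }

    public static void main(String[] args) {
        SessionCacheSketch cache = new SessionCacheSketch();
        DemoSession first = cache.getSession(1);
        cache.release(first);
        DemoSession second = cache.getSession(1);
        System.out.println("reused: " + (first == second));
        System.out.println("created: " + cache.createdCount());
    }
}
```

Two callers that ask for the same mode at the same time get different sessions, because a session stays out of the list for as long as it is in use.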
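How a Session is returned to the cache

As shown above, Spring removes a Session from the cache every time it hands one out. The reason is that the JMS API does not require Session implementations to be thread-safe, so a Session that is in use must not be visible to other threads. When, then, does the current thread return the Session to the cache map once it is done with it? The key code is in CachedSessionInvocationHandler.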
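After sending a message, JmsTemplate eventually calls close() on the Session. Because the Session is a proxy, every method call on it, close() included, is handled by CachedSessionInvocationHandler. On close, the handler puts the Session back into the cache if there is room for it, and only closes it physically otherwise. The code is as follows.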
CachingConnectionFactory.java (CachedSessionInvocationHandler)

```java
// Excerpt from CachedSessionInvocationHandler.invoke(...)
else if (methodName.equals("close")) {
    // Handle close method: don't pass the call on.
    if (active) {
        synchronized (this.sessionList) {
            if (this.sessionList.size() < getSessionCacheSize()) {
                try {
                    logicalClose((Session) proxy);
                    // Remain open in the session list.
                    return null;
                } catch (JMSException ex) {
                    logger.trace("Logical close of cached JMS Session failed - discarding it", ex);
                    // Proceed to physical close from here...
                }
            }
        }
    }
    // If we get here, we're supposed to shut down.
    physicalClose();
    return null;
}

private void logicalClose(Session proxy) throws JMSException {
    // Preserve rollback-on-close semantics.
    if (this.transactionOpen && this.target.getTransacted()) {
        this.transactionOpen = false;
        this.target.rollback();
    }
    // Physically close durable subscribers at time of Session close call.
    for (Iterator<Map.Entry<ConsumerCacheKey, MessageConsumer>> it = this.cachedConsumers.entrySet().iterator(); it.hasNext();) {
        Map.Entry<ConsumerCacheKey, MessageConsumer> entry = it.next();
        if (entry.getKey().subscription != null) {
            entry.getValue().close();
            it.remove();
        }
    }
    // Allow for multiple close calls...
    boolean returned = false;
    synchronized (this.sessionList) {
        if (!this.sessionList.contains(proxy)) {
            this.sessionList.addLast(proxy);
            returned = true;
        }
    }
    if (returned && logger.isTraceEnabled()) {
        logger.trace("Returned cached Session: " + this.target);
    }
}
```
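The difference between a logical and a physical close can be reproduced with a small dynamic proxy. The sketch below is a simplified illustration, not the Spring code: DemoSession, RealSession, CachedSessionHandler and isPhysicallyClosed are hypothetical, and transactions and consumers are ignored. It also handles equals and hashCode by identity, which matters because sessionList.contains(proxy) calls equals on the proxy itself.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.LinkedList;

final class LogicalCloseSketch {

    // Hypothetical stand-in for a JMS Session.
    interface DemoSession {
        void close();
        boolean isPhysicallyClosed();
    }

    static final class RealSession implements DemoSession {
        private boolean closed;

        @Override
        public void close() {
            closed = true;
        }

        @Override
        public boolean isPhysicallyClosed() {
            return closed;
        }
    }

    // Turns close() into "return to the cache" while the cache has room.
    static final class CachedSessionHandler implements InvocationHandler {
        private final DemoSession target;
        private final LinkedList<DemoSession> sessionList;
        private final int sessionCacheSize;

        CachedSessionHandler(DemoSession target, LinkedList<DemoSession> sessionList, int sessionCacheSize) {
            this.target = target;
            this.sessionList = sessionList;
            this.sessionCacheSize = sessionCacheSize;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            String name = method.getName();
            if (name.equals("equals")) {
                // Identity comparison, so that contains(proxy) below works on proxies.
                return proxy == args[0];
            }
            if (name.equals("hashCode")) {
                return System.identityHashCode(proxy);
            }
            if (name.equals("close")) {
                synchronized (sessionList) {
                    if (sessionList.size() < sessionCacheSize) {
                        // Logical close: keep the real session open and cache the proxy.
                        if (!sessionList.contains(proxy)) {
                            sessionList.addLast((DemoSession) proxy);
                        }
                        return null;
                    }
                }
                // Cache is full: close the real session.
                target.close();
                return null;
            }
            try {
                return method.invoke(target, args);
            } catch (InvocationTargetException ex) {
                throw ex.getTargetException();
            }
        }
    }

    static DemoSession wrap(DemoSession target, LinkedList<DemoSession> sessionList, int sessionCacheSize) {
        return (DemoSession) Proxy.newProxyInstance(
                DemoSession.class.getClassLoader(),
                new Class<?>[] {DemoSession.class},
                new CachedSessionHandler(target, sessionList, sessionCacheSize));
    }

    public static void main(String[] args) {
        LinkedList<DemoSession> cache = new LinkedList<>();
        DemoSession first = wrap(new RealSession(), cache, 1);
        DemoSession second = wrap(new RealSession(), cache, 1);
        first.close();  // cached, still usable
        second.close(); // cache is full, physically closed
        System.out.println("first closed: " + first.isPhysicallyClosed());
        System.out.println("second closed: " + second.isPhysicallyClosed());
    }
}
```

Callers keep writing ordinary close() calls, and the decision to pool or discard stays entirely inside the handler.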
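How a failed Connection is discarded

CachingConnectionFactory caches the Connection (the connection field of SingleConnectionFactory). If that connection breaks for any reason while in use, the field has to be reset to null so that a fresh connection is created on the next request.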
So how does Spring JMS reset the connection to null? This is hard to spot, because nothing in the printed exception stack trace points to code that sets the connection to null.
First, there is only one place in Spring JMS that sets this connection to null:
SingleConnectionFactory.java

```java
public void resetConnection() {
    synchronized (this.connectionMonitor) {
        if (this.target != null) {
            closeConnection(this.target);
        }
        this.target = null;
        this.connection = null;
    }
}

// SingleConnectionFactory implements javax.jms.ExceptionListener.
@Override
public void onException(JMSException ex) {
    logger.info("Encountered a JMSException - resetting the underlying JMS Connection", ex);
    resetConnection();
}
```
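So where is this method called from?

The rest of this section uses Artemis Client as the concrete JMS API implementation.

When sending a message fails, the client eventually calls the ExceptionListener's onException() method. How the Artemis client gets to that point is outside the scope of this article.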
ActiveMQConnection.java

```java
@Override
public synchronized void connectionFailed(final ActiveMQException me, boolean failedOver) {
    if (me == null) {
        return;
    }

    ActiveMQConnection conn = connectionRef.get();

    if (conn != null) {
        try {
            final ExceptionListener exceptionListener = conn.getExceptionListener();

            if (exceptionListener != null) {
                final JMSException je = new JMSException(me.toString(), failedOver ? EXCEPTION_FAILOVER : EXCEPTION_DISCONNECT);

                je.initCause(me);

                // The listener is invoked on a new thread, not on the sender's thread.
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        exceptionListener.onException(je);
                    }
                }).start();
            }
        } catch (JMSException e) {
            if (!conn.closed) {
                ActiveMQJMSClientLogger.LOGGER.errorCallingExcListener(e);
            }
        }
    }
}
```
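It calls conn.getExceptionListener() to obtain an ExceptionListener, and that listener is in fact the SingleConnectionFactory, which implements the ExceptionListener interface. It then calls the listener's onException method, which resets the connection to null. Because the listener runs on a separate thread, the reset does not appear in the stack trace of the failed send.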
The reason conn.getExceptionListener() can return the SingleConnectionFactory is that the connection is actually a proxy, so the call really goes to SharedConnectionInvocationHandler (shown below).
SingleConnectionFactory.java

```java
// Excerpt from SharedConnectionInvocationHandler.invoke(...)
else if (method.getName().equals("getExceptionListener")) {
    synchronized (connectionMonitor) {
        if (this.localExceptionListener != null) {
            return this.localExceptionListener;
        } else {
            return getExceptionListener();
        }
    }
}
```
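The reset-on-exception flow can be condensed into a small sketch in which a holder registers itself as the listener of the connection it creates. All names here (DemoExceptionListener, DemoConnection, SingleConnectionHolder) are hypothetical, and the listener is called directly rather than by a provider thread, so this shows only the shape of the mechanism, not Spring's or Artemis's actual code.

```java
final class ResetOnExceptionSketch {

    // Hypothetical stand-in for javax.jms.ExceptionListener.
    interface DemoExceptionListener {
        void onException(Exception ex);
    }

    // Hypothetical stand-in for a provider connection.
    static final class DemoConnection {
        private DemoExceptionListener listener;
        private boolean closed;

        void setExceptionListener(DemoExceptionListener listener) {
            this.listener = listener;
        }

        DemoExceptionListener getExceptionListener() {
            return listener;
        }

        void close() {
            closed = true;
        }

        boolean isClosed() {
            return closed;
        }
    }

    // Caches one connection and drops it when the connection reports a failure.
    static final class SingleConnectionHolder implements DemoExceptionListener {
        private final Object connectionMonitor = new Object();
        private DemoConnection connection;

        DemoConnection getConnection() {
            synchronized (connectionMonitor) {
                if (connection == null) {
                    connection = new DemoConnection();
                    // Register ourselves so that failures are reported back to us.
                    connection.setExceptionListener(this);
                }
                return connection;
            }
        }

        void resetConnection() {
            synchronized (connectionMonitor) {
                if (connection != null) {
                    connection.close();
                }
                connection = null;
            }
        }

        @Override
        public void onException(Exception ex) {
            resetConnection();
        }
    }

    public static void main(String[] args) {
        SingleConnectionHolder holder = new SingleConnectionHolder();
        DemoConnection first = holder.getConnection();
        // A provider would make this call itself, typically from its own thread.
        first.getExceptionListener().onException(new RuntimeException("connection lost"));
        DemoConnection second = holder.getConnection();
        System.out.println("replaced: " + (first != second));
    }
}
```

The next getConnection() call after a failure transparently creates a new connection, which is the effect the reset is meant to achieve.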
Related code

The core JmsTemplate code for sending a message
JmsTemplate.java

```java
protected void doSend(Session session, Destination destination, MessageCreator messageCreator)
        throws JMSException {

    Assert.notNull(messageCreator, "MessageCreator must not be null");
    MessageProducer producer = createProducer(session, destination);
    try {
        Message message = messageCreator.createMessage(session);
        if (logger.isDebugEnabled()) {
            logger.debug("Sending created message: " + message);
        }
        doSend(producer, message);
        // Check commit - avoid commit call within a JTA transaction.
        if (session.getTransacted() && isSessionLocallyTransacted(session)) {
            // Transacted session created by this template -> commit.
            JmsUtils.commitIfNecessary(session);
        }
    } finally {
        JmsUtils.closeMessageProducer(producer);
    }
}

@Nullable
public <T> T execute(SessionCallback<T> action, boolean startConnection) throws JmsException {
    Assert.notNull(action, "Callback object must not be null");
    Connection conToClose = null;
    Session sessionToClose = null;
    try {
        Session sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession(
                obtainConnectionFactory(), this.transactionalResourceFactory, startConnection);
        if (sessionToUse == null) {
            conToClose = createConnection();
            sessionToClose = createSession(conToClose);
            if (startConnection) {
                conToClose.start();
            }
            sessionToUse = sessionToClose;
        }
        if (logger.isDebugEnabled()) {
            logger.debug("Executing callback on JMS Session: " + sessionToUse);
        }
        return action.doInJms(sessionToUse);
    } catch (JMSException ex) {
        throw convertJmsAccessException(ex);
    } finally {
        // This close() call is what returns a cached Session to the cache.
        JmsUtils.closeSession(sessionToClose);
        ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), startConnection);
    }
}
```