ActiveMQ Study Notes, Part 2

程序员文章站 2022-07-15 08:59:37

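Part 1 of these ActiveMQ study notes built a simple ActiveMQ demo. This part follows the same creation steps one by one and reads the source code behind each of them.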

1. ActiveMQConnectionFactory: creating the ActiveMQ connection factory

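ActiveMQConnectionFactory implements ConnectionFactory, QueueConnectionFactory, and TopicConnectionFactory, so it can create queue-based connections (QueueConnectionFactory) as well as topic-based, publish/subscribe connections (TopicConnectionFactory). It also implements Cloneable, which means a factory can be cloned.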

    private static final String DEFAULT_BROKER_HOST;
    private static final int DEFAULT_BROKER_PORT;
    static{
        String host = null;
        String port = null;

The static block supplies a default host and port, so the factory can still be created when no broker address or port is specified:
        try {
             host = AccessController.doPrivileged(new PrivilegedAction<String>() {
                 @Override
                 public String run() {
                     String result = System.getProperty("org.apache.activemq.AMQ_HOST");
                     result = (result==null||result.isEmpty()) ?  System.getProperty("AMQ_HOST","localhost") : result;
                     return result;
                 }
             });
             port = AccessController.doPrivileged(new PrivilegedAction<String>() {
                 @Override
                 public String run() {
                     String result = System.getProperty("org.apache.activemq.AMQ_PORT");
                     result = (result==null||result.isEmpty()) ?  System.getProperty("AMQ_PORT","61616") : result;
                     return result;
                 }
             });
        }catch(Throwable e){
            LOG.debug("Failed to look up System properties for host and port",e);
        }
        host = (host == null || host.isEmpty()) ? "localhost" : host;
        port = (port == null || port.isEmpty()) ? "61616" : port;
        DEFAULT_BROKER_HOST = host;
        DEFAULT_BROKER_PORT = Integer.parseInt(port);
    }
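
To make the lookup order easier to see, here is a minimal sketch that reproduces it outside ActiveMQ. The class name DefaultBrokerAddress and its methods are hypothetical, and the tcp:// URL is composed only for illustration; the property keys and fallback values are the ones in the static block above.

```java
import java.util.Properties;

// Hypothetical helper, not part of ActiveMQ: it only mirrors the lookup order
// of the static block (namespaced key, then short key, then built-in default).
class DefaultBrokerAddress {

    static String lookup(Properties props, String namespacedKey, String shortKey, String fallback) {
        String result = props.getProperty(namespacedKey);
        if (result == null || result.isEmpty()) {
            result = props.getProperty(shortKey, fallback);
        }
        // An empty short key still falls back, as in the original static block.
        return (result == null || result.isEmpty()) ? fallback : result;
    }

    static String brokerUrl(Properties props) {
        String host = lookup(props, "org.apache.activemq.AMQ_HOST", "AMQ_HOST", "localhost");
        int port = Integer.parseInt(lookup(props, "org.apache.activemq.AMQ_PORT", "AMQ_PORT", "61616"));
        return "tcp://" + host + ":" + port;
    }

    public static void main(String[] args) {
        // Run with -DAMQ_HOST=... or -Dorg.apache.activemq.AMQ_PORT=... to override.
        System.out.println(brokerUrl(System.getProperties()));
    }
}
```

Passing a Properties object instead of reading System properties directly keeps the sketch easy to test; the real class reads them inside AccessController.doPrivileged.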

ActiveMQConnectionFactory offers several ways to construct the factory and to set connection-related properties. The connection-creating methods are:

    /**
     * @return Returns the Connection.
     */
    @Override
    public Connection createConnection() throws JMSException {
        return createActiveMQConnection();
    }

    /**
     * @return Returns the Connection.
     */
    @Override
    public Connection createConnection(String userName, String password) throws JMSException {
        return createActiveMQConnection(userName, password);
    }

    /**
     * @return Returns the QueueConnection.
     * @throws JMSException
     */
    @Override
    public QueueConnection createQueueConnection() throws JMSException {
        return createActiveMQConnection().enforceQueueOnlyConnection();
    }

    /**
     * @return Returns the QueueConnection.
     */
    @Override
    public QueueConnection createQueueConnection(String userName, String password) throws JMSException {
        return createActiveMQConnection(userName, password).enforceQueueOnlyConnection();
    }

    /**
     * @return Returns the TopicConnection.
     * @throws JMSException
     */
    @Override
    public TopicConnection createTopicConnection() throws JMSException {
        return createActiveMQConnection();
    }

    /**
     * @return Returns the TopicConnection.
     */
    @Override
    public TopicConnection createTopicConnection(String userName, String password) throws JMSException {
        return createActiveMQConnection(userName, password);
    }

2. Creating the connection: activeMQConnectionFactory.createConnection()

    /**
     * @return Returns the Connection.
     */
    protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException {
        if (brokerURL == null) {
            throw new ConfigurationException("brokerURL not set.");
        }
        ActiveMQConnection connection = null;
        try {
            // The client-side transport: it can send and consume messages
            // synchronously or asynchronously, over TCP, SSL, or NIO.
            Transport transport = createTransport();

    protected Transport createTransport() throws JMSException {
        try {
            URI connectBrokerUL = brokerURL;
            String scheme = brokerURL.getScheme();
            if (scheme == null) {
                throw new IOException("Transport not scheme specified: [" + brokerURL + "]");
            }
            if (scheme.equals("auto")) {
                connectBrokerUL = new URI(brokerURL.toString().replace("auto", "tcp"));
            } else if (scheme.equals("auto+ssl")) {
                connectBrokerUL = new URI(brokerURL.toString().replace("auto+ssl", "ssl"));
            } else if (scheme.equals("auto+nio")) {
                connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio", "nio"));
            } else if (scheme.equals("auto+nio+ssl")) {
                connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio+ssl", "nio+ssl"));
            }

            return TransportFactory.connect(connectBrokerUL);
        } catch (Exception e) {
            throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e);
        }
    }
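
The scheme mapping in createTransport can be tried in isolation. The sketch below is a variant under stated assumptions, not ActiveMQ code: the class AutoSchemeMapper is hypothetical, and it swaps only the scheme prefix. The listing above calls String.replace on the whole URL, and String.replace substitutes every occurrence, so a host name that happens to contain "auto" would be rewritten as well.

```java
import java.net.URI;
import java.util.Map;

// Hypothetical helper: maps the "auto*" schemes to the concrete client
// transports listed in createTransport(), touching only the scheme.
class AutoSchemeMapper {

    private static final Map<String, String> AUTO_TO_CONCRETE = Map.of(
            "auto", "tcp",
            "auto+ssl", "ssl",
            "auto+nio", "nio",
            "auto+nio+ssl", "nio+ssl");

    static URI toConnectUri(URI brokerUrl) {
        String scheme = brokerUrl.getScheme();
        if (scheme == null) {
            throw new IllegalArgumentException("Transport scheme not specified: [" + brokerUrl + "]");
        }
        String concrete = AUTO_TO_CONCRETE.get(scheme);
        if (concrete == null) {
            return brokerUrl; // already a concrete scheme such as tcp or ssl
        }
        // Replace the leading scheme only; the rest of the URL is kept verbatim.
        return URI.create(concrete + brokerUrl.toString().substring(scheme.length()));
    }

    public static void main(String[] args) {
        System.out.println(toConnectUri(URI.create("auto+nio://localhost:61616")));
    }
}
```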


            connection = createActiveMQConnection(transport, factoryStats); // build the ActiveMQConnection

    protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats) throws Exception {

        this.transport = transport;
        this.clientIdGenerator = clientIdGenerator;
        this.factoryStats = factoryStats;

        // Configure a single threaded executor who's core thread can timeout if
        // idle
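        // Each connection gets its own single-threaded executor. The ThreadFactory
        // creates the thread on demand, which avoids hard-wiring calls to new Thread
        // and lets the application pick a thread subclass, name, priority, and so on.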
        executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport);
                //Don't make these daemon threads - see https://issues.apache.org/jira/browse/AMQ-796
                //thread.setDaemon(true);
                return thread;
            }
        });
        // asyncConnectionThread.allowCoreThreadTimeOut(true);
        String uniqueId = connectionIdGenerator.generateId();
        this.info = new ConnectionInfo(new ConnectionId(uniqueId));
        this.info.setManageable(true);
        this.info.setFaultTolerant(transport.isFaultTolerant());
        this.connectionSessionId = new SessionId(info.getConnectionId(), -1);

        this.transport.setTransportListener(this);

        this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection);
        this.factoryStats.addConnection(this);
        this.timeCreated = System.currentTimeMillis();
        this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
    }
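
The executor setup in this constructor can be reproduced with the JDK alone. The following is a small sketch, assuming a hypothetical class NamedSingleThreadExecutor; it uses the same pool parameters (one core thread, one maximum, 5 seconds keep-alive, unbounded queue) and a ThreadFactory that names the thread. With a core size of 1, the keep-alive only applies to the core thread if allowCoreThreadTimeOut(true) is called. That call is commented out in the listing above; the sketch enables it to show the effect the English comment describes.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper illustrating the executor created per connection.
class NamedSingleThreadExecutor {

    static ThreadPoolExecutor create(String threadName) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 5, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(),
                r -> new Thread(r, threadName)); // non-daemon, like the original
        // Assumption: enabled here so the idle core thread really does time out.
        executor.allowCoreThreadTimeOut(true);
        return executor;
    }

    // Runs a task on the executor and reports which thread executed it.
    static String threadNameOf(ThreadPoolExecutor executor) {
        Callable<String> task = () -> Thread.currentThread().getName();
        try {
            return executor.submit(task).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor executor = create("ActiveMQ Connection Executor: demo");
        System.out.println(threadNameOf(executor));
        executor.shutdown(); // non-daemon threads keep the JVM alive otherwise
    }
}
```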

            connection.setUserName(userName);
            connection.setPassword(password);

            configureConnection(connection); // copy the factory's settings onto the connection

    protected void configureConnection(ActiveMQConnection connection) throws JMSException {
        connection.setPrefetchPolicy(getPrefetchPolicy());
        connection.setDisableTimeStampsByDefault(isDisableTimeStampsByDefault());
        connection.setOptimizedMessageDispatch(isOptimizedMessageDispatch());
        connection.setCopyMessageOnSend(isCopyMessageOnSend());
        connection.setUseCompression(isUseCompression());
        connection.setObjectMessageSerializationDefered(isObjectMessageSerializationDefered());
        connection.setDispatchAsync(isDispatchAsync());
        connection.setUseAsyncSend(isUseAsyncSend());
        connection.setAlwaysSyncSend(isAlwaysSyncSend());
        connection.setAlwaysSessionAsync(isAlwaysSessionAsync());
        connection.setOptimizeAcknowledge(isOptimizeAcknowledge());
        connection.setOptimizeAcknowledgeTimeOut(getOptimizeAcknowledgeTimeOut());
        connection.setOptimizedAckScheduledAckInterval(getOptimizedAckScheduledAckInterval());
        connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer());
        connection.setExclusiveConsumer(isExclusiveConsumer());
        connection.setRedeliveryPolicyMap(getRedeliveryPolicyMap());
        connection.setTransformer(getTransformer());
        connection.setBlobTransferPolicy(getBlobTransferPolicy().copy());
        connection.setWatchTopicAdvisories(isWatchTopicAdvisories());
        connection.setProducerWindowSize(getProducerWindowSize());
        connection.setWarnAboutUnstartedConnectionTimeout(getWarnAboutUnstartedConnectionTimeout());
        connection.setSendTimeout(getSendTimeout());
        connection.setCloseTimeout(getCloseTimeout());
        connection.setSendAcksAsync(isSendAcksAsync());
        connection.setAuditDepth(getAuditDepth());
        connection.setAuditMaximumProducerNumber(getAuditMaximumProducerNumber());
        connection.setUseDedicatedTaskRunner(isUseDedicatedTaskRunner());
        connection.setConsumerFailoverRedeliveryWaitPeriod(getConsumerFailoverRedeliveryWaitPeriod());
        connection.setCheckForDuplicates(isCheckForDuplicates());
        connection.setMessagePrioritySupported(isMessagePrioritySupported());
        connection.setTransactedIndividualAck(isTransactedIndividualAck());
        connection.setNonBlockingRedelivery(isNonBlockingRedelivery());
        connection.setMaxThreadPoolSize(getMaxThreadPoolSize());
        connection.setSessionTaskRunner(getSessionTaskRunner());
        connection.setRejectedTaskHandler(getRejectedTaskHandler());
        connection.setNestedMapAndListEnabled(isNestedMapAndListEnabled());
        connection.setRmIdFromConnectionId(isRmIdFromConnectionId());
        connection.setConsumerExpiryCheckEnabled(isConsumerExpiryCheckEnabled());
        connection.setTrustedPackages(getTrustedPackages());
        connection.setTrustAllPackages(isTrustAllPackages());
        connection.setConnectResponseTimeout(getConnectResponseTimeout());
        if (transportListener != null) {
            connection.addTransportListener(transportListener);
        }
        if (exceptionListener != null) {
            connection.setExceptionListener(exceptionListener);
        }
        if (clientInternalExceptionListener != null) {
            connection.setClientInternalExceptionListener(clientInternalExceptionListener);
        }
    }

            transport.start(); // start the transport

            if (clientID != null) {
                connection.setDefaultClientID(clientID);
            }

            return connection;
        } catch (JMSException e) {
            // Clean up!
            try {
                connection.close();
            } catch (Throwable ignore) {
            }
            throw e;
        } catch (Exception e) {
            // Clean up!
            try {
                connection.close();
            } catch (Throwable ignore) {
            }
            throw JMSExceptionSupport.create("Could not connect to broker URL: " + brokerURL + ". Reason: " + e, e);
        }
    }

3. Starting the connection

4. Creating the session: connection.createSession(false, Session.AUTO_ACKNOWLEDGE)

    public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
        checkClosedOrFailed(); // check that the connection is not closed or failed
        ensureConnectionInfoSent(); // send the ConnectionInfo to the broker
        return new ActiveMQXASession(this, getNextSessionId(), getAckMode(), isDispatchAsync());
    }

    // Constructing the session
    /**
     * Construct the Session
     *
     * @param connection
     * @param sessionId
     * @param acknowledgeMode n.b if transacted - the acknowledgeMode ==
     *                Session.SESSION_TRANSACTED
     * @param asyncDispatch
     * @param sessionAsyncDispatch
     * @throws JMSException on internal error
     */
    protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch, boolean sessionAsyncDispatch) throws JMSException {
        this.debug = LOG.isDebugEnabled();
        this.connection = connection;
        this.acknowledgementMode = acknowledgeMode;
        this.asyncDispatch = asyncDispatch;
        this.sessionAsyncDispatch = sessionAsyncDispatch;
        this.info = new SessionInfo(connection.getConnectionInfo(), sessionId.getValue());
        setTransactionContext(new TransactionContext(connection));
        stats = new JMSSessionStatsImpl(producers, consumers);
        this.connection.asyncSendPacket(info);
        setTransformer(connection.getTransformer());
        setBlobTransferPolicy(connection.getBlobTransferPolicy());
        this.connectionExecutor=connection.getExecutor();
        this.executor = new ActiveMQSessionExecutor(this);
        connection.addSession(this);
        if (connection.isStarted()) {
            start();
        }

    }

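5. Creating the destination you need: queue or topic?

6. Creating the producer/consumer

createProducer creates a MessageProducer that sends messages to the given destination. A client uses the MessageProducer object to send messages to that destination. Because Queue and Topic both extend Destination, either can be passed as the destination argument when creating a MessageProducer.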

    public MessageProducer createProducer(Destination destination) throws JMSException {
        checkClosed();
        if (destination instanceof CustomDestination) {
            CustomDestination customDestination = (CustomDestination)destination;
            return customDestination.createProducer(this);
        }
        int timeSendOut = connection.getSendTimeout();
        return new ActiveMQMessageProducer(this, getNextProducerId(), ActiveMQMessageTransformation.transformDestination(destination),timeSendOut);
    }

7. Sending the message (producing): messageProducer.send(message)

    // Sends a message using the MessageProducer's default delivery mode, priority, and time to live.
    public void send(Message message) throws JMSException {
        this.send(this.getDestination(),
                  message,
                  this.defaultDeliveryMode,
                  this.defaultPriority,
                  this.defaultTimeToLive);
    }

This passes the destination, the message, the time to live, and the other defaults on to the next overload:

    public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
        this.send(destination, message, deliveryMode, priority, timeToLive, null);
    }

Sending the message from the producer:


    public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, AsyncCallback onComplete) throws JMSException {
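        // 1. Check that the producer is not closed.
        // 2. Check that the message has a usable destination.
        // 3. Let the transformer convert the message before it is sent to the JMS bus.
        checkClosed();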
        if (destination == null) {
            if (info.getDestination() == null) {
                throw new UnsupportedOperationException("A destination must be specified.");
            }
            throw new InvalidDestinationException("Don't understand null destinations");
        }

        ActiveMQDestination dest;
        if (destination.equals(info.getDestination())) {
            dest = (ActiveMQDestination)destination;
        } else if (info.getDestination() == null) {
            dest = ActiveMQDestination.transform(destination);
        } else {
            throw new UnsupportedOperationException("This producer can only send messages to: " + this.info.getDestination().getPhysicalName());
        }
        if (dest == null) {
            throw new JMSException("No destination specified");
        }



        if (transformer != null) {
            // Transform the message inside the producer before it is sent to the JMS bus.
            Message transformedMessage = transformer.producerTransform(session, this, message);
            if (transformedMessage != null) {
                message = transformedMessage;
            }
        }

        if (producerWindow != null) {
            try {
                producerWindow.waitForSpace();
            } catch (InterruptedException e) {
                throw new JMSException("Send aborted due to thread interrupt.");
            }
        }
        // Send the message through the current session
        this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow, sendTimeout, onComplete);

        stats.onMessage();
    }

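Sending the message through the session

This method sends the message so that the broker can dispatch it. Parameters:
     producer - the message producer
     destination - the message destination
     message - the message to send
     deliveryMode - the JMS delivery mode
     priority - the message priority
     timeToLive - how long the message lives before it expires
     producerWindow, sendTimeout, onComplete - the producer's flow-control window, the send timeout, and an optional completion callback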
protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
                        MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException {

        checkClosed();
        if (destination.isTemporary() && connection.isDeleted(destination)) {
            throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination);
        }
        synchronized (sendMutex) {
            // tell the Broker we are about to start a new transaction
            doStartTransaction(); // sends a TransactionInfo to signal that the transaction has begun
            TransactionId txid = transactionContext.getTransactionId();
            long sequenceNumber = producer.getMessageSequence();

            //Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11
            message.setJMSDeliveryMode(deliveryMode);
            long expiration = 0L;
            if (!producer.getDisableMessageTimestamp()) {
                long timeStamp = System.currentTimeMillis();
                message.setJMSTimestamp(timeStamp);
                if (timeToLive > 0) {
                    expiration = timeToLive + timeStamp;
                }
            }
            message.setJMSExpiration(expiration);
            message.setJMSPriority(priority);
            message.setJMSRedelivered(false);

            // transform to our own message format here
            ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection);
            msg.setDestination(destination);
            msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));

            // Set the message id.
            if (msg != message) {
                message.setJMSMessageID(msg.getMessageId().toString());
                // Make sure the JMS destination is set on the foreign messages too.
                message.setJMSDestination(destination);
            }
            //clear the brokerPath in case we are re-sending this message
            msg.setBrokerPath(null);

            msg.setTransactionId(txid);
            if (connection.isCopyMessageOnSend()) {
                msg = (ActiveMQMessage)msg.copy();
            }
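            msg.setConnection(connection); // attach the connection to the message
            msg.onSend(); // send hook: prepares the message for sending
            msg.setProducerId(msg.getMessageId().getProducerId());

            // Everything above only prepares the message; the actual send starts below.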

            if (LOG.isTraceEnabled()) {
                LOG.trace(getSessionId() + " sending message: " + msg);
            }
            if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {
                this.connection.asyncSendPacket(msg); // send the packet over the connection without waiting
                if (producerWindow != null) {
                    // Since we defer lots of the marshaling till we hit the
                    // wire, this might not
                    // provide and accurate size. We may change over to doing
                    // more aggressive marshaling,
                    // to get more accurate sizes.. this is more important once
                    // users start using producer window
                    // flow control.
                    int size = msg.getSize();
                    producerWindow.increaseUsage(size);
                }
            } else {
                if (sendTimeout > 0 && onComplete==null) {
                    this.connection.syncSendPacket(msg,sendTimeout);
                }else {
                    this.connection.syncSendPacket(msg, onComplete);
                }
            }

        }
    }
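
The long condition that chooses between asyncSendPacket and syncSendPacket is easier to reason about as a pure function. The sketch below is illustrative only: the class SendModeDecision, the Mode enum, and the parameter names are hypothetical, while the boolean logic is copied from the if/else in the listing above.

```java
// Hypothetical helper that restates the send-mode branch of ActiveMQSession.send.
class SendModeDecision {

    enum Mode { ASYNC, SYNC_WITH_TIMEOUT, SYNC }

    static Mode decide(boolean hasCallback, int sendTimeout, boolean responseRequired,
                       boolean alwaysSyncSend, boolean persistent,
                       boolean useAsyncSend, boolean inTransaction) {
        boolean async = !hasCallback
                && sendTimeout <= 0
                && !responseRequired
                && !alwaysSyncSend
                && (!persistent || useAsyncSend || inTransaction);
        if (async) {
            return Mode.ASYNC;             // connection.asyncSendPacket(msg)
        }
        if (sendTimeout > 0 && !hasCallback) {
            return Mode.SYNC_WITH_TIMEOUT; // connection.syncSendPacket(msg, sendTimeout)
        }
        return Mode.SYNC;                  // connection.syncSendPacket(msg, onComplete)
    }

    public static void main(String[] args) {
        // A persistent message outside a transaction, with default settings.
        System.out.println(decide(false, 0, false, false, true, false, false));
    }
}
```

Read this way, a persistent message is sent synchronously unless useAsyncSend is enabled or the send happens inside a transaction, while a non-persistent message goes out asynchronously unless a timeout, a callback, a required response, or alwaysSyncSend forces the synchronous path.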

asyncSendPacket sends the packet over the connection:

    // Send the message packet
    public void asyncSendPacket(Command command) throws JMSException {
        if (isClosed()) {
            throw new ConnectionClosedException();
        } else {
            doAsyncSendPacket(command);
        }
    }
    // Perform the asynchronous send
    private void doAsyncSendPacket(Command command) throws JMSException {
        try {
            // Send the command one-way (asynchronously) through the Transport client;
            // how that Transport is created was covered earlier.
            this.transport.oneway(command); 
        } catch (IOException e) {
            throw JMSExceptionSupport.create(e);
        }
    }

That concludes this brief tour of the source code behind a simple message queue example.