ActiveMQ 学习札记二
在ActiveMQ 学习札记一中写了简单ActiveMQ Demo。下面一步一步跟着创建过程分析源码。
一、ActiveMQConnectionFactory 创建ActiveMQ连接工厂
ActiveMQConnectionFactory 实现了ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory。因此可以创建基于队列的连接QueueConnectionFactory、基于主题的连接(发布/订阅)TopicConnectionFactory。同事实现了Cloneable,意味着工厂可以克隆。
private static final String DEFAULT_BROKER_HOST;
private static final int DEFAULT_BROKER_PORT;
static{
String host = null;
String port = null;
提供了默认的host 和port。因此在创建的时候如果没有指定服务地址和端口,也可以创建
try {
host = AccessController.doPrivileged(new PrivilegedAction<String>() {
@Override
public String run() {
String result = System.getProperty("org.apache.activemq.AMQ_HOST");
result = (result==null||result.isEmpty()) ? System.getProperty("AMQ_HOST","localhost") : result;
return result;
}
});
port = AccessController.doPrivileged(new PrivilegedAction<String>() {
@Override
public String run() {
String result = System.getProperty("org.apache.activemq.AMQ_PORT");
result = (result==null||result.isEmpty()) ? System.getProperty("AMQ_PORT","61616") : result;
return result;
}
});
}catch(Throwable e){
LOG.debug("Failed to look up System properties for host and port",e);
}
host = (host == null || host.isEmpty()) ? "localhost" : host;
port = (port == null || port.isEmpty()) ? "61616" : port;
DEFAULT_BROKER_HOST = host;
DEFAULT_BROKER_PORT = Integer.parseInt(port);
}
ActiveMQConnectionFactory 提供了多种创建连接工厂的方法和设置连接的相关属性:
/**
* @return Returns the Connection.
*/
@Override
public Connection createConnection() throws JMSException {
return createActiveMQConnection();
}
/**
* @return Returns the Connection.
*/
@Override
public Connection createConnection(String userName, String password) throws JMSException {
return createActiveMQConnection(userName, password);
}
/**
* @return Returns the QueueConnection.
* @throws JMSException
*/
@Override
public QueueConnection createQueueConnection() throws JMSException {
return createActiveMQConnection().enforceQueueOnlyConnection();
}
/**
* @return Returns the QueueConnection.
*/
@Override
public QueueConnection createQueueConnection(String userName, String password) throws JMSException {
return createActiveMQConnection(userName, password).enforceQueueOnlyConnection();
}
/**
* @return Returns the TopicConnection.
* @throws JMSException
*/
@Override
public TopicConnection createTopicConnection() throws JMSException {
return createActiveMQConnection();
}
/**
* @return Returns the TopicConnection.
*/
@Override
public TopicConnection createTopicConnection(String userName, String password) throws JMSException {
return createActiveMQConnection(userName, password);
}
二、创建连接 activeMQConnectionFactory.createConnection():
/**
* @return Returns the Connection.
*/
protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException {
if (brokerURL == null) {
throw new ConfigurationException("brokerURL not set.");
}
ActiveMQConnection connection = null;
try {
Transport transport = createTransport(); ----表示传输的客户端,允许同步、异步发送和使用消息,允许有TCP/SSL/NIO
SSL
protected Transport createTransport() throws JMSException {
try {
URI connectBrokerUL = brokerURL;
String scheme = brokerURL.getScheme();
if (scheme == null) {
throw new IOException("Transport not scheme specified: [" + brokerURL + "]");
}
if (scheme.equals("auto")) {
connectBrokerUL = new URI(brokerURL.toString().replace("auto", "tcp"));
} else if (scheme.equals("auto+ssl")) {
connectBrokerUL = new URI(brokerURL.toString().replace("auto+ssl", "ssl"));
} else if (scheme.equals("auto+nio")) {
connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio", "nio"));
} else if (scheme.equals("auto+nio+ssl")) {
connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio+ssl", "nio+ssl"));
}
return TransportFactory.connect(connectBrokerUL);
} catch (Exception e) {
throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e);
}
}
connection = createActiveMQConnection(transport, factoryStats);///构建一个ActiveMQ连接
protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats) throws Exception {
this.transport = transport;
this.clientIdGenerator = clientIdGenerator;
this.factoryStats = factoryStats;
// Configure a single threaded executor who's core thread can timeout if
// idle
//每一个连接创建一个多线new ThreadFactory()按需创建新线程的对象。使用线程工厂可以删除对新线程的调用的硬连接,使应用程序能够使用特殊的线程子类、优先级等
executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport);
//Don't make these daemon threads - see https://issues.apache.org/jira/browse/AMQ-796
//thread.setDaemon(true);
return thread;
}
});
// asyncConnectionThread.allowCoreThreadTimeOut(true);
String uniqueId = connectionIdGenerator.generateId();
this.info = new ConnectionInfo(new ConnectionId(uniqueId));
this.info.setManageable(true);
this.info.setFaultTolerant(transport.isFaultTolerant());
this.connectionSessionId = new SessionId(info.getConnectionId(), -1);
this.transport.setTransportListener(this);
this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection);
this.factoryStats.addConnection(this);
this.timeCreated = System.currentTimeMillis();
this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
}
connection.setUserName(userName);
connection.setPassword(password);
configureConnection(connection);//配置连接的相关信息
protected void configureConnection(ActiveMQConnection connection) throws JMSException {
connection.setPrefetchPolicy(getPrefetchPolicy());
connection.setDisableTimeStampsByDefault(isDisableTimeStampsByDefault());
connection.setOptimizedMessageDispatch(isOptimizedMessageDispatch());
connection.setCopyMessageOnSend(isCopyMessageOnSend());
connection.setUseCompression(isUseCompression());
connection.setObjectMessageSerializationDefered(isObjectMessageSerializationDefered());
connection.setDispatchAsync(isDispatchAsync());
connection.setUseAsyncSend(isUseAsyncSend());
connection.setAlwaysSyncSend(isAlwaysSyncSend());
connection.setAlwaysSessionAsync(isAlwaysSessionAsync());
connection.setOptimizeAcknowledge(isOptimizeAcknowledge());
connection.setOptimizeAcknowledgeTimeOut(getOptimizeAcknowledgeTimeOut());
connection.setOptimizedAckScheduledAckInterval(getOptimizedAckScheduledAckInterval());
connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer());
connection.setExclusiveConsumer(isExclusiveConsumer());
connection.setRedeliveryPolicyMap(getRedeliveryPolicyMap());
connection.setTransformer(getTransformer());
connection.setBlobTransferPolicy(getBlobTransferPolicy().copy());
connection.setWatchTopicAdvisories(isWatchTopicAdvisories());
connection.setProducerWindowSize(getProducerWindowSize());
connection.setWarnAboutUnstartedConnectionTimeout(getWarnAboutUnstartedConnectionTimeout());
connection.setSendTimeout(getSendTimeout());
connection.setCloseTimeout(getCloseTimeout());
connection.setSendAcksAsync(isSendAcksAsync());
connection.setAuditDepth(getAuditDepth());
connection.setAuditMaximumProducerNumber(getAuditMaximumProducerNumber());
connection.setUseDedicatedTaskRunner(isUseDedicatedTaskRunner());
connection.setConsumerFailoverRedeliveryWaitPeriod(getConsumerFailoverRedeliveryWaitPeriod());
connection.setCheckForDuplicates(isCheckForDuplicates());
connection.setMessagePrioritySupported(isMessagePrioritySupported());
connection.setTransactedIndividualAck(isTransactedIndividualAck());
connection.setNonBlockingRedelivery(isNonBlockingRedelivery());
connection.setMaxThreadPoolSize(getMaxThreadPoolSize());
connection.setSessionTaskRunner(getSessionTaskRunner());
connection.setRejectedTaskHandler(getRejectedTaskHandler());
connection.setNestedMapAndListEnabled(isNestedMapAndListEnabled());
connection.setRmIdFromConnectionId(isRmIdFromConnectionId());
connection.setConsumerExpiryCheckEnabled(isConsumerExpiryCheckEnabled());
connection.setTrustedPackages(getTrustedPackages());
connection.setTrustAllPackages(isTrustAllPackages());
connection.setConnectResponseTimeout(getConnectResponseTimeout());
if (transportListener != null) {
connection.addTransportListener(transportListener);
}
if (exceptionListener != null) {
connection.setExceptionListener(exceptionListener);
}
if (clientInternalExceptionListener != null) {
connection.setClientInternalExceptionListener(clientInternalExceptionListener);
}
}
transport.start();启动传输
if (clientID != null) {
connection.setDefaultClientID(clientID);
}
return connection;
} catch (JMSException e) {
// Clean up!
try {
connection.close();
} catch (Throwable ignore) {
}
throw e;
} catch (Exception e) {
// Clean up!
try {
connection.close();
} catch (Throwable ignore) {
}
throw JMSExceptionSupport.create("Could not connect to broker URL: " + brokerURL + ". Reason: " + e, e);
}
}
三,启动连接
四、创建session会话connection.createSession(false, Session.AUTO_ACKNOWLEDGE):
public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
checkClosedOrFailed();//检查连接是否关闭
ensureConnectionInfoSent();将ConnectionInfo发送到代理
return new ActiveMQXASession(this, getNextSessionId(), getAckMode(), isDispatchAsync());
}
//建立会话
/**
* Construct the Session
*
* @param connection
* @param sessionId
* @param acknowledgeMode n.b if transacted - the acknowledgeMode ==
* Session.SESSION_TRANSACTED
* @param asyncDispatch
* @param sessionAsyncDispatch
* @throws JMSException on internal error
*/
protected ActiveMQSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode, boolean asyncDispatch, boolean sessionAsyncDispatch) throws JMSException {
this.debug = LOG.isDebugEnabled();
this.connection = connection;
this.acknowledgementMode = acknowledgeMode;
this.asyncDispatch = asyncDispatch;
this.sessionAsyncDispatch = sessionAsyncDispatch;
this.info = new SessionInfo(connection.getConnectionInfo(), sessionId.getValue());
setTransactionContext(new TransactionContext(connection));
stats = new JMSSessionStatsImpl(producers, consumers);
this.connection.asyncSendPacket(info);
setTransformer(connection.getTransformer());
setBlobTransferPolicy(connection.getBlobTransferPolicy());
this.connectionExecutor=connection.getExecutor();
this.executor = new ActiveMQSessionExecutor(this);
connection.addSession(this);
if (connection.isStarted()) {
start();
}
}
五、创建所需的消息类型 队列?主题?
六、创建发布者/消费者
创建一个MessageProducer,将消息发送到指定的目的地。客户机使用MessageProducer对象将消息发送到目的地。由于队列和主题都继承自目标,因此可以在目标参数中使用它们来创建MessageProducer对象。
public MessageProducer createProducer(Destination destination) throws JMSException {
checkClosed();
if (destination instanceof CustomDestination) {
CustomDestination customDestination = (CustomDestination)destination;
return customDestination.createProducer(this);
}
int timeSendOut = connection.getSendTimeout();
return new ActiveMQMessageProducer(this, getNextProducerId(), ActiveMQMessageTransformation.transformDestination(destination),timeSendOut);
}
七、发送消息(消息产生)messageProducer.send(message)
//使用MessageProducer的默认交付模式、优先级和生存时间发送消息。
public void send(Message message) throws JMSException {
this.send(this.getDestination(),
message,
this.defaultDeliveryMode,
this.defaultPriority,
this.defaultTimeToLive);
}
将目的地、消息、生存时间等传递到消息并发送
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
this.send(destination, message, deliveryMode, priority, timeToLive, null);
}
发送消息
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, AsyncCallback onComplete) throws JMSException {
//1、检查是否有消息发送/2、检查消息是否有目的地3、在将给定消息发送到JMS总线之前,转换生成器内部的消息。
checkClosed();
if (destination == null) {
if (info.getDestination() == null) {
throw new UnsupportedOperationException("A destination must be specified.");
}
throw new InvalidDestinationException("Don't understand null destinations");
}
ActiveMQDestination dest;
if (destination.equals(info.getDestination())) {
dest = (ActiveMQDestination)destination;
} else if (info.getDestination() == null) {
dest = ActiveMQDestination.transform(destination);
} else {
throw new UnsupportedOperationException("This producer can only send messages to: " + this.info.getDestination().getPhysicalName());
}
if (dest == null) {
throw new JMSException("No destination specified");
}
if (transformer != null) {
//在将给定消息发送到JMS总线之前,转换生成器内部的消息。
Message transformedMessage = transformer.producerTransform(session, this, message);
if (transformedMessage != null) {
message = transformedMessage;
}
}
if (producerWindow != null) {
try {
producerWindow.waitForSpace();
} catch (InterruptedException e) {
throw new JMSException("Send aborted due to thread interrupt.");
}
}
//通过当前会话发送消息
this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow, sendTimeout, onComplete);
stats.onMessage();
}
发送消息
发送消息以供代理分派。
参数:producer - message producer.destination -消息目的地。
message -要发送的消息。
deliverymode - JMS消息交付模式。优先级-消息优先级。
timeToLive -消息过期。
producerWindow onComplete sendTimeout
protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException {
checkClosed();
if (destination.isTemporary() && connection.isDeleted(destination)) {
throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination);
}
synchronized (sendMutex) {
// tell the Broker we are about to start a new transaction
doStartTransaction();//发送TransactionInfo来指示事务已经启动
TransactionId txid = transactionContext.getTransactionId();
long sequenceNumber = producer.getMessageSequence();
//Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11
message.setJMSDeliveryMode(deliveryMode);
long expiration = 0L;
if (!producer.getDisableMessageTimestamp()) {
long timeStamp = System.currentTimeMillis();
message.setJMSTimestamp(timeStamp);
if (timeToLive > 0) {
expiration = timeToLive + timeStamp;
}
}
message.setJMSExpiration(expiration);
message.setJMSPriority(priority);
message.setJMSRedelivered(false);
// transform to our own message format here
ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection);
msg.setDestination(destination);
msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
// Set the message id.
if (msg != message) {
message.setJMSMessageID(msg.getMessageId().toString());
// Make sure the JMS destination is set on the foreign messages too.
message.setJMSDestination(destination);
}
//clear the brokerPath in case we are re-sending this message
msg.setBrokerPath(null);
msg.setTransactionId(txid);
if (connection.isCopyMessageOnSend()) {
msg = (ActiveMQMessage)msg.copy();
}
msg.setConnection(connection);//设置消息连接
msg.onSend();//发送操作事件监听器。用于准备发送消息。
msg.setProducerId(msg.getMessageId().getProducerId());
//以上为皆为发送消息的准备,下面才是正式发送消息
if (LOG.isTraceEnabled()) {
LOG.trace(getSessionId() + " sending message: " + msg);
}
if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {
this.connection.asyncSendPacket(msg);///通过连接发送数据包
if (producerWindow != null) {
// Since we defer lots of the marshaling till we hit the
// wire, this might not
// provide and accurate size. We may change over to doing
// more aggressive marshaling,
// to get more accurate sizes.. this is more important once
// users start using producer window
// flow control.
int size = msg.getSize();
producerWindow.increaseUsage(size);
}
} else {
if (sendTimeout > 0 && onComplete==null) {
this.connection.syncSendPacket(msg,sendTimeout);
}else {
this.connection.syncSendPacket(msg, onComplete);
}
}
}
}
asyncSendPacket通过连接发送数据包
//发送消息数据包
public void asyncSendPacket(Command command) throws JMSException {
if (isClosed()) {
throw new ConnectionClosedException();
} else {
doAsyncSendPacket(command);
}
}
//执行 发送异步数据包
private void doAsyncSendPacket(Command command) throws JMSException {
try {
///发送一个异步消息,,,通过传输客户端Transport,前面已经说明了如何生成一个传输客户
端,用来传输数据包
this.transport.oneway(command);
} catch (IOException e) {
throw JMSExceptionSupport.create(e);
}
}
以上便是 一个简单的消息队列源码游览。
上一篇: RabbitMQ笔记
下一篇: 矩阵Gauss消除法解线性方程
推荐阅读
-
HTML5 video标签(播放器)学习笔记(二):播放控制
-
PHP网页游戏学习之Xnova(ogame)源码解读(二)
-
C# Redis学习系列(二)Redis基本设置
-
从零学习node.js之搭建http服务器(二)
-
计算机二级python学习教程(3) python语言基本数据类型
-
PHP 面向对象程序设计(oop)学习笔记 (二) - 静态变量的属性和方法及延迟绑定
-
计算机二级python学习教程(1) 教大家如何学习python
-
计算机二级python学习教程(2) python语言基本语法元素
-
shell脚本学习指南[二](Arnold Robbins & Nelson H.F. Beebe著)
-
C#Web应用程序入门经典学习笔记之二