欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

HornetQ HA功能分析

程序员文章站 2022-07-16 22:31:01
...

测试方式:

使用我们之前使用的脚本,在hornetq做failover的环境下,施加很大的压力(50个线程),看failover能否成功(看有没有丢数据,主-副机能不能正常的切换过来)

具体的操作方式是:

Hornetq自带的example有HA这块的测试脚本

/hornetq-2.1.2.Final/examples/jms/non-transaction-failover

/hornetq-2.1.2.Final/examples/jms/transaction-failover

执行[bes@test157 transaction-failover]$./build.sh

带broker起来后启动压力测试脚本

 

需要修改的配置

hornetq-jms.xml

增加对应的destination(为我们压力脚本设置的目的地)

<topic name="topic1">

  <entry name="/my/Topic1"/>

</topic>
 

hornetq-configuration.xml

在<configuration>节点下增加

<security-enabled>false</security-enabled>
 

hornetq-beans.xml和client-jndi.properties中对应的localhost替换为本机的ip

增加两个对应的topic的测试类,在原有的用例中没有topic的测试用例

 

日志数据的比较:

Hornetq日志数据的比较调用堆栈:

ReplicationCompareDataMessage

-decode:PacketDecoder

--bufferReceived:RemotingConnectionImpl

----bufferReceived:DelegatingBufferHandler

--------bufferReceived:DelegatingBufferHandler

 

日志比较的关键代码org.hornetq.core.replication.impl.ReplicationManagerImpl

 

public void compareJournalInformation(
			final JournalLoadInformation[] journalInformation) throws

	HornetQException

	{

		if (journalLoadInformation == null ||

		journalLoadInformation.length != journalInformation.length)

		{

			throw new HornetQException(
					HornetQException.INTERNAL_ERROR,

					"Live Node contains more journals than the backup node. Probably a version match error");

		}

		for (int i = 0; i < journalInformation.length; i++)

		{

			if (!journalInformation[i].equals


(journalLoadInformation[i])) {

				ReplicationEndpointImpl.log
						.warn("Journal comparission mismatch:\n" +

						journalParametersToString(journalInformation));

			}

		}

	}
 

 

这里面重写了org.hornetq.core.journal.JournalLoadInformation的equals方法,在看看它的equals方法

        JournalLoadInformation other = (JournalLoadInformation) obj;

	if(maxID != other.maxID){
		return false;
	}

	if(numberOfRecords != other.numberOfRecords){
		return false;
	}

	return true;
 

从上面我们可以看出,它比较的是maxID以及numberOfRecords这两个值。我们在看看其中的一个赋值的地方:

 

public void decodeRest(final HornetQBuffer buffer) {

		int numberOfJournals = buffer.readInt();

		journalInformation = new JournalLoadInformation[numberOfJournals];

		for (int i = 0; i < numberOfJournals; i++) {

			journalInformation[i] = new JournalLoadInformation();

			journalInformation[i].setNumberOfRecords(buffer.readInt());

			journalInformation[i].setMaxID(buffer.readLong());

		}

	}
 

就目前的调查来看,在启动HornetQ和创建session会话的时候会调用到日志比较。下图是比较日志调用的路线

compareJournals(ReplicationManagerImpl)

compareJournals(HornetQServerImpl)

initialisePart2(HornetQServerImpl)

checkActivate(HornetQServerImpl)

handleCreateSession(HornetQPacketHandler)

handleReattachSession(HornetQPacketHandler)

start(HornetQServerImpl)

 

日志复制通道

RepliactionManagerImpl#start方法用于获取一个与备原机器的连接,创建用于日志复制的会话

start(ReplicationManagerImpl)

activated(JMSServerMangerImpl)

createJournal(JMSServerMangerImpl)

initJournal(JMSServerMangerImpl)

activated(JMSServerMangerImpl)

callActivatedCallbacks(HornetQServerImpl)

initialisePart2(HornetQServerImpl)

checkActivate(HornetQServerImpl)

start(HornetQServerImpl)

关键代码org.hornetq.core.replication.impl.ReplicationManagerImpl#start

 

public synchronized void start() throws Exception {

		// 获取和备原机器的连接
		replicatingConnection = failoverManager.getConnection();
		long channelID = replicatingConnection.generateChannelID();
		Channel mainChannel = replicatingConnection.getChannel(1, -1);
		replicatingChannel = replicatingConnection.getChannel(channelID, -1);
		replicatingChannel.setHandler(responseHandler);
		CreateReplicationSessionMessage replicationStartPackage = new CreateReplicationSessionMessage(
				channelID);
		// 发送一个创建拷贝会话的命令(PacketImpl.CREATE_REPLICATION)
		mainChannel.sendBlocking(replicationStartPackage);
	}
 

上面发出的消息HorentQPacketHanler#handlePacket(final Packet packet)会处理

 

case CREATE_REPLICATION: {
	//Create queue can also be fielded here in the case of a replicated store and forward queue creation
	CreateRelicationSessionMessage request = (CreateRelicationSessionMessage)packet;
	handleCreateReplication(request);
	break;
}
 

日志的同步

调用ReplicationManagerImpl#sendReplicatePacket来复制日志;这个方法的调用者很多,如消息的发送,结束发送,创建连接工厂,物理目的地等操作

sendReplicatePacket(ReplicationManagerImpl)

appendAddRecord(ReplicatedJournal)

appendAddRecord(ReplicatedJournal)

storeMessage(JournalStroageManager)

processRoute(PostOfficeImpl)

redistribute(PostOfficeImpl)

route(PostOfficeImpl)

routeQueueInfo(PostOfficeImpl)

关键代码org.hornetq.core.replication.impl.ReplicationManagerImpl#sendReplicationPacket

 

private void sendReplicatePacket(final Pakcet packet)
	{
		boolean runItNow = false;
		OperationContext repliToken = OperationContextImpl
				.getContext(executorFactory);
		repliToken.replicationLineUp();
		synchronized (replicationLock)
		{
			if (!enabled)
			{
				runItNow = true;
			}
			else
			{
				pendingTokens.add(repliToken);
				replicatingChannel.send(packet);



			}
		}
		if (runItNow)
		{
			repliToken.relicationDone();
		}
	}
 

收到一条消息就发到replicatingChannel,做到了日志同步。

replicatingChannel的建立

org.hornetq.core.replication.impl.ReplicationManagerImpl#start

public synchronized void start() throws Exception {
	    replicatingConnection = failoverManager.getConnection();
	    long channelID = replicatingConnection.generateChannelID();
	    //在这里为replicatingChannel赋值
	    replicatingChannel = replicatingConnection.getChannel(channelID, -1);
	    replicatingChanne.setHandler(responseHandler);
	}
 

主备机器切换

HornetQ的失效备原是在客户端层面来做的 ,之前会注册一个用于失效备原的监听器,当监听到异常时,就会尝试进行失效备原;下面是在发生失效备原时的调用堆栈:

FailoverManagerImpl#failoverOrReconnect

FailoverManagerImpl#handleConnectionFailure

FailoverManagerImpl$DelegatingFailureListener#connectionFailed

RemotingConnectionImpl#callFailureListeners

RemotingConnectionImpl#fail

FailoverManagerImpl$ChannelOHandler

 

Failover关键代码org.hornetq.core.client.impl.FailoverManagerImpl#failoverOrReconnect

 

相关标签: HornetQ