
CAT Server: Consuming Messages (Part 2)

程序员文章站 2022-07-15 16:50:32

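1. Message consumption flows from DefaultMessageHandler#handle to RealtimeConsumer#consume, which looks up the Period that covers the message's timestamp. Each period has a start and an end time, and a message belongs to a period when timestamp >= m_startTime && timestamp < m_endTime. If no period matches, the message is not distributed and is counted as a network time error.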

public void consume(MessageTree tree) {
    long timestamp = tree.getMessage().getTimestamp();
    Period period = m_periodManager.findPeriod(timestamp);

    if (period != null) {
        period.distribute(tree);
    } else {
        // no period covers this timestamp: the message is dropped and counted as a network time error
        m_serverStateManager.addNetworkTimeError(1);
    }
}
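
The period lookup is a half-open interval test. A minimal sketch, assuming two simplified, hypothetical classes (SimplePeriod and SimplePeriodManager) in place of CAT's Period and PeriodManager:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a period: covers the half-open interval [startTime, endTime).
class SimplePeriod {
    final long startTime; // inclusive
    final long endTime;   // exclusive

    SimplePeriod(long startTime, long endTime) {
        this.startTime = startTime;
        this.endTime = endTime;
    }

    boolean isIn(long timestamp) {
        return timestamp >= startTime && timestamp < endTime;
    }
}

// Hypothetical stand-in for the period manager's lookup.
class SimplePeriodManager {
    private final List<SimplePeriod> periods = new ArrayList<>();

    void add(SimplePeriod period) {
        periods.add(period);
    }

    // Returns the period covering the timestamp, or null when none does.
    SimplePeriod findPeriod(long timestamp) {
        for (SimplePeriod period : periods) {
            if (period.isIn(timestamp)) {
                return period;
            }
        }
        return null;
    }
}
```

A timestamp exactly on a period boundary belongs to the later period, so no message matches two periods; a null result corresponds to the addNetworkTimeError branch.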

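Period#distribute hands the message to every analyzer type. For each type it puts the message on the queue of one of that type's PeriodTasks: when the type has several tasks, the index is chosen from the domain's hash code; if that queue is full, the next task is tried. If the message cannot be queued for some type, it is counted as lost (addMessageTotalLoss).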

public void distribute(MessageTree tree) {
    m_serverStateManager.addMessageTotal(tree.getDomain(), 1);
    boolean success = true;
    String domain = tree.getDomain();

    // m_tasks maps each analyzer type to its PeriodTasks; every type receives the message
    for (Entry<String, List<PeriodTask>> entry : m_tasks.entrySet()) {
        List<PeriodTask> tasks = entry.getValue();
        int length = tasks.size();
        int index = 0;
        boolean manyTasks = length > 1;

        if (manyTasks) {
            // several tasks for this type: pick one by domain hash
            index = Math.abs(domain.hashCode()) % length;
        }
        PeriodTask task = tasks.get(index);
        boolean enqueue = task.enqueue(tree);

        if (enqueue == false) {
            if (manyTasks) {
                // primary queue is full: fall back to the next task
                task = tasks.get((index + 1) % length);
                enqueue = task.enqueue(tree);

                if (enqueue == false) {
                    success = false;
                }
            } else {
                success = false;
            }
        }
    }

    if (!success) {
        m_serverStateManager.addMessageTotalLoss(tree.getDomain(), 1);
    }
}
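
The per-type task selection can be sketched for a single type's task list. This is a hypothetical helper (TaskSelectionSketch, with plain BlockingQueues standing in for PeriodTask queues), not CAT's code; it also swaps in Math.floorMod for Math.abs(hash) % n, because Math.abs(Integer.MIN_VALUE) is still negative:

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch of choosing one task queue per analyzer type (not CAT's code).
class TaskSelectionSketch {

    // Primary task index for a domain. Math.floorMod always returns a value in [0, taskCount),
    // whereas Math.abs(Integer.MIN_VALUE) is negative, so Math.abs(hash) % n can be < 0.
    static int primaryIndex(String domain, int taskCount) {
        return taskCount > 1 ? Math.floorMod(domain.hashCode(), taskCount) : 0;
    }

    // Offer to the primary queue, then to the next one; false means the message is lost for this type.
    static boolean dispatch(String domain, String message, List<BlockingQueue<String>> queues) {
        int count = queues.size();
        int index = primaryIndex(domain, count);
        if (queues.get(index).offer(message)) {
            return true;
        }
        return count > 1 && queues.get((index + 1) % count).offer(message);
    }
}
```

Hashing on the domain tends to send one domain's messages to the same task, and the single fallback gives up that affinity only when the primary queue is full, trading it for fewer lost messages.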

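Each task queue holds 30000 messages by default. When offer fails because the queue is full, enqueue counts the overflow, logs a warning only once every 10 * CatConstants.ERROR_COUNT overflows (sampled logging), and, for DumpAnalyzer, releases the message tree's off-heap (direct memory) ByteBuf so the dropped message does not leak memory.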

public boolean enqueue(MessageTree tree) {
    boolean result = m_queue.offer(tree);

    if (!result) { // trace queue overflow
        m_queueOverflow++;

        // log only once every 10 * ERROR_COUNT overflows
        if (m_queueOverflow % (10 * CatConstants.ERROR_COUNT) == 0) {
            m_logger.warn(m_analyzer.getClass().getSimpleName() + " queue overflow number " + m_queueOverflow);
        }

        // fix issue #1155: release the dropped tree's buffer so its direct memory is not leaked
        if (m_analyzer.getClass().getSimpleName().equals("DumpAnalyzer") && tree instanceof DefaultMessageTree) {
            DefaultMessageTree t = (DefaultMessageTree) tree;

            if (t.getBuffer() != null) {
                t.getBuffer().release();
            }
        }
    }

    return result;
}
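
The overflow handling is a non-blocking offer on a bounded queue plus a counter with sampled logging. A minimal sketch, assuming a hypothetical BoundedTaskQueue class and a logEvery parameter standing in for 10 * CatConstants.ERROR_COUNT; CAT's default would correspond to a capacity of 30000:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch of a bounded task queue with overflow accounting (not CAT's classes).
class BoundedTaskQueue<T> {
    private final BlockingQueue<T> queue;
    private final long logEvery;   // plays the role of 10 * CatConstants.ERROR_COUNT
    private long overflowCount;    // plain field: approximate if several threads enqueue at once
    private long warningsLogged;

    BoundedTaskQueue(int capacity, long logEvery) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.logEvery = logEvery;
    }

    boolean enqueue(T item) {
        boolean accepted = queue.offer(item); // never blocks: returns false when the queue is full
        if (!accepted) {
            overflowCount++;
            if (overflowCount % logEvery == 0) { // sampled logging instead of one line per drop
                warningsLogged++;
                System.err.println("queue overflow number " + overflowCount);
            }
            // A real implementation would also release any pooled or direct buffer held by item here.
        }
        return accepted;
    }

    T poll() {
        return queue.poll();
    }

    long overflowCount() {
        return overflowCount;
    }

    long warningsLogged() {
        return warningsLogged;
    }
}
```

Using offer rather than put means a slow analyzer never blocks the network threads that call distribute; the cost is that excess messages are dropped and only counted.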

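2. The period management thread, PeriodManager, creates new periods and destroys expired ones, checking once per second. The sign of m_strategy.next(now) decides what happens: a positive value means the current period has ended or we are within its last three minutes, so a new period starting at that value is opened; 0 means we are still inside the current period and nothing needs to be created or destroyed; a negative value starts a separate thread that ends the previous period's tasks.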

public void run() {
    while (m_active) {
        try {
            long now = System.currentTimeMillis();
            long value = m_strategy.next(now);

            if (value > 0) {
                startPeriod(value);
            } else if (value < 0) {
                // last period is over, make it asynchronous
                Threads.forGroup("cat").start(new EndTaskThread(-value));
            }
        } catch (Throwable e) {
            Cat.logError(e);
        }

        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            break;
        }
    }
}
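
The sign contract of next(now) can be illustrated with a small strategy. This is a sketch under assumptions, not CAT's PeriodStrategy: the class name and fields are hypothetical, the next period is opened aheadTime before the current one ends (three minutes in the text), and the old period is assumed to be ended extraTime after its end, a window the text does not specify:

```java
// Hypothetical sketch of a period strategy following the sign contract described above
// (not CAT's PeriodStrategy). Timestamps are assumed > 0 so a negative result is never 0.
class PeriodStrategySketch {
    private final long duration;     // period length, e.g. one hour
    private final long aheadTime;    // open the next period this long before the current one ends
    private final long extraTime;    // assumed grace time before ending a finished period
    private long lastStartTime = -1; // start of the newest period opened so far
    private long lastEndTime = -1;   // start of the oldest period not yet ended

    PeriodStrategySketch(long duration, long aheadTime, long extraTime) {
        this.duration = duration;
        this.aheadTime = aheadTime;
        this.extraTime = extraTime;
    }

    long next(long now) {
        long startTime = now - now % duration;

        if (lastStartTime < 0) {                           // first call: open the current period
            lastStartTime = startTime;
            lastEndTime = startTime;
            return startTime;
        }
        if (startTime > lastStartTime) {                   // current period was never opened
            lastStartTime = startTime;
            return startTime;
        }
        if (now - lastStartTime >= duration - aheadTime) { // near the end: open the next period early
            lastStartTime += duration;
            return lastStartTime;
        }
        if (now - lastEndTime >= duration + extraTime) {   // oldest period plus grace time is over
            long ended = lastEndTime;
            lastEndTime += duration;
            return -ended;                                 // start time of the period to end
        }
        return 0;                                          // nothing to do
    }
}
```

With a 60-unit period and 3-unit windows, calls at 600, 657 and 663 return 600 (open), 660 (open the next one early) and -600 (end the old one); every other second returns 0.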

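The task-ending thread EndTaskThread receives -value, i.e. the start time of the previous period. endPeriod looks for the Period that contains this time, finishes it (which finishes every PeriodTask it contains), and removes it from the period list.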

// EndTaskThread#run
public void run() {
    endPeriod(m_startTime);
}

// PeriodManager#endPeriod
private void endPeriod(long startTime) {
    int len = m_periods.size();

    for (int i = 0; i < len; i++) {
        Period period = m_periods.get(i);

        if (period.isIn(startTime)) {
            period.finish();
            m_periods.remove(i);
            break;
        }
    }
}

// Period#finish: finish every PeriodTask of every analyzer type
public void finish() {
    SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    Date startDate = new Date(m_startTime);
    Date endDate = new Date(m_endTime - 1);

    m_logger.info(String.format("Finishing %s tasks in period [%s, %s]", m_tasks.size(), df.format(startDate),
          df.format(endDate)));

    try {
        for (Entry<String, List<PeriodTask>> tasks : m_tasks.entrySet()) {
            for (PeriodTask task : tasks.getValue()) {
                task.finish();
            }
        }
    } catch (Throwable e) {
        Cat.logError(e);
    } finally {
        m_logger.info(String.format("Finished %s tasks in period [%s, %s]", m_tasks.size(), df.format(startDate),
              df.format(endDate)));
    }
}
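
Ending a period can be sketched as find, finish, remove. A minimal sketch with hypothetical names (EndPeriodSketch, Task, Period); two choices here are assumptions rather than CAT's code: the period list is a CopyOnWriteArrayList so lookups from consumer threads can iterate while it is modified, and exceptions are caught per task:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical sketch of ending a period (illustrative names, not CAT's classes).
class EndPeriodSketch {

    interface Task {
        void finish();
    }

    static final class Period {
        final long start; // inclusive
        final long end;   // exclusive
        final List<Task> tasks;

        Period(long start, long end, List<Task> tasks) {
            this.start = start;
            this.end = end;
            this.tasks = tasks;
        }

        boolean isIn(long time) {
            return time >= start && time < end;
        }

        // Finish every task; one failing task does not stop the others from saving their reports.
        void finish() {
            for (Task task : tasks) {
                try {
                    task.finish();
                } catch (RuntimeException e) {
                    System.err.println("task finish failed: " + e);
                }
            }
        }
    }

    // Copy-on-write: readers iterate a snapshot, so removal here does not disturb them.
    final List<Period> periods = new CopyOnWriteArrayList<>();

    void endPeriod(long startTime) {
        for (Period period : periods) {
            if (period.isIn(startTime)) {
                period.finish();
                periods.remove(period);
                break;
            }
        }
    }
}
```

In the original Period#finish the try/catch wraps the whole loop, so an exception from one task's finish skips the remaining tasks; catching per task lets every analyzer still reach its final checkpoint.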

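PeriodTask#finish shuts its analyzer down gracefully: doCheckpoint(true) saves the hourly report data (StoragePolicy.FILE_AND_DB, i.e. to the file store, such as HDFS, and to the database; intermediate checkpoints with atEnd = false use StoragePolicy.FILE only), and the analyzer is then destroyed.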

// PeriodTask#finish
public void finish() {
    try {
        m_analyzer.doCheckpoint(true);
        m_analyzer.destroy();
    } catch (Exception e) {
        Cat.logError(e);
    }
}

// the analyzer's doCheckpoint
public synchronized void doCheckpoint(boolean atEnd) {
    if (atEnd) {
        // end of the period: store the hourly reports to file and database
        m_reportManager.storeHourlyReports(getStartTime(), StoragePolicy.FILE_AND_DB, m_index);
    } else {
        // intermediate checkpoint: file only
        m_reportManager.storeHourlyReports(getStartTime(), StoragePolicy.FILE, m_index);
    }
}
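
The checkpoint-then-destroy order can be shown with a toy analyzer. This sketch assumes a hypothetical CheckpointSketch class whose local StoragePolicy enum only mirrors the two constants above; it records what each checkpoint would persist instead of writing anything:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the checkpoint-then-destroy sequence (not CAT's API).
class CheckpointSketch {
    enum StoragePolicy { FILE, FILE_AND_DB }

    final List<String> stored = new ArrayList<>(); // what each checkpoint would persist
    boolean destroyed;

    // synchronized so a periodic checkpoint and the final one cannot interleave
    synchronized void doCheckpoint(boolean atEnd, long startTime) {
        StoragePolicy policy = atEnd ? StoragePolicy.FILE_AND_DB : StoragePolicy.FILE;
        stored.add(startTime + ":" + policy);
    }

    // Same order as PeriodTask#finish: final checkpoint first, then release resources.
    void finish(long startTime) {
        try {
            doCheckpoint(true, startTime);
            destroy();
        } catch (RuntimeException e) {
            System.err.println("finish failed: " + e);
        }
    }

    void destroy() {
        destroyed = true;
    }
}
```

Writing to the database only at the end of the period keeps intermediate checkpoints cheap, while the final one makes the complete hourly report durable.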

3. The task thread PeriodTask consumes the messages in its queue by passing the queue to the analyzer of its type.

// PeriodTask#run
public void run() {
    try {
        m_analyzer.analyze(m_queue);
    } catch (Exception e) {
        Cat.logError(e);
    }
}

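analyze keeps taking messages from the task's queue and calling process on each one as long as the analyzer is active and has not timed out, i.e. the current time has not passed the end of its period plus a three-minute grace period.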

public void analyze(MessageQueue queue) {
    // phase 1: consume while active and before the period (plus grace time) times out
    while (!isTimeout() && isActive()) {
        MessageTree tree = queue.poll();

        if (tree != null) {
            try {
                process(tree);
            } catch (Throwable e) {
                m_errors++;

                if (m_errors == 1 || m_errors % 10000 == 0) {
                    Cat.logError(e);
                }
            }
        }
    }

    // phase 2: drain the remaining messages, then return
    while (true) {
        MessageTree tree = queue.poll();

        if (tree != null) {
            try {
                process(tree);
            } catch (Throwable e) {
                m_errors++;

                if (m_errors == 1 || m_errors % 10000 == 0) {
                    Cat.logError(e);
                }
            }
        } else {
            break;
        }
    }
}

Once either condition fails, the second loop first drains the messages still in the queue, and only then does the thread exit.
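
The two-phase loop can be sketched with standard concurrency types. This is a sketch under assumptions: AnalyzeLoopSketch is hypothetical, a java.util.concurrent.BlockingQueue stands in for CAT's MessageQueue, and phase 1 uses poll(timeout, unit) so an empty queue does not spin the CPU (whether CAT's own poll waits internally is not shown here). The clock is injected so the timeout can be tested:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

// Hypothetical sketch of the two-phase consume loop (illustrative names, not CAT's classes).
class AnalyzeLoopSketch<T> {
    static final long EXTRA_TIME_MS = 3 * 60 * 1000L; // three-minute grace period

    private final long periodEnd;     // end of the period, epoch millis
    private final LongSupplier clock; // e.g. System::currentTimeMillis
    private volatile boolean active = true;
    long errors;

    AnalyzeLoopSketch(long periodEnd, LongSupplier clock) {
        this.periodEnd = periodEnd;
        this.clock = clock;
    }

    boolean isTimeout() {
        return clock.getAsLong() > periodEnd + EXTRA_TIME_MS;
    }

    void shutdown() {
        active = false;
    }

    void analyze(BlockingQueue<T> queue, Consumer<T> process) {
        // Phase 1: consume while active and still within the period plus the grace time.
        while (!isTimeout() && active) {
            T item;
            try {
                item = queue.poll(5, TimeUnit.MILLISECONDS); // short wait instead of spinning
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt status, then drain
                break;
            }
            if (item != null) {
                handle(item, process);
            }
        }
        // Phase 2: drain whatever is left, then return so the thread can exit.
        T item;
        while ((item = queue.poll()) != null) {
            handle(item, process);
        }
    }

    private void handle(T item, Consumer<T> process) {
        try {
            process.accept(item);
        } catch (RuntimeException e) {
            errors++;
            if (errors == 1 || errors % 10000 == 0) { // sampled error logging
                System.err.println("process failed: " + e);
            }
        }
    }
}
```

Draining after the loop means messages that were queued just before the timeout still reach the report before the final checkpoint.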

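4. Taking the Transaction analyzer as an example, process first obtains the hourly TransactionReport for the message's domain (DefaultReportManager#getHourlyReport => TransactionDelegate#makeReport), records the sender's IP, and then processes the root message if it is a Transaction.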

public void process(MessageTree tree) {
    String domain = tree.getDomain();
    // hourly report of this domain for the current period
    TransactionReport report = m_reportManager.getHourlyReport(getStartTime(), domain, true);
    Message message = tree.getMessage();

    report.addIp(tree.getIpAddress());

    if (message instanceof Transaction) {
        Transaction root = (Transaction) message;

        processTransaction(report, tree, root);
    }
}
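
The report lookup is a get-or-create keyed by period start time and domain. A minimal sketch, assuming a hypothetical HourlyReportCache built on nested ConcurrentHashMaps, with a Function standing in for TransactionDelegate#makeReport; it does not describe DefaultReportManager's real structure:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch of a get-or-create hourly report lookup (not DefaultReportManager's structure).
class HourlyReportCache<R> {
    // period start time -> (domain -> report)
    private final Map<Long, Map<String, R>> reports = new ConcurrentHashMap<>();
    private final Function<String, R> makeReport; // plays the role of a delegate's makeReport

    HourlyReportCache(Function<String, R> makeReport) {
        this.makeReport = makeReport;
    }

    R getHourlyReport(long startTime, String domain, boolean createIfAbsent) {
        if (!createIfAbsent) {
            Map<String, R> byDomain = reports.get(startTime);
            return byDomain == null ? null : byDomain.get(domain);
        }
        // computeIfAbsent creates each map and report at most once, even with concurrent callers
        return reports.computeIfAbsent(startTime, k -> new ConcurrentHashMap<>())
                .computeIfAbsent(domain, makeReport);
    }
}
```

With the create flag set to true, as in process, the first message of a domain in a period creates that domain's report and later messages reuse it.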

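Only Transaction messages are counted. Each transaction is first checked against the server-side filter (discardTransaction), which drops it together with its children. Next, checkForTruncatedMessage determines whether it is a truncated message; the client sends truncated messages when a transaction runs too long or grows too large. Finally the statistics for its type and name are updated with the duration in milliseconds, and its child transactions are processed recursively.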

protected void processTransaction(TransactionReport report, MessageTree tree, Transaction t) {
    String type = t.getType();
    String name = t.getName();

    if (m_serverFilterConfigManager.discardTransaction(type, name)) {
        return;
    } else {
        Pair<Boolean, Long> pair = checkForTruncatedMessage(tree, t);

        if (pair.getKey().booleanValue()) {
            String ip = tree.getIpAddress();
            TransactionType transactionType = report.findOrCreateMachine(ip).findOrCreateType(type);
            TransactionName transactionName = transactionType.findOrCreateName(name);
            String messageId = tree.getMessageId();

            // the pair's value is in microseconds; / 1000d converts it to milliseconds
            processTypeAndName(t, transactionType, transactionName, messageId, pair.getValue().doubleValue() / 1000d);
        }

        // recurse into child transactions
        List<Message> children = t.getChildren();

        for (Message child : children) {
            if (child instanceof Transaction) {
                processTransaction(report, tree, (Transaction) child);
            }
        }
    }
}
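
The filter-then-recurse structure of processTransaction can be shown on a simplified model. A sketch under stated assumptions: Txn and Stats are hypothetical types, a type-only discard set stands in for m_serverFilterConfigManager (which also takes the name), statistics are keyed by type and name only (the original also keys by machine IP), and the truncation check is left out:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of recursive transaction statistics (not CAT's classes).
class TxnStatsSketch {
    static final class Txn {
        final String type;
        final String name;
        final long durationMicros;
        final List<Txn> children = new ArrayList<>();

        Txn(String type, String name, long durationMicros) {
            this.type = type;
            this.name = name;
            this.durationMicros = durationMicros;
        }

        Txn add(Txn child) {
            children.add(child);
            return this;
        }
    }

    static final class Stats {
        long count;
        double sumMillis;
    }

    final Map<String, Stats> byTypeAndName = new HashMap<>(); // key: type + ":" + name
    private final Set<String> discardedTypes;                 // stands in for the server filter config

    TxnStatsSketch(Set<String> discardedTypes) {
        this.discardedTypes = discardedTypes;
    }

    void processTransaction(Txn t) {
        if (discardedTypes.contains(t.type)) {
            return; // discarded together with its whole subtree
        }
        Stats stats = byTypeAndName.computeIfAbsent(t.type + ":" + t.name, k -> new Stats());
        stats.count++;
        stats.sumMillis += t.durationMicros / 1000d; // microseconds -> milliseconds
        for (Txn child : t.children) {
            processTransaction(child);
        }
    }
}
```

Because the filter returns before the recursion, a discarded transaction hides its nested transactions too.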

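A root transaction is a truncated fragment when its last child is an Event with type "RemoteCall" and name "Next". The key is then false, so the fragment's own statistics are not recorded; its child transactions are still visited by the recursive loop (the check applies only to the root), and their getDurationInMicros is 0 because they have no children of their own. When the final fragment arrives, its last child is an Event with type "TruncatedTransaction" and name "TotalDuration"; the event's data is parsed as the full duration and replaces the fragment's own duration, so the statistics record the correct total.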

// key: whether to record statistics; value: duration in microseconds
private Pair<Boolean, Long> checkForTruncatedMessage(MessageTree tree, Transaction t) {
    Pair<Boolean, Long> pair = new Pair<Boolean, Long>(true, t.getDurationInMicros());
    List<Message> children = t.getChildren();
    int size = children.size();

    if (tree.getMessage() == t && size > 0) { // root transaction with children
        Message last = children.get(size - 1);

        if (last instanceof Event) {
            String type = last.getType();
            String name = last.getName();

            if (type.equals("RemoteCall") && name.equals("Next")) {
                // truncated fragment: do not record its statistics
                pair.setKey(false);
            } else if (type.equals("TruncatedTransaction") && name.equals("TotalDuration")) {
                // final fragment: the event data carries the total duration
                try {
                    long delta = Long.parseLong(last.getData().toString());

                    pair.setValue(delta);
                } catch (Exception e) {
                    Cat.logError(e);
                }
            }
        }
    }

    return pair;
}
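
The truncation rules reduce to a small decision on the root's last child. A minimal sketch, assuming hypothetical Event and Check types in place of CAT's Event and Pair<Boolean, Long>, and passing the last child event directly (null when the transaction is not the root, has no children, or its last child is not an Event):

```java
// Hypothetical sketch of the truncation check (simplified types, not CAT's classes).
class TruncationCheckSketch {

    // Stand-in for an Event that is the last child of a root transaction.
    static final class Event {
        final String type;
        final String name;
        final String data;

        Event(String type, String name, String data) {
            this.type = type;
            this.name = name;
            this.data = data;
        }
    }

    // Outcome: whether to record statistics, and the duration (microseconds) to record.
    static final class Check {
        final boolean counted;
        final long durationMicros;

        Check(boolean counted, long durationMicros) {
            this.counted = counted;
            this.durationMicros = durationMicros;
        }
    }

    static Check check(long ownDurationMicros, Event lastEvent) {
        if (lastEvent != null) {
            if ("RemoteCall".equals(lastEvent.type) && "Next".equals(lastEvent.name)) {
                return new Check(false, ownDurationMicros); // truncated fragment: skip its stats
            }
            if ("TruncatedTransaction".equals(lastEvent.type) && "TotalDuration".equals(lastEvent.name)) {
                try {
                    return new Check(true, Long.parseLong(lastEvent.data)); // total from the final fragment
                } catch (NumberFormatException e) {
                    // malformed data: keep the transaction's own duration
                }
            }
        }
        return new Check(true, ownDurationMicros);
    }
}
```

As in the original, malformed duration data does not drop the transaction; it is still counted with its own measured duration.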

 
