CAT的Server消费消息(二)
1. 消费消息的流程从DefaultMessageHandler#handle=》RealtimeConsumer#consume,查找符合当前时间的时段Period,每个时段都会有开始结束时间,timestamp >= m_startTime && timestamp < m_endTime;
public void consume(MessageTree tree) {
long timestamp = tree.getMessage().getTimestamp();
Period period = m_periodManager.findPeriod(timestamp);
if (period != null) {
period.distribute(tree);
} else {
m_serverStateManager.addNetworkTimeError(1);
}
}
分配消息distribute,把消息放进各个类型消息的周期任务的消息队列中
public void distribute(MessageTree tree) {
m_serverStateManager.addMessageTotal(tree.getDomain(), 1);
boolean success = true;
String domain = tree.getDomain();
for (Entry<String, List<PeriodTask>> entry : m_tasks.entrySet()) {
List<PeriodTask> tasks = entry.getValue();
int length = tasks.size();
int index = 0;
boolean manyTasks = length > 1;
if (manyTasks) {
index = Math.abs(domain.hashCode()) % length;
}
PeriodTask task = tasks.get(index);
boolean enqueue = task.enqueue(tree);
if (enqueue == false) {
if (manyTasks) {
task = tasks.get((index + 1) % length);
enqueue = task.enqueue(tree);
if (enqueue == false) {
success = false;
}
} else {
success = false;
}
}
}
if (!success) {
m_serverStateManager.addMessageTotalLoss(tree.getDomain(), 1);
}
}
队列长度默认是30000,消息溢出时统计个数,抽样打印日志,根据消息类型选择释放堆外直接内存ByteBuf,避免内存泄漏
public boolean enqueue(MessageTree tree) {
boolean result = m_queue.offer(tree);
if (!result) { // trace queue overflow
m_queueOverflow++;
if (m_queueOverflow % (10 * CatConstants.ERROR_COUNT) == 0) {
m_logger.warn(m_analyzer.getClass().getSimpleName() + " queue overflow number " + m_queueOverflow);
}
// fix issue #1155
if (m_analyzer.getClass().getSimpleName().equals("DumpAnalyzer") && tree instanceof DefaultMessageTree) {
DefaultMessageTree t = (DefaultMessageTree) tree;
if (t.getBuffer() != null) {
t.getBuffer().release();
}
}
}
return result;
}
2. 时段管理线程PeriodManager主要用于创建新的时段以及销毁过期的时段,周期1s。当value大于0则表示本时段已经结束或者在本时段的最后三分钟,开始新时段,当value为0时代表本时段,不用新建和销毁,小于0则开始线程结束周期任务
public void run() {
while (m_active) {
try {
long now = System.currentTimeMillis();
long value = m_strategy.next(now);
if (value > 0) {
startPeriod(value);
} else if (value < 0) {
// last period is over,make it asynchronous
Threads.forGroup("cat").start(new EndTaskThread(-value));
}
} catch (Throwable e) {
Cat.logError(e);
}
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
break;
}
}
}
任务结束线程EndTaskThread,value会返回上个时段的开始时间,当该事件处于某个周期Period时,就关闭该周期,结束包含的所有子任务PeriodTask,
public void run() {
endPeriod(m_startTime);
}
private void endPeriod(long startTime) {
int len = m_periods.size();
for (int i = 0; i < len; i++) {
Period period = m_periods.get(i);
if (period.isIn(startTime)) {
period.finish();
m_periods.remove(i);
break;
}
}
}
public void finish() {
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date startDate = new Date(m_startTime);
Date endDate = new Date(m_endTime - 1);
m_logger.info(String.format("Finishing %s tasks in period [%s, %s]", m_tasks.size(), df.format(startDate),
df.format(endDate)));
try {
for (Entry<String, List<PeriodTask>> tasks : m_tasks.entrySet()) {
for (PeriodTask task : tasks.getValue()) {
task.finish();
}
}
} catch (Throwable e) {
Cat.logError(e);
} finally {
m_logger.info(String.format("Finished %s tasks in period [%s, %s]", m_tasks.size(), df.format(startDate),
df.format(endDate)));
}
}
执行每个消息类型分析器优雅关闭,保存报告数据到hdfs或者数据库中,最后销毁消息类型分析器。
public void finish() {
try {
m_analyzer.doCheckpoint(true);
m_analyzer.destroy();
} catch (Exception e) {
Cat.logError(e);
}
}
public synchronized void doCheckpoint(boolean atEnd) {
if (atEnd) {
m_reportManager.storeHourlyReports(getStartTime(), StoragePolicy.FILE_AND_DB, m_index);
} else {
m_reportManager.storeHourlyReports(getStartTime(), StoragePolicy.FILE, m_index);
}
}
3. 任务处理线程PeriodTask主要用于处理消息队列里面的消息,使用各个类型的分析器进行处理
public void run() {
try {
m_analyzer.analyze(m_queue);
} catch (Exception e) {
Cat.logError(e);
}
}
判断当前时间的活跃状态或者是否超过本该执行的时段,冗余时间为三分钟,从子任务的消息队列中取出消息进行消费process
public void analyze(MessageQueue queue) {
while (!isTimeout() && isActive()) {
MessageTree tree = queue.poll();
if (tree != null) {
try {
process(tree);
} catch (Throwable e) {
m_errors++;
if (m_errors == 1 || m_errors % 10000 == 0) {
Cat.logError(e);
}
}
}
}
while (true) {
MessageTree tree = queue.poll();
if (tree != null) {
try {
process(tree);
} catch (Throwable e) {
m_errors++;
if (m_errors == 1 || m_errors % 10000 == 0) {
Cat.logError(e);
}
}
} else {
break;
}
}
}
当状态不满足时,这个时候会先把消息队列中的消息消费完,然后再退出线程。
4. 获取当前类型的报告信息TransactionReport,DefaultReportManager#getHourlyReport=》TransactionDelegate#makeReport
public void process(MessageTree tree) {
String domain = tree.getDomain();
TransactionReport report = m_reportManager.getHourlyReport(getStartTime(), domain, true);
Message message = tree.getMessage();
report.addIp(tree.getIpAddress());
if (message instanceof Transaction) {
Transaction root = (Transaction) message;
processTransaction(report, tree, root);
}
}
只处理Transaction类型的消息,先对消息进行过滤,检查是否是截断的消息,截断的消息时当超时或者超长时从客户端发出的,最后就是处理该消息,统计数据。
protected void processTransaction(TransactionReport report, MessageTree tree, Transaction t) {
String type = t.getType();
String name = t.getName();
if (m_serverFilterConfigManager.discardTransaction(type, name)) {
return;
} else {
Pair<Boolean, Long> pair = checkForTruncatedMessage(tree, t);
if (pair.getKey().booleanValue()) {
String ip = tree.getIpAddress();
TransactionType transactionType = report.findOrCreateMachine(ip).findOrCreateType(type);
TransactionName transactionName = transactionType.findOrCreateName(name);
String messageId = tree.getMessageId();
processTypeAndName(t, transactionType, transactionName, messageId, pair.getValue().doubleValue() / 1000d);
}
List<Message> children = t.getChildren();
for (Message child : children) {
if (child instanceof Transaction) {
processTransaction(report, tree, (Transaction) child);
}
}
}
}
当该消息是截断的消息时即消息满足type.equals("RemoteCall") && name.equals("Next"),key为false,这个时候不会统计父消息的相关数据,而子消息的getDurationInMicros为0,因为它本身没有子消息。当完整的消息发送过来时满足type.equals("TruncatedTransaction") && name.equals("TotalDuration"),这个时候会统计消息的正确区间。
private Pair<Boolean, Long> checkForTruncatedMessage(MessageTree tree, Transaction t) {
Pair<Boolean, Long> pair = new Pair<Boolean, Long>(true, t.getDurationInMicros());
List<Message> children = t.getChildren();
int size = children.size();
if (tree.getMessage() == t && size > 0) { // root transaction with children
Message last = children.get(size - 1);
if (last instanceof Event) {
String type = last.getType();
String name = last.getName();
if (type.equals("RemoteCall") && name.equals("Next")) {
pair.setKey(false);
} else if (type.equals("TruncatedTransaction") && name.equals("TotalDuration")) {
try {
long delta = Long.parseLong(last.getData().toString());
pair.setValue(delta);
} catch (Exception e) {
Cat.logError(e);
}
}
}
}
return pair;
}
推荐阅读
-
SQL Server 2008中的代码安全(二) DDL触发器与登录触发器
-
数据库(SQL Server )经典例题(二):对S表、P表、J表、SPJ表的操作——单表查询
-
SQL Server将自己的查询结果作为待查询数据子列之二
-
sql server利用不同语种语言显示报错错误消息的方法示例
-
CAT的Client端发送消息
-
CAT的Server消费消息(一)
-
CAT的Server消费消息(二)
-
CAT的Server初始化
-
我的Spring Cloud(二):Eureka Server注册中心
-
基于Spring Boot 2.0.2.RELEASE 的 Spring Cloud 速成指南 | 二. Spring Cloud 服务注册中心(Eureka Server)