欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

CAT的Server消费消息(一)

程序员文章站 2022-07-15 16:50:14
...

1. 初始化RealtimeConsumer实时消费,使用容器自带的初始化前置处理Initializable,初始化PeriodManager,时间间隔为一个小时,也就是以小时为维度进行统计数据,并且设置统计类和消息分析器。

public void initialize() throws InitializationException {
		m_periodManager = new PeriodManager(HOUR, m_analyzerManager, m_serverStateManager, m_logger);
		m_periodManager.init();

		Threads.forGroup("cat").start(m_periodManager);
	}

设置区间管理器相邻维度的相容时间为三分钟,也就是精度容许时间,在区间策略中保存区间长度和相应的容错长度。

public PeriodManager(long duration, MessageAnalyzerManager analyzerManager,
	      ServerStatisticManager serverStateManager, Logger logger) {
		m_strategy = new PeriodStrategy(duration, EXTRATIME, EXTRATIME);
		m_active = true;
		m_analyzerManager = analyzerManager;
		m_serverStateManager = serverStateManager;
		m_logger = logger;
	}

执行PeriodManager#init,获取当前时间段的开始时间,第一次执行肯定是当前时间段的开始时间。第二次执行并且在同时段的57分钟内,则会设置m_lastEndTime为第一次的开始时间,并且返回0,代表本时段没有结束。第三次执行并且在同时段的第58分钟,这时候就取得是下一个小时的开始时间,设置m_lastStartTime也一样。以后当同时段的前57分钟执行的时候都返回的是0,也就意味着是同时段,不需要开始下一时段,其他的情况和上面类似循环执行。

public void init() {
		long startTime = m_strategy.next(System.currentTimeMillis());

		startPeriod(startTime);
	}
public long next(long now) {
		long startTime = now - now % m_duration;

		// for current period
		if (startTime > m_lastStartTime) {
			m_lastStartTime = startTime;
			return startTime;
		}

		// prepare next period ahead
		if (now - m_lastStartTime >= m_duration - m_aheadTime) {
			m_lastStartTime = startTime + m_duration;
			return startTime + m_duration;
		}

		// last period is over
		if (now - m_lastEndTime >= m_duration + m_extraTime) {
			long lastEndTime = m_lastEndTime;
			m_lastEndTime = startTime;
			return -lastEndTime;
		}

		return 0;
	}

开始一个时段周期,获取该时段的结束时间,初始化周期Period,设置开始结束时间,消息分析器,统计类以及日志属性。

private void startPeriod(long startTime) {
		long endTime = startTime + m_strategy.getDuration();
		Period period = new Period(startTime, endTime, m_analyzerManager, m_serverStateManager, m_logger);

		m_periods.add(period);
		period.start();
	}

创建处理各个消息类型的线程任务PeriodTask,包含各个类型的消息分析器,消息队列以及时段的开始时间。

public Period(long startTime, long endTime, MessageAnalyzerManager analyzerManager,
	      ServerStatisticManager serverStateManager, Logger logger) {
		m_startTime = startTime;
		m_endTime = endTime;
		m_analyzerManager = analyzerManager;
		m_serverStateManager = serverStateManager;
		m_logger = logger;

		List<String> names = m_analyzerManager.getAnalyzerNames();

		m_tasks = new HashMap<String, List<PeriodTask>>();
		for (String name : names) {
			List<MessageAnalyzer> messageAnalyzers = m_analyzerManager.getAnalyzer(name, startTime);

			for (MessageAnalyzer analyzer : messageAnalyzers) {
				MessageQueue queue = new DefaultMessageQueue(QUEUE_SIZE);
				PeriodTask task = new PeriodTask(analyzer, queue, startTime);

				task.enableLogging(m_logger);

				List<PeriodTask> analyzerTasks = m_tasks.get(name);

				if (analyzerTasks == null) {
					analyzerTasks = new ArrayList<PeriodTask>();
					m_tasks.put(name, analyzerTasks);
				}
				analyzerTasks.add(task);
			}
		}
	}

2. 初始化DefaultMessageAnalyzerManager,使用容器扩展的初始化函数initialize,从容器中查找所有实现了MessageAnalyzer接口的实现类,根据每个类的ID放置到map中,经过一些筛选最后保存相应的分析器名。

public void initialize() throws InitializationException {
		Map<String, MessageAnalyzer> map = lookupMap(MessageAnalyzer.class);

		for (MessageAnalyzer analyzer : map.values()) {
			analyzer.destroy();
		}

		m_analyzerNames = new ArrayList<String>(map.keySet());
		
		Collections.sort(m_analyzerNames, new Comparator<String>() {
			@Override
			public int compare(String str1, String str2) {
				String state = "state";
				String top = "top";

				if (state.equals(str1)) {
					return 1;
				} else if (state.equals(str2)) {
					return -1;
				}
				if (top.equals(str1)) {
					return -1;
				} else if (top.equals(str2)) {
					return 1;
				}
				return str1.compareTo(str2);
			}
		});
		
		m_analyzerNames.remove("matrix");
		m_analyzerNames.remove("dependency");
	}

在DefaultMessageAnalyzerManager#getAnalyzer中,首先删除当前时段往前第二个时段的分析器等数据,private Map<Long, Map<String, List<MessageAnalyzer>>> m_analyzers = new HashMap<Long, Map<String, List<MessageAnalyzer>>>();最外层是时段,然后是分析器名,最后是分析器集合,查找集合中所有符合名字条件的分析器,然后进行初始化,保存开始时间,时段区间值,以及容许时间,最后把分析器放入集合map中保存起来。

public void initialize(long startTime, long duration, long extraTime) {
		m_extraTime = extraTime;
		m_startTime = startTime;
		m_duration = duration;

		loadReports();
	}

3. 先用TransactionAnalyzer举例说明,初始化报告管理类DefaultReportManager,预加载当前时段的分析报告

protected void loadReports() {
		m_reportManager.loadHourlyReports(getStartTime(), StoragePolicy.FILE, m_index);
	}

这里的m_name在加载components-cat-consumer.xml文件中指定transaction,

public Map<String, T> loadHourlyReports(long startTime, StoragePolicy policy, int index) {
		Transaction t = Cat.newTransaction("Restore", m_name);
		Map<String, T> reports = m_reports.get(startTime);
		Cat.logEvent("Restore", m_name + ":" + index);
		ReportBucket bucket = null;

		if (reports == null) {
			reports = new ConcurrentHashMap<String, T>();
			m_reports.put(startTime, reports);
		}

		try {
			bucket = m_bucketManager.getReportBucket(startTime, m_name, index);

			for (String id : bucket.getIds()) {
				String xml = bucket.findById(id);
				T report = m_reportDelegate.parseXml(xml);

				reports.put(id, report);
			}

			m_reportDelegate.afterLoad(reports);
			t.setStatus(Message.SUCCESS);
		} catch (Throwable e) {
			t.setStatus(e);
			Cat.logError(e);
			m_logger.error(String.format("Error when loading %s reports of %s!", m_name, new Date(startTime)), e);
		} finally {
			t.complete();

			if (bucket != null) {
				m_bucketManager.closeBucket(bucket);
			}
		}
		return reports;
	}

初始化报告块管理器DefaultReportBucketManager,获取hdfs的存储路径"target/bucket"

public void initialize() throws InitializationException {
		m_reportBaseDir = m_configManager.getHdfsLocalBaseDir("report");
	}
public ReportBucket getReportBucket(long timestamp, String name, int index) throws IOException {
		Date date = new Date(timestamp);
		ReportBucket bucket = lookup(ReportBucket.class);

		bucket.initialize(name, date, index);
		return bucket;
	}

获取hdfs根目录,创建本时段的文件,存储路径"{0,date,yyyyMMdd}/{0,date,HH}/{1}/report-{2}"对应timestamp, index, name,在内存中保存索引文件里面的数据。索引文件里面保存的主要是数据文件里面的下标,根据下标的找出消息的长度,再通过长度找出后面相应的数据。

public void initialize(String name, Date timestamp, int index) throws IOException {
		m_baseDir = m_configManager.getHdfsLocalBaseDir("report");
		m_writeLock = new ReentrantLock();
		m_readLock = new ReentrantLock();

		String logicalPath = m_pathBuilder.getReportPath(name, timestamp, index);

		File dataFile = new File(m_baseDir, logicalPath);
		File indexFile = new File(m_baseDir, logicalPath + ".idx");

		if (indexFile.exists()) {
			loadIndexes(indexFile);
		}

		final File dir = dataFile.getParentFile();

		if (!dir.exists() && !dir.mkdirs()) {
			throw new IOException(String.format("Fail to create directory(%s)!", dir));
		}

		m_logicalPath = logicalPath;
		m_writeDataFile = new BufferedOutputStream(new FileOutputStream(dataFile, true), 8192);
		m_writeIndexFile = new BufferedOutputStream(new FileOutputStream(indexFile, true), 8192);
		m_writeDataFileLength = dataFile.length();
		m_readDataFile = new RandomAccessFile(dataFile, "r");
	}

从报告块中找出对应的报告数据,并且解析成TransactionReport,最后保存在内存缓存中。调用加载的后置处理。

public String findById(String id) throws IOException {
		Long offset = m_idToOffsets.get(id);

		if (offset != null) {
			m_readLock.lock();

			try {
				m_readDataFile.seek(offset);

				int num = Integer.parseInt(m_readDataFile.readLine());
				byte[] bytes = new byte[num];

				m_readDataFile.readFully(bytes);

				return new String(bytes, "utf-8");
			} catch (Exception e) {
				m_logger.error(String.format("Error when reading file(%s)!", m_readDataFile), e);
			} finally {
				m_readLock.unlock();
			}
		}

		return null;
	}

这个过程就是先从hdfs中加载当前时段的相应数据报告,放入内存中实时查询,最后销毁该文件m_bucketManager.closeBucket(bucket);释放LocalReportBucket类中保存的文件以及清空对应的内存数据。以上基本就是方法DefaultMessageAnalyzerManager#getAnalyzer的全部内容。

4. 每个时段里面包含不同消息类型分析器的不同子任务,也就是Map<String, List<PeriodTask>> m_tasks,最后把该时段加进时段管理器的集合m_periods中,执行该时段的start方法。给每个子任务设置下标并且放进守护线程池组"Cat-RealtimeConsumer"

public void start() {
		SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

		m_logger.info(String.format("Starting %s tasks in period [%s, %s]", m_tasks.size(),
		      df.format(new Date(m_startTime)), df.format(new Date(m_endTime - 1))));

		for (Entry<String, List<PeriodTask>> tasks : m_tasks.entrySet()) {
			List<PeriodTask> taskList = tasks.getValue();

			for (int i = 0; i < taskList.size(); i++) {
				PeriodTask task = taskList.get(i);

				task.setIndex(i);

				Threads.forGroup("Cat-RealtimeConsumer").start(task);
			}
		}
	}

执行PeriodManager守护线程。Threads.forGroup("cat").start(m_periodManager);

 

相关标签: CAT