欢迎您访问程序员文章站!本站旨在为大家提供一个分享程序员计算机编程知识的平台。
您现在的位置是: 首页

(6) - Quartz项目实战

程序员文章站 2022-07-14 10:19:53
...

本篇博文将阐述项目工作中使用Quartz的情况,包括项目背景、项目框架、Quartz集群部署等方面,重点讲述如何在实际项目中使用Quartz。

1. 背景

因项目需求,需要定时调用数据下载接口,并将数据存储至诸如mongo、redis、elasticsearch等数据库或缓存中。具体涉及到的需求如下:

a. 调用接口的任务均从mongo数据库读取;

b. 任务的个数随着业务量的增加而增加;

c. 每个调用任务的定时执行时间可能不同,且定时执行时间在mongo中可配置;

d. 任务的执行需要支持动态更新,例如检测到某一任务的定时时间发生变化时,该任务的执行计划也需要实时调整;

e. mongo、redis、elasticsearch等数据库中所存储的字段也由mongo进行配置;

f. 任务执行需要具备较高的实时性、较强的可靠性以及较好的可扩展性。

综合上述需求,经过一番调研,发现任务调度框架Quartz可以满足项目需求。

2. 框架

基于项目的需求,结合任务调度框架Quartz,大体的流程框架如下图所示:

[图:基于Quartz的任务调度流程框架]

1) 首先从mongo中加载任务配置

2) 将任务的配置信息初始化至Quartz

3) 通过Quartz的Job任务实现定时调用下载接口任务

4) 将下载的数据依据配置,存储至数据库中

5) 动态检测任务定时扫描mongo数据库,检查相关任务的配置信息是否发生变化;如果发生变化,则对调度器中的任务进行动态更新

6) 为了实现高可用性、可扩展性,可以直接使用Quartz原生的集群特性。

3. 核心代码

核心代码涵盖上述流程图中的各个环节;出于项目保密需要,部分相关信息已做隐藏处理。

3.1 任务主流程  

/**
 * Entry point of the scheduling service.
 *
 * <p>Loads the task configurations, registers every runnable task with a Quartz scheduler,
 * adds the dynamic-update job, and starts the scheduler.
 */
public class SchedulerRunner {
    static final Logger logger = LoggerFactory.getLogger(SchedulerRunner.class);

    /**
     * Configures logging and Quartz from the files under {@code ./conf}, then runs the scheduler.
     *
     * @param args unused
     */
    public static void main(String[] args) {

        // Load the log4j configuration file
        PropertyConfigurator.configure("./conf/log4j.properties");

        // Point Quartz at its properties file; StdSchedulerFactory reads the
        // "org.quartz.properties" system property when it builds the default scheduler
        System.setProperty("org.quartz.properties", "./conf/quartz.properties");

        // Parse the tasks and schedule them
        run();
    }

    /**
     * Loads the task configurations, drops tasks that should not run, registers the remaining
     * ones plus the dynamic-update job, and starts the scheduler.
     *
     * <p>Returns early, without starting the scheduler, when no task is configured, when every
     * task is filtered out, or when the scheduler cannot be created.
     */
    public static void run() {
        // The task loader is Mongo-backed (GenerateTaskInfo.generateTaskInfoFromMongo)
        List<TaskInfo> taskInfos = GenerateTaskInfo.generateTaskInfoFromMongo();
        if (taskInfos == null || taskInfos.isEmpty()) {
            logger.info("there is no tasks from mongoInfo");
            return;
        }

        // Drop offline / real-time tasks
        taskInfos = GenerateTaskInfo.filterTask(taskInfos);
        if (taskInfos.isEmpty()) {
            logger.info("all tasks are offline, no need to run");
            return;
        }

        Scheduler scheduler;
        try {
            scheduler = StdSchedulerFactory.getDefaultScheduler();
        } catch (SchedulerException e) {
            logger.error("create scheduler failed", e);
            return;
        }
        if (scheduler == null) {
            logger.error("create scheduler failed");
            return;
        }

        if (isSchedulerClear()) {
            clearSchedulerJob(scheduler);
        }

        // Register every configured task
        for (TaskInfo task : taskInfos) {
            SchedulerFactory.addJob2Scheduler(task, scheduler);
        }

        // Register the job that keeps the scheduler in sync with the configuration
        SchedulerFactory.addDynamicUpdateJob2Scheduler(scheduler);

        try {
            scheduler.start();
        } catch (SchedulerException e) {
            logger.error("start scheduler error!", e);
        }
    }

    /**
     * Deletes all scheduling data (jobs, triggers, calendars) held by the scheduler.
     *
     * <p>NOTE(review): with a shared/clustered JDBC job store this wipes the data for every node
     * of the cluster, not just this one — confirm the flag is only enabled where intended.
     *
     * @param scheduler the scheduler to clear
     */
    public static void clearSchedulerJob(Scheduler scheduler) {
        try {
            scheduler.clear();
        } catch (SchedulerException e) {
            logger.error("clear  scheduler error!", e);
        }
    }

    /**
     * Reads the "clear the scheduler on start-up" flag from the application configuration.
     *
     * @return the value of {@code cleanSchedulerFlag}, with {@code "true"} passed as the default
     */
    private static boolean isSchedulerClear() {
        Configuration conf = OcpConfHelper.getInstance().getOcpConf();
        return conf.getBooleanValue("cleanSchedulerFlag", "true");
    }
}

3.2 封装任务对象

public class TaskInfo {

    protected String categoryId; // 业务Id
    protected String categoryName; // 业务名称
    protected String sourceId; // 信源Id
    protected String sourceName; // 信源名称
    protected int sourceStatus; // 信源状态
    protected String pipelineConf; // 信源pipeline配置信息
    protected List<String> dbStoreTypes; // 业务的存储类型
    protected String esConfInfo; // ES存储配置
    protected String dbConfInfo; // DB存储配置
    protected String cronInfo; // 定时任务信息
    protected int sourceType; // 实时更新还是离线更新
    protected List<String> indexBuildEles; // 更新索引的信息
    protected List<String> idBuildEles; // id的构建因素
    protected String indexType; // 全量或增量
    protected String categoryLevel1; // 一级分类
    protected String zhName; // 中文信息
    protected Map<String,String> outputType; //输出参数名及其类型
    protected String providerName;
    protected String functionName; //category_function名称

    public String getProviderName() {
        return providerName;
    }

    public void setProviderName(String providerName) {
        this.providerName = providerName;
    }

    public String getCategoryId() {
        return categoryId;
    }

    public void setCategoryId(String categoryId) {
        this.categoryId = categoryId;
    }

    public String getCategoryName() {
        return categoryName;
    }

    public void setCategoryName(String categoryName) {
        this.categoryName = categoryName;
    }

    public String getSourceId() {
        return sourceId;
    }

    public void setSourceId(String sourceId) {
        this.sourceId = sourceId;
    }

    public String getSourceName() {
        return sourceName;
    }

    public void setSourceName(String sourceName) {
        this.sourceName = sourceName;
    }

    public int getSourceStatus() {
        return sourceStatus;
    }

    public void setSourceStatus(int sourceStatus) {
        this.sourceStatus = sourceStatus;
    }

    public String getPipelineConf() {
        return pipelineConf;
    }

    public void setPipelineConf(String pipelineConf) {
        this.pipelineConf = pipelineConf;
    }

    public String getEsConfInfo() {
        return esConfInfo;
    }

    public void setEsConfInfo(String esConfInfo) {
        this.esConfInfo = esConfInfo;
    }

    public String getDbConfInfo() {
        return dbConfInfo;
    }

    public void setDbConfInfo(String dbConfInfo) {
        this.dbConfInfo = dbConfInfo;
    }

    public String getCronInfo() {
        return cronInfo;
    }

    public void setCronInfo(String cronInfo) {
        this.cronInfo = cronInfo;
    }

    public int getSourceType() {
        return sourceType;
    }

    public void setSourceType(int sourceType) {
        this.sourceType = sourceType;
    }

    public List<String> getIdBuildEles() {
        return idBuildEles;
    }

    public void setIdBuildEles(List<String> idBuildEles) {
        this.idBuildEles = idBuildEles;
    }

    public List<String> getIndexBuildEles() {
        return indexBuildEles;
    }

    public void setIndexBuildEles(List<String> indexBuildEles) {
        this.indexBuildEles = indexBuildEles;
    }

    public String getIndexType() {
        return indexType;
    }

    public void setIndexType(String indexType) {
        this.indexType = indexType;
    }

    public String getCategoryLevel1() {
        return categoryLevel1;
    }

    public void setCategoryLevel1(String categoryLevel1) {
        this.categoryLevel1 = categoryLevel1;
    }

    public String getZhName() {
        return zhName;
    }

    public void setZhName(String zhName) {
        this.zhName = zhName;
    }

    public TaskInfo(){}

    public List<String> getDbStoreTypes() {
        return dbStoreTypes;
    }

    public void setDbStoreTypes(List<String> dbStoreTypes) {
        this.dbStoreTypes = dbStoreTypes;
    }

    public Map<String, String> getOutputType() {
        return outputType;
    }

    public void setOutputType(Map<String, String> outputType) {
        this.outputType = outputType;
    }

    public String getFunctionName() {
        return functionName;
    }

    public void setFunctionName(String functionName) {
        this.functionName = functionName;
    }

    /**
     * 是否有相同的定时信息
     * @param taskInfo
     * @return
     */
    public boolean hasSameCronInfo(TaskInfo taskInfo){
        if(taskInfo == null) return false;
        return this.getCronInfo().equalsIgnoreCase(taskInfo.getCronInfo());
    }

    @Override
    public String toString() {
        return "TaskInfo{" +
                "categoryId='" + categoryId + '\'' +
                ", categoryName='" + categoryName + '\'' +
                ", sourceId='" + sourceId + '\'' +
                ", sourceName='" + sourceName + '\'' +
                ", sourceStatus=" + sourceStatus +
                ", pipelineConf='" + pipelineConf + '\'' +
                ", dbStoreTypes=" + dbStoreTypes +
                ", esConfInfo='" + esConfInfo + '\'' +
                ", dbConfInfo='" + dbConfInfo + '\'' +
                ", cronInfo='" + cronInfo + '\'' +
                ", sourceType=" + sourceType +
                ", indexBuildEles=" + indexBuildEles +
                ", idBuildEles=" + idBuildEles +
                ", indexType='" + indexType + '\'' +
                ", categoryLevel1='" + categoryLevel1 + '\'' +
                ", zhName='" + zhName + '\'' +
                ", outputType='" + outputType + '\'' +
                ", providerName='" + providerName + '\'' +
                ", functionName='" + functionName + '\'' +
                '}';
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        TaskInfo taskInfo = (TaskInfo) o;

        if (sourceStatus != taskInfo.sourceStatus) return false;
        if (sourceType != taskInfo.sourceType) return false;
        if (categoryName != null ? !categoryName.equals(taskInfo.categoryName) : taskInfo.categoryName != null)
            return false;
        if (sourceName != null ? !sourceName.equals(taskInfo.sourceName) : taskInfo.sourceName != null) return false;
        if (providerName != null ? !providerName.equals(taskInfo.providerName) : taskInfo.providerName != null) return false;
        if (pipelineConf != null ? !pipelineConf.equals(taskInfo.pipelineConf) : taskInfo.pipelineConf != null)
            return false;
        if (dbStoreTypes != null ? !dbStoreTypes.equals(taskInfo.dbStoreTypes) : taskInfo.dbStoreTypes != null)
            return false;
        if (esConfInfo != null ? !esConfInfo.equals(taskInfo.esConfInfo) : taskInfo.esConfInfo != null) return false;
        if (dbConfInfo != null ? !dbConfInfo.equals(taskInfo.dbConfInfo) : taskInfo.dbConfInfo != null) return false;
        if (cronInfo != null ? !cronInfo.equals(taskInfo.cronInfo) : taskInfo.cronInfo != null) return false;
        if (indexBuildEles != null ? !indexBuildEles.equals(taskInfo.indexBuildEles) : taskInfo.indexBuildEles != null)
            return false;
        if (idBuildEles != null ? !idBuildEles.equals(taskInfo.idBuildEles) : taskInfo.idBuildEles != null)
            return false;
        if (indexType != null ? !indexType.equals(taskInfo.indexType) : taskInfo.indexType != null) return false;
        if (categoryLevel1 != null ? !categoryLevel1.equals(taskInfo.categoryLevel1) : taskInfo.categoryLevel1 != null)
            return false;
        if (outputType != null ? !outputType.equals(taskInfo.outputType) : taskInfo.outputType != null)
            return false;
        if (functionName != null ? !functionName.equals(taskInfo.functionName) : taskInfo.functionName != null)
            return false;
        return zhName != null ? zhName.equals(taskInfo.zhName) : taskInfo.zhName == null;

    }

    @Override
    public int hashCode() {
        int result = categoryName != null ? categoryName.hashCode() : 0;
        result = 31 * result + (sourceName != null ? sourceName.hashCode() : 0);
        result = 31 * result + (providerName != null ? providerName.hashCode() : 0);
        result = 31 * result + sourceStatus;
        result = 31 * result + (pipelineConf != null ? pipelineConf.hashCode() : 0);
        result = 31 * result + (dbStoreTypes != null ? dbStoreTypes.hashCode() : 0);
        result = 31 * result + (esConfInfo != null ? esConfInfo.hashCode() : 0);
        result = 31 * result + (dbConfInfo != null ? dbConfInfo.hashCode() : 0);
        result = 31 * result + (cronInfo != null ? cronInfo.hashCode() : 0);
        result = 31 * result + sourceType;
        result = 31 * result + (indexBuildEles != null ? indexBuildEles.hashCode() : 0);
        result = 31 * result + (idBuildEles != null ? idBuildEles.hashCode() : 0);
        result = 31 * result + (indexType != null ? indexType.hashCode() : 0);
        result = 31 * result + (categoryLevel1 != null ? categoryLevel1.hashCode() : 0);
        result = 31 * result + (zhName != null ? zhName.hashCode() : 0);
        result = 31 * result + (outputType != null ? outputType.hashCode() : 0);
        result = 31 * result + (functionName != null ? functionName.hashCode() : 0);
        return result;
    }
}

3.3 任务的构造及初始化

/**
 * Builds {@link TaskInfo} scheduling descriptors from the Mongo source and category
 * collections, and provides helpers to filter and group them.
 * Created by songwang4 on 2017/6/7.
 */
public class GenerateTaskInfo {

    static final Logger logger = LoggerFactory.getLogger(GenerateTaskInfo.class);

    static DBCollection sourceColl = MongoUtil.createOcpSourceDB();
    static DBCollection categoryColl = MongoUtil.createOcpCategoryDB();

    /**
     * Reads every source document and joins it with its category document. The join matches the
     * source field {@code category} against the category field {@code catName}. Each pair is then
     * converted into a {@link TaskInfo}.
     *
     * <p>A source that lacks {@code sourceName}/{@code category}, or has no matching category, is
     * logged and skipped rather than aborting the whole load.
     *
     * @return the task descriptors; never {@code null}, possibly empty
     */
    public static List<TaskInfo> generateTaskInfoFromMongo() {
        List<TaskInfo> tasks = Lists.newArrayList();
        DBCursor sourceCur = sourceColl.find();
        try {
            while (sourceCur.hasNext()) {
                TaskInfo task = buildTaskInfo(sourceCur.next());
                if (task != null) {
                    tasks.add(task);
                }
            }
        } finally {
            // Release the cursor even if converting a document throws
            sourceCur.close();
        }
        return tasks;
    }

    /**
     * Historical name of {@link #generateTaskInfoFromMongo()}, kept for callers that still use it.
     * The data is read from Mongo, not MySQL.
     *
     * @return same as {@link #generateTaskInfoFromMongo()}
     * @deprecated use {@link #generateTaskInfoFromMongo()}
     */
    @Deprecated
    public static List<TaskInfo> generateTaskInfoFromMysql() {
        return generateTaskInfoFromMongo();
    }

    /** Converts one source document (plus its category document) into a task, or returns null to skip it. */
    private static TaskInfo buildTaskInfo(DBObject sourceObj) {
        String sourceName = readString(sourceObj, "sourceName", null);
        String categoryName = readString(sourceObj, "category", null);
        if (sourceName == null || categoryName == null) {
            logger.error("source without sourceName or category, skipped: " + sourceObj.get("_id"));
            return null;
        }

        // Look up the category document by category name
        DBObject categoryObj = categoryColl.findOne(new BasicDBObject("catName", categoryName));
        if (categoryObj == null) {
            logger.error("no category found through source: " + sourceName);
            return null;
        }

        TaskInfo task = new TaskInfo();
        task.setCategoryId(readString(categoryObj, "_id", "")); // category id
        task.setCategoryName(categoryName); // category name
        task.setDbStoreTypes(extractStoreTypes(categoryObj, sourceName)); // storage types

        task.setCategoryLevel1(readString(categoryObj, "parent", "")); // first-level category
        task.setZhName(readString(sourceObj, "zhName", ""));

        // Storage configs fall back to the category name when not configured
        task.setDbConfInfo(readString(categoryObj, "db", categoryName)); // DB config
        task.setEsConfInfo(readString(categoryObj, "es", categoryName)); // ES config

        task.setIndexBuildEles(extractBuilderEles(categoryObj, "isIndex", "itemName")); // ES index fields
        task.setIdBuildEles(extractBuilderEles(categoryObj, "isGK", "itemName")); // id fields

        task.setSourceId(readString(sourceObj, "_id", "")); // source id
        task.setSourceName(sourceName); // source name

        // Online only when "status" equals StatusType.STR_ONLINE; anything else is offline
        String statusType = readString(sourceObj, "status", null);
        task.setSourceStatus(statusType != null && statusType.equals(StatusType.STR_ONLINE)
                ? StatusType.ONLINE : StatusType.OFFLINE);

        // Offline processing only when "type" equals SourceType.STR_OFF_LINE_PROCESS; default real-time
        String strSourceType = readString(sourceObj, "type", null);
        task.setSourceType(strSourceType != null && strSourceType.equals(SourceType.STR_OFF_LINE_PROCESS)
                ? SourceType.OFF_LINE_PROCESS : SourceType.REAL_TIME_PROCESS);

        task.setIndexType(readString(sourceObj, "indexType", "")); // full / incremental flag

        // An explicit "timerInfo" cron wins; otherwise derive one from updateType/updateCycle
        String cronInfo = readString(sourceObj, "timerInfo", "");
        if (cronInfo.trim().length() == 0) {
            cronInfo = generateCronInfo(sourceObj);
        }
        task.setCronInfo(cronInfo);

        task.setPipelineConf(readString(sourceObj, "mappingWorkflow", "")); // pipeline config
        return task;
    }

    /** Returns the field's value as a string, or {@code defaultValue} when absent or null. */
    private static String readString(DBObject obj, String field, String defaultValue) {
        Object value = obj.containsField(field) ? obj.get(field) : null;
        return value == null ? defaultValue : value.toString();
    }

    /** Parses the category's {@code storeType} JSON array; malformed input is logged and yields what was parsed so far. */
    private static List<String> extractStoreTypes(DBObject categoryObj, String sourceName) {
        List<String> dbStoreTypes = Lists.newArrayList();
        String storeType = readString(categoryObj, "storeType", null);
        if (storeType == null) {
            return dbStoreTypes;
        }
        try {
            JSONArray storeTypeArr = JSON.parseArray(storeType);
            if (storeTypeArr != null) {
                for (int i = 0; i < storeTypeArr.size(); i++) {
                    String type = storeTypeArr.getString(i);
                    // Null entries would later break the switch on store type
                    if (type != null) {
                        dbStoreTypes.add(type);
                    }
                }
            }
        } catch (Exception e) {
            logger.warn("invalid storeType of category for source: " + sourceName, e);
        }
        return dbStoreTypes;
    }

    /**
     * Collects, from the category's {@code dataItems} JSON array, the value of
     * {@code retureField} of every item whose {@code queryField} is true.
     *
     * @param categoryObj category document holding a {@code dataItems} JSON array
     * @param queryField  boolean flag selecting the items (e.g. {@code isIndex}, {@code isGK})
     * @param retureField field whose trimmed value is returned for each selected item
     * @return the selected values; empty when {@code dataItems} is missing or not valid JSON
     */
    public static List<String> extractBuilderEles(DBObject categoryObj, String queryField, String retureField) {
        List<String> builderEles = Lists.newArrayList();
        String dataItems = readString(categoryObj, "dataItems", null);
        if (dataItems == null) {
            return builderEles;
        }

        JSONArray dataItemArr;
        try {
            dataItemArr = JSON.parseArray(dataItems);
        } catch (JSONException e) {
            logger.warn("invalid dataItems of category: " + categoryObj.get("_id"), e);
            return builderEles;
        }
        if (dataItemArr == null) {
            return builderEles;
        }

        for (int i = 0; i < dataItemArr.size(); i++) {
            JSONObject dataItemJson = dataItemArr.getJSONObject(i);
            // Boolean.TRUE.equals avoids unboxing a missing/null flag
            if (dataItemJson == null || !Boolean.TRUE.equals(dataItemJson.getBoolean(queryField))) {
                continue;
            }
            String ele = dataItemJson.getString(retureField);
            if (ele != null) {
                builderEles.add(ele.trim());
            }
        }
        return builderEles;
    }

    /**
     * Builds a Quartz cron expression from the source's {@code updateType}/{@code updateCycle}
     * fields (and {@code weekDay} for WEEK).
     *
     * <p>Supported types (case-insensitive) and the cycle format each expects:
     * <ul>
     *   <li>YEAR: either {@code MM-dd HH:mm}, or an integer passed to {@code TimeUtil.extractFixedTimeByDay}.</li>
     *   <li>MONTH: {@code dd HH:mm}.</li>
     *   <li>DAY and WEEK: {@code HH:mm}.</li>
     *   <li>HOUR and MINUTE: an integer.</li>
     *   <li>SECOND: the step of {@code *}/n.</li>
     * </ul>
     *
     * @param sourceObj source document
     * @return the cron expression, or {@code ""} when the fields are missing, unparseable or the
     *     type is unknown
     */
    public static String generateCronInfo(DBObject sourceObj) {
        String updateTimeType = readString(sourceObj, "updateType", "").trim();
        String updateTimeCycle = readString(sourceObj, "updateCycle", "").trim();
        if (updateTimeType.length() == 0 || updateTimeCycle.length() == 0) {
            return "";
        }

        StringBuilder sb = new StringBuilder();
        Date date;
        if (updateTimeType.equalsIgnoreCase("YEAR")) {
            date = TimeUtil.parseDate(updateTimeCycle, "MM-dd HH:mm");
            if (date == null) {
                try {
                    sb.append(TimeUtil.extractFixedTimeByDay(Integer.parseInt(updateTimeCycle), 0, 0));
                } catch (NumberFormatException e) {
                    logger.warn("invalid YEAR updateCycle: " + updateTimeCycle);
                }
            } else {
                // "0 m H d M ? *"
                // NOTE(review): "+ 1" adds only if extractFixedTime returns a number; if it returns
                // a String this concatenates — confirm TimeUtil's return type.
                sb.append("0 ").append(TimeUtil.extractFixedTime(date, Calendar.MINUTE)).append(" ")
                        .append(TimeUtil.extractFixedTime(date, Calendar.HOUR_OF_DAY)).append(" ")
                        .append(TimeUtil.extractFixedTime(date, Calendar.DATE)).append(" ")
                        .append(TimeUtil.extractFixedTime(date, Calendar.MONTH) + 1).append(" ? *");
            }
        } else if (updateTimeType.equalsIgnoreCase("MONTH")) {
            date = TimeUtil.parseDate(updateTimeCycle, "dd HH:mm");
            if (date == null) {
                logger.warn("invalid MONTH updateCycle: " + updateTimeCycle);
                return "";
            }
            // "0 m H d * ?"
            sb.append("0 ").append(TimeUtil.extractFixedTime(date, Calendar.MINUTE)).append(" ")
                    .append(TimeUtil.extractFixedTime(date, Calendar.HOUR_OF_DAY)).append(" ")
                    .append(TimeUtil.extractFixedTime(date, Calendar.DATE)).append(" * ?");
        } else if (updateTimeType.equalsIgnoreCase("DAY")) {
            date = TimeUtil.parseDate(updateTimeCycle, "HH:mm");
            if (date == null) {
                logger.warn("invalid DAY updateCycle: " + updateTimeCycle);
                return "";
            }
            // "0 m H * * ?"
            sb.append("0 ").append(TimeUtil.extractFixedTime(date, Calendar.MINUTE)).append(" ")
                    .append(TimeUtil.extractFixedTime(date, Calendar.HOUR_OF_DAY)).append(" * * ?");
        } else if (updateTimeType.equalsIgnoreCase("WEEK")) {
            String weekDay = "1";
            if (sourceObj.containsField("weekDay")) {
                weekDay = sourceObj.get("weekDay").toString();
            }
            date = TimeUtil.parseDate(updateTimeCycle, "HH:mm");
            if (date == null) {
                logger.warn("invalid WEEK updateCycle: " + updateTimeCycle);
                return "";
            }
            // "0 m H ? * <day-of-week>"
            sb.append("0 ").append(TimeUtil.extractFixedTime(date, Calendar.MINUTE)).append(" ")
                    .append(TimeUtil.extractFixedTime(date, Calendar.HOUR_OF_DAY)).append(" ? * ")
                    .append(TimeUtil.extractFixedTime(weekDay));
        } else if (updateTimeType.equalsIgnoreCase("HOUR")) {
            try {
                int hour = Integer.parseInt(updateTimeCycle);
                sb.append(TimeUtil.extractFixedTimeByHour(hour, 0));
            } catch (NumberFormatException e) {
                logger.warn("invalid HOUR updateCycle: " + updateTimeCycle);
            }
        } else if (updateTimeType.equalsIgnoreCase("MINUTE")) {
            try {
                int minute = Integer.parseInt(updateTimeCycle);
                sb.append(TimeUtil.extractFixedTimeByMinute(minute));
            } catch (NumberFormatException e) {
                logger.warn("invalid MINUTE updateCycle: " + updateTimeCycle);
            }
        } else if (updateTimeType.equalsIgnoreCase("SECOND")) {
            // Every <cycle> seconds
            sb.append("*/").append(updateTimeCycle).append(" * * * * ?");
        } else {
            logger.warn("unsupported updateType: " + updateTimeType);
        }

        return sb.toString();
    }

    /**
     * Keeps only tasks that are online AND configured for offline (scheduled) processing.
     *
     * @param tasks candidate tasks
     * @return a new list with the runnable tasks, in the original order
     */
    public static List<TaskInfo> filterTask(List<TaskInfo> tasks) {
        List<TaskInfo> taskInfos = Lists.newArrayList();
        for (TaskInfo taskInfo : tasks) {
            // Skip offline sources and real-time sources
            if (taskInfo.getSourceStatus() == StatusType.OFFLINE
                    || taskInfo.getSourceType() != SourceType.OFF_LINE_PROCESS) {
                continue;
            }
            taskInfos.add(taskInfo);
        }
        return taskInfos;
    }

    /**
     * Groups tasks by category id. Within a group, only the first task for each source id is kept.
     *
     * @param oriTasks tasks to group
     * @return category id -> tasks of that category, in encounter order
     */
    public static Map<String, List<TaskInfo>> groupTaskByCategory(List<TaskInfo> oriTasks) {
        Map<String, List<TaskInfo>> categoryTasks = Maps.newHashMap();
        for (TaskInfo oriTask : oriTasks) {
            if (!categoryTasks.containsKey(oriTask.getCategoryId())) {
                List<TaskInfo> taskInfos = Lists.newArrayList();
                taskInfos.add(oriTask);
                categoryTasks.put(oriTask.getCategoryId(), taskInfos);
            } else {
                boolean hasSameSourceId = false;
                for (TaskInfo taskInfo : categoryTasks.get(oriTask.getCategoryId())) {
                    if (taskInfo.getSourceId().equals(oriTask.getSourceId())) {
                        hasSameSourceId = true;
                        break;
                    }
                }
                if (!hasSameSourceId) {
                    categoryTasks.get(oriTask.getCategoryId()).add(oriTask);
                }
            }
        }
        return categoryTasks;
    }
}

3.4 调用下载接口的任务

/**
 * Offline storage job.
 *
 * <p>The job rebuilds the {@link TaskInfo} that was serialized into its JobDataMap. It then
 * creates one output sink per configured store type and runs the task's pipeline through
 * {@code PipeExecuter}.
 *
 * <p>Because of {@link DisallowConcurrentExecution}, the same job never runs twice at once. If
 * the previous run has not finished when the next fire time arrives, the new run waits until the
 * previous one completes.
 */
@DisallowConcurrentExecution
public class ScheduleJob implements Job {
    static final Logger logger = LoggerFactory.getLogger(ScheduleJob.class);

    public ScheduleJob() {
    }

    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        JobDetail jobDetail = jobExecutionContext.getJobDetail();

        JSONObject json = new JSONObject();
        json.put("jobName", jobDetail.getKey().getName());
        json.put("jobGroup", jobDetail.getKey().getGroup());
        json.put("triggerName", jobExecutionContext.getTrigger().getKey().getName());
        json.put("triggerGroup", jobExecutionContext.getTrigger().getKey().getGroup());

        logger.info("job is running: " + json.toString());

        JobDataMap dataMap = jobDetail.getJobDataMap();
        JSONObject confJson = null;
        try {
            confJson = JSONObject.parseObject(dataMap.getString(SchedulerFactory.CONF_INFO));
        } catch (JSONException e) {
            logger.warn("conf is not valid json: " + json.toString(), e);
        }
        if (confJson == null) {
            logger.error("conf is empty: " + json.toString());
            return;
        }

        TaskInfo taskInfo = new Gson().fromJson(confJson.toString(), TaskInfo.class);
        if (!isNeedtoRun(taskInfo)) {
            logger.info("no need to run: " + json.toString());
            return;
        }

        List<IDataOut> dataOuts = createDataOuts(taskInfo);

        // Crawl log for this run (the pipeline records it before and after fetching)
        CrawlerLog crawlerLog = createCrawlerLog(taskInfo);
        if (!dataOuts.isEmpty()) {
            PipeExecuter.executeSave(taskInfo.getPipelineConf(), dataOuts, crawlerLog);
        }
    }

    /** Creates one output sink per supported store type of the task; unknown types are logged and skipped. */
    private static List<IDataOut> createDataOuts(TaskInfo taskInfo) {
        List<IDataOut> dataOuts = Lists.newArrayList();
        for (String dbStoreType : taskInfo.getDbStoreTypes()) {
            // switch on a null String throws NullPointerException
            if (dbStoreType == null) {
                continue;
            }
            switch (dbStoreType) {
                case StoreType.STR_MONGO_STORE:
                    dataOuts.add(new DataOut2Mongo(taskInfo.getFunctionName(), taskInfo.getSourceName(), taskInfo.getIdBuildEles(), taskInfo.getOutputType(), taskInfo.getProviderName()));
                    break;
                case StoreType.STR_ES_STORE:
                    dataOuts.add(new DataOut2ES(taskInfo.getCategoryName(), taskInfo.getFunctionName(), taskInfo.getSourceName(), taskInfo.getIdBuildEles(), taskInfo.getIndexBuildEles(), taskInfo.getOutputType(), taskInfo.getProviderName()));
                    break;
                case StoreType.STR_REDIS_STORE:
                    dataOuts.add(new DataOut2Redis(taskInfo.getSourceName(), taskInfo.getIdBuildEles(), taskInfo.getOutputType(), taskInfo.getProviderName()));
                    break;
                default:
                    logger.warn("unsupported store type: " + dbStoreType + ", source: " + taskInfo.getSourceName());
                    break;
            }
        }
        return dataOuts;
    }

    /**
     * Decides whether the job should run.
     *
     * <p>The task is skipped (with a warning) when it is missing, real-time, offline, has no
     * pipeline configuration, or has no store type.
     *
     * @param taskInfo the deserialized task; may be {@code null}
     * @return {@code true} when the task should be executed
     */
    public static boolean isNeedtoRun(TaskInfo taskInfo) {
        if (taskInfo == null) {
            logger.warn("the job has no task info, no need to run");
            return false;
        }

        // Real-time vs offline
        if (taskInfo.getSourceType() == SourceType.REAL_TIME_PROCESS) {
            logger.warn("the job is real-time process, no need to run");
            return false;
        }

        // Online/offline status of the job
        if (taskInfo.getSourceStatus() == StatusType.OFFLINE) {
            logger.warn("the job status is offline, no need to run");
            return false;
        }

        // Pipeline configuration
        String pipelineConf = taskInfo.getPipelineConf();
        if (pipelineConf == null || pipelineConf.trim().length() == 0) {
            logger.warn("no pipeline configure info, no need to run");
            return false;
        }

        // Storage configuration; Gson leaves the list null when it was absent from the JSON
        List<String> dbStoreTypes = taskInfo.getDbStoreTypes();
        if (dbStoreTypes == null || dbStoreTypes.isEmpty()) {
            logger.warn("the job store type is 0, no need to store");
            return false;
        }
        return true;
    }

    /**
     * Creates the crawl log for this run, for the management system to display.
     *
     * <p>The source name and the Chinese name are split on {@code "_"}. Part 0 of the source name
     * becomes the level-2 category, part 1 the function, and part 2 the provider. The same
     * positions 1 and 2 of the Chinese name give the Chinese function and provider names. Missing
     * parts (or a null name) become {@code ""}.
     *
     * @param taskInfo task being executed
     * @return the populated crawl log
     */
    public CrawlerLog createCrawlerLog(TaskInfo taskInfo) {
        CrawlerLog crawlerLog = new CrawlerLog();
        crawlerLog.setIndexType(taskInfo.getIndexType()); // full or incremental
        crawlerLog.setCategoryLv1(taskInfo.getCategoryLevel1());

        String[] sourceArr = splitName(taskInfo.getSourceName());
        String[] sourceZhArr = splitName(taskInfo.getZhName());

        crawlerLog.setCategoryLv2(partAt(sourceArr, 0));
        crawlerLog.setFunctionName(partAt(sourceArr, 1));
        crawlerLog.setProviderName(partAt(sourceArr, 2));

        crawlerLog.setFunctionZhName(partAt(sourceZhArr, 1));
        crawlerLog.setProviderZhName(partAt(sourceZhArr, 2));
        crawlerLog.setId();
        return crawlerLog;
    }

    /** Splits a name on "_"; a null name yields no parts. */
    private static String[] splitName(String name) {
        return name == null ? new String[0] : name.split("_");
    }

    /** Returns the part at {@code index}, or "" when there are not enough parts. */
    private static String partAt(String[] parts, int index) {
        return index < parts.length ? parts[index] : "";
    }
}

3.5 任务调度工厂

工厂用于生成任务的触发器Trigger,以及创建任务Job。

/**
 * Factory for the Quartz jobs and cron triggers used by the scheduler, plus helpers to register
 * them.
 *
 * <p>Job and trigger names come from the task's source name, and groups from its category name.
 * They are prefixed with {@code PrefixType.JOB_PREFIX} / {@code PrefixType.TRIGGER_PREFIX}.
 */
public class SchedulerFactory {
    static final Logger logger = LoggerFactory.getLogger(SchedulerFactory.class);

    /** JobDataMap key holding the Gson-serialized {@link TaskInfo}. */
    public static final String CONF_INFO = "conf_info";
    public static final String DYNAMIC_UPDATE_JOB_NAME = "dynamicUpdateJob";
    public static final String DYNAMIC_UPDATE_GROUP_NAME = "dynamicUpdateGroup";
    /** The dynamic-update job fires every 30 seconds. */
    public static final String DYNAMIC_UPDATE_CRONTINFO = "*/30 * * * * ?";

    /**
     * Builds the job and trigger for a task and schedules them. Failures are logged, not thrown.
     *
     * @param taskInfo  task to schedule
     * @param scheduler target scheduler
     */
    public static void addJob2Scheduler(TaskInfo taskInfo, Scheduler scheduler) {
        JobDetail jobDetail = generateJobDetail(taskInfo);
        if (jobDetail == null) {
            logger.error("create job failed!");
            return;
        }

        Trigger trigger = generateTrigger(taskInfo);
        if (trigger == null) {
            logger.error("create trigger failed!");
            return;
        }

        scheduleJob(scheduler, jobDetail, trigger);
    }

    /**
     * Schedules the job that periodically re-syncs the scheduler with the task configuration.
     * Failures are logged, not thrown.
     *
     * @param scheduler target scheduler
     */
    public static void addDynamicUpdateJob2Scheduler(Scheduler scheduler) {
        JobDetail jobDetail = generateDynamicUpdateJobDetail(DYNAMIC_UPDATE_JOB_NAME, DYNAMIC_UPDATE_GROUP_NAME);
        if (jobDetail == null) {
            logger.error("create job failed!");
            return;
        }

        Trigger trigger = generateTrigger(DYNAMIC_UPDATE_JOB_NAME, DYNAMIC_UPDATE_GROUP_NAME, DYNAMIC_UPDATE_CRONTINFO);
        if (trigger == null) {
            logger.error("create trigger failed!");
            return;
        }

        scheduleJob(scheduler, jobDetail, trigger);
    }

    /** Registers the job with its trigger, logging (not throwing) scheduler errors. */
    private static void scheduleJob(Scheduler scheduler, JobDetail jobDetail, Trigger trigger) {
        try {
            scheduler.scheduleJob(jobDetail, trigger);
        } catch (SchedulerException e) {
            logger.error("create scheduler error, error message: " + e.toString(), e);
        }
    }

    /**
     * Builds the {@link ScheduleJob} detail for a task. The task is serialized with Gson into the
     * JobDataMap under {@link #CONF_INFO}, and the job requests recovery.
     *
     * @param taskInfo source task
     * @return the job detail, or {@code null} when the source name or category name is blank
     */
    public static JobDetail generateJobDetail(TaskInfo taskInfo) {
        String jobName = taskInfo == null ? null : taskInfo.getSourceName();
        if (isBlank(jobName)) {
            logger.error("job name is empty, please check!");
            return null;
        }

        String jobGroup = taskInfo.getCategoryName();
        if (isBlank(jobGroup)) {
            logger.error("job group is empty, please check!");
            return null;
        }

        return JobBuilder.newJob(ScheduleJob.class)
                .withIdentity(PrefixType.JOB_PREFIX + jobName, PrefixType.JOB_PREFIX + jobGroup)
                .requestRecovery()
                .usingJobData(CONF_INFO, new Gson().toJson(taskInfo)).build();
    }

    /**
     * Builds the {@link DynamicUpdateJob} detail.
     *
     * @param jobName  job name (prefixed with {@code PrefixType.JOB_PREFIX})
     * @param jobGroup job group (prefixed with {@code PrefixType.JOB_PREFIX})
     * @return the job detail, or {@code null} when a name is blank
     */
    public static JobDetail generateDynamicUpdateJobDetail(String jobName, String jobGroup) {
        if (isBlank(jobName)) {
            logger.error("job name is empty, please check!");
            return null;
        }

        if (isBlank(jobGroup)) {
            logger.error("job group is empty, please check!");
            return null;
        }

        return JobBuilder.newJob(DynamicUpdateJob.class)
                .withIdentity(PrefixType.JOB_PREFIX + jobName, PrefixType.JOB_PREFIX + jobGroup)
                .requestRecovery()
                .build();
    }

    /**
     * Builds the cron trigger for a task. The trigger name is the source name, the group is the
     * category name, and the schedule is the task's cron expression.
     *
     * @param taskInfo source task
     * @return the trigger, or {@code null} when a value is blank or the cron expression is invalid
     */
    public static Trigger generateTrigger(TaskInfo taskInfo) {
        if (taskInfo == null) {
            logger.error("trigger name is empty, please check!");
            return null;
        }
        return generateTrigger(taskInfo.getSourceName(), taskInfo.getCategoryName(), taskInfo.getCronInfo());
    }

    /**
     * Builds a cron trigger.
     *
     * @param sourceTriggerName  trigger name (prefixed with {@code PrefixType.TRIGGER_PREFIX})
     * @param sourceTriggerGroup trigger group (prefixed with {@code PrefixType.TRIGGER_PREFIX})
     * @param cronInfo           Quartz cron expression
     * @return the trigger, or {@code null} when a value is blank or the cron expression is invalid
     */
    public static Trigger generateTrigger(String sourceTriggerName, String sourceTriggerGroup, String cronInfo) {
        if (isBlank(sourceTriggerName)) {
            logger.error("trigger name is empty, please check!");
            return null;
        }
        if (isBlank(sourceTriggerGroup)) {
            logger.error("trigger group is empty, please check!");
            return null;
        }
        if (isBlank(cronInfo)) {
            logger.error("cron timer info is empty, please check!");
            return null;
        }

        // cronSchedule signals an invalid expression with a RuntimeException; catching it keeps one
        // bad configured cron from aborting the registration of all other tasks
        CronScheduleBuilder schedule;
        try {
            schedule = CronScheduleBuilder.cronSchedule(cronInfo);
        } catch (RuntimeException e) {
            logger.error("invalid cron timer info: " + cronInfo + ", trigger: " + sourceTriggerName, e);
            return null;
        }

        return TriggerBuilder.newTrigger().withIdentity(PrefixType.TRIGGER_PREFIX + sourceTriggerName,
                PrefixType.TRIGGER_PREFIX + sourceTriggerGroup)
                .withSchedule(schedule)
                .build();
    }

    /** True for null, empty or whitespace-only strings. */
    private static boolean isBlank(String value) {
        return value == null || value.trim().length() == 0;
    }
}

3.6 动态检测任务更新的Job

/**
 * Quartz job that reconciles the jobs registered in the scheduler with the task configuration
 * returned by {@code GenerateTaskInfo.generateTaskInfoFromMysql()}.
 *
 * <p>Each run:
 * <ol>
 *   <li>deletes scheduler jobs whose key matches no configured task;</li>
 *   <li>for configured tasks that already have a job: deletes the job if
 *       {@code ScheduleJob.isNeedtoRun} is false, otherwise re-creates it when the configuration
 *       stored in its JobDataMap differs from the current one;</li>
 *   <li>for configured tasks without a job: creates the job if {@code ScheduleJob.isNeedtoRun}
 *       is true.</li>
 * </ol>
 * Jobs in the dynamic-update group itself are excluded from all of the above.
 * {@code @DisallowConcurrentExecution} stops Quartz from running this JobDetail concurrently.
 */
@DisallowConcurrentExecution
public class DynamicUpdateJob implements Job{

    private static Logger logger = LoggerFactory.getLogger(DynamicUpdateJob.class);

    public DynamicUpdateJob(){}

    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        JobDetail jobDetail = jobExecutionContext.getJobDetail();

        // identity of this update job, used as context in log messages
        JSONObject json = new JSONObject();
        json.put("jobName", jobDetail.getKey().getName());
        json.put("jobGroup", jobDetail.getKey().getGroup());
        json.put("triggerName", jobExecutionContext.getTrigger().getKey().getName());
        json.put("triggerGroup", jobExecutionContext.getTrigger().getKey().getGroup());

        logger.info("job is running: "+json.toString());

        Scheduler scheduler = jobExecutionContext.getScheduler();

        // Load the configured tasks.
        // NOTE(review): an empty result makes step 1 delete every managed job; if the loader can
        // return an empty list on a read failure, that failure would unschedule everything — confirm.
        List<TaskInfo> confTaskInfos = GenerateTaskInfo.generateTaskInfoFromMysql();
        if (confTaskInfos == null) {
            logger.error("task config is null, skip this round: " + json.toString());
            return;
        }

        // Abort this round if the scheduler cannot be listed: an empty or partial key list would
        // make step 2 treat existing jobs as new and call addJob2Scheduler for them again.
        List<JobKey> schedulerJobKeys;
        try {
            schedulerJobKeys = listManagedJobKeys(scheduler);
        } catch (SchedulerException e) {
            logger.error("acquire job keys from scheduler error, skip this round: " + json.toString(), e);
            return;
        }

        // 1. the configured task no longer exists but the scheduler still has the job: take it offline
        removeUnconfiguredJobs(scheduler, schedulerJobKeys, confTaskInfos);

        // 2. compare every configured task with the scheduler's jobs
        java.util.Set<JobKey> schedulerKeySet = new java.util.HashSet<JobKey>(schedulerJobKeys);
        for (TaskInfo confTaskInfo : confTaskInfos) {
            JobKey confJobKey = generateJobKey(confTaskInfo);
            if (schedulerKeySet.contains(confJobKey)) {
                syncExistingJob(scheduler, confTaskInfo, confJobKey);
            } else {
                createJobIfOnline(scheduler, confTaskInfo, confJobKey);
            }
        }
    }

    /** Step 1: deletes every managed job whose key matches no configured task. */
    private void removeUnconfiguredJobs(Scheduler scheduler, List<JobKey> schedulerJobKeys,
                                        List<TaskInfo> confTaskInfos) {
        java.util.Set<JobKey> confJobKeys = new java.util.HashSet<JobKey>();
        for (TaskInfo confTaskInfo : confTaskInfos) {
            confJobKeys.add(generateJobKey(confTaskInfo));
        }
        for (JobKey schedulerJobKey : schedulerJobKeys) {
            if (confJobKeys.contains(schedulerJobKey)) {
                continue;
            }
            try {
                scheduler.deleteJob(schedulerJobKey);
                logger.info("delete offline job: " + schedulerJobKey.toString());
            } catch (SchedulerException e) {
                logger.error("delete offline job error: " + schedulerJobKey, e);
            }
        }
    }

    /**
     * Step 2 for a task whose job already exists: deletes the job when the task should not run,
     * otherwise deletes and re-adds it if the configuration stored in its JobDataMap differs.
     */
    private void syncExistingJob(Scheduler scheduler, TaskInfo confTaskInfo, JobKey confJobKey) {
        logger.info("has same jobKey: " + confJobKey);
        JobDetail schedulerJobDetail;
        try {
            schedulerJobDetail = scheduler.getJobDetail(confJobKey);
        } catch (SchedulerException e) {
            logger.error("get job detail from scheduler error: " + confJobKey, e);
            return;
        }
        if (schedulerJobDetail == null) {
            return;
        }

        // 1) should the job go offline?
        if (!ScheduleJob.isNeedtoRun(confTaskInfo)) {
            try {
                logger.info("has same jobKey and offline the job " + confJobKey);
                scheduler.deleteJob(confJobKey);
            } catch (SchedulerException e) {
                logger.error("delete offline job error: " + confJobKey, e);
            }
            return;
        }

        // 2) does the job need to be updated?
        TaskInfo schedulerTaskInfo = parseTaskInfoFromJobDataMap(schedulerJobDetail);
        logger.info("confTaskInfo: " + confTaskInfo);
        logger.info("schedulerTaskInfo: " + schedulerTaskInfo);
        if (confTaskInfo.equals(schedulerTaskInfo)) {
            logger.info("the job info is same " + confJobKey);
            return;
        }
        try {
            logger.info("has same jobKey and update the job " + confJobKey);
            scheduler.deleteJob(confJobKey);
            SchedulerFactory.addJob2Scheduler(confTaskInfo, scheduler);
        } catch (SchedulerException e) {
            logger.error("update scheduler info error: " + confJobKey, e);
        }
    }

    /** Step 2 for a task that has no job yet: schedules it only if the task should run. */
    private void createJobIfOnline(Scheduler scheduler, TaskInfo confTaskInfo, JobKey confJobKey) {
        if (!ScheduleJob.isNeedtoRun(confTaskInfo)) {
            logger.info("the status is offline, no need to create new job: " + confJobKey);
            return;
        }
        logger.info("no same jobKey and create job " + confJobKey);
        SchedulerFactory.addJob2Scheduler(confTaskInfo, scheduler);
    }

    /**
     * Returns the keys of all jobs in the scheduler except the dynamic-update group.
     * On a scheduler error the error is logged and an empty list is returned.
     */
    protected List<JobKey> acquireJobKeysWithinSceduler(Scheduler scheduler){
        try {
            return listManagedJobKeys(scheduler);
        } catch (SchedulerException e) {
            logger.error("acquire job keys from scheduler error", e);
            return Lists.newArrayList();
        }
    }

    /** Lists the keys of all jobs outside the dynamic-update group, propagating scheduler errors. */
    private List<JobKey> listManagedJobKeys(Scheduler scheduler) throws SchedulerException {
        List<JobKey> jobKeys = Lists.newArrayList();
        String dynamicUpdateGroup = PrefixType.JOB_PREFIX + SchedulerFactory.DYNAMIC_UPDATE_GROUP_NAME;
        for (String groupName : scheduler.getJobGroupNames()) {
            if (groupName.equals(dynamicUpdateGroup)) {
                continue;
            }
            jobKeys.addAll(scheduler.getJobKeys(GroupMatcher.jobGroupEquals(groupName)));
        }
        return jobKeys;
    }

    /**
     * Deserializes (with Gson) the TaskInfo JSON stored under {@code SchedulerFactory.CONF_INFO}
     * in the job's JobDataMap. Returns {@code null}, after logging, if it cannot be read or parsed.
     */
    protected  TaskInfo parseTaskInfoFromJobDataMap(JobDetail jobDetail){
        try {
            String confInfo = jobDetail.getJobDataMap().getString(SchedulerFactory.CONF_INFO);
            return new Gson().fromJson(confInfo, TaskInfo.class);
        } catch (Exception e) {
            logger.error("parse task info from JobDataMap error!", e);
            return null;
        }
    }

    /** Builds the job key for a task: prefixed source name in the prefixed category group. */
    protected JobKey generateJobKey(TaskInfo taskInfo){
        return generateJobKey(taskInfo.getSourceName(), taskInfo.getCategoryName());
    }

    /** Builds a job key with {@code PrefixType.JOB_PREFIX} prepended to both name and group. */
    protected JobKey generateJobKey(String jobName, String jobGroup){
        return JobKey.jobKey(PrefixType.JOB_PREFIX+jobName,PrefixType.JOB_PREFIX+jobGroup);
    }
}

3.7 Es数据库存储

/**
 * Writes JSON records to Elasticsearch through one lazily created {@link TransportClient}
 * shared by all instances.
 *
 * Created by songwang4 on 2017/6/7.
 */
public class DataOut2ES implements IDataOut, IDataClose {

    static Logger logger = LoggerFactory.getLogger(DataOut2ES.class);

    // Shared by all instances; volatile so a client created in init() is visible to other threads.
    static volatile TransportClient client;

    String indexName; // original note: defaults to "ocp"

    String typeName;
    String sourceName;

    // fields copied from each record into the indexed document
    List<String> indexBuildEles;
    // fields passed to IdBuilder.generateId to build the document id
    List<String> idBuilderEles;

    // not used by this class: the OutputTypeTransform step is disabled
    Map<String,String> outputType;

    String providerName;

    public DataOut2ES(String indexName,String type){
        this.indexName = indexName;
        this.typeName = type;
        init();
    }

    public DataOut2ES(String indexName,String type, List<String> indexBuildEles){
        this(indexName,type);
        this.indexBuildEles = indexBuildEles;
    }

    public DataOut2ES(String indexName,String type, List<String> idBuilderEles, List<String> indexBuildEles){
        this(indexName,type, indexBuildEles);
        this.idBuilderEles = idBuilderEles;
    }

    public DataOut2ES(String indexName,String type, String sourceName, List<String> idBuilderEles, List<String> indexBuildEles){
        this(indexName,type, idBuilderEles, indexBuildEles);
        this.sourceName = sourceName;
    }

    public DataOut2ES(String indexName,String type, String sourceName, List<String> idBuilderEles, List<String> indexBuildEles, Map<String,String> outputType,String providerName){
        this(indexName,type,sourceName, idBuilderEles, indexBuildEles);
        this.outputType = outputType;
        this.providerName = providerName;
    }

    /**
     * Creates the shared transport client from the OCP configuration if it does not exist yet.
     *
     * <p>Synchronized so instances constructed concurrently cannot each build, and leak, their
     * own client. The host is resolved before the client is built, so an unresolvable host
     * does not leave an orphaned client behind; in that case the error is logged and
     * {@code client} stays {@code null}.
     */
    public static synchronized void init() {
        if (client != null) {
            return;
        }
        Configuration conf = OcpConfHelper.getInstance().getOcpConf();

        String esClusterName = conf.getStringValue("ocp_es_cluster_name", "");
        String esIp = conf.getStringValue("ocp_es_ip", "");
        int esPort = conf.getIntValue("ocp_es_port", "");

        InetAddress esAddress;
        try {
            esAddress = InetAddress.getByName(esIp);
        } catch (UnknownHostException e) {
            logger.error("create es client error, unknown host: " + esIp, e);
            return;
        }

        Settings settings = Settings.builder()
                .put("cluster.name",esClusterName)
                .put("client.transport.sniff", true)
                .put("client.transport.ping_timeout", "120s")
                .put("client.transport.nodes_sampler_interval","30s").build();
        client = new PreBuiltTransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress(esAddress, esPort));
    }

    /**
     * Bulk-indexes the given records.
     *
     * <p>Each document id comes from {@code IdBuilder.generateId} and each document contains
     * only the fields listed in {@code indexBuildEles}. Records with an empty id or with none
     * of those fields are skipped individually; the remaining records are still written.
     *
     * @param datas records to write; {@code null} or empty is a no-op
     */
    public void save(List<JSONObject> datas) {
        if (datas == null || datas.isEmpty()) {
            return;
        }
        if (this.indexBuildEles == null || this.indexBuildEles.isEmpty()) {
            logger.error("index fields are empty in es, no index need to save");
            return;
        }
        BulkRequestBuilder bulkRequest = client.prepareBulk();
        for (JSONObject data : datas) {
            String id64 = IdBuilder.generateId(this.sourceName, data, this.idBuilderEles);
            if (id64 == null || id64.trim().length() == 0) {
                continue;
            }
            JSONObject indexJson = buildIndexJson(data);
            if (indexJson.keySet().isEmpty()) {
                // skip only this record; returning here would silently drop the whole batch
                logger.info("no json fields, so no need to save");
                continue;
            }
            bulkRequest.add(client.prepareIndex(indexName, typeName, id64).setSource(indexJson.toString()));
        }
        executeBulk(bulkRequest);
    }

    /**
     * Bulk-indexes the records as-is, letting Elasticsearch assign the document ids.
     *
     * @param datas records to write; {@code null} or empty is a no-op
     */
    public void saveWithoutIndexBuilds(List<JSONObject> datas) {
        if (datas == null || datas.isEmpty()) {
            return;
        }
        BulkRequestBuilder bulkRequest = client.prepareBulk();
        for (JSONObject data : datas) {
            bulkRequest.add(client.prepareIndex(indexName, typeName).setSource(data.toString()));
        }
        executeBulk(bulkRequest);
    }

    /**
     * Bulk-indexes records shaped like search hits: the id is read from {@code "_id"} and
     * the document body from {@code "_source"}.
     *
     * @param datas records to write; {@code null} or empty is a no-op
     */
    public void saveWithoutIndexBuilds2(List<JSONObject> datas) {
        if (datas == null || datas.isEmpty()) {
            return;
        }
        BulkRequestBuilder bulkRequest = client.prepareBulk();
        for (JSONObject data : datas) {
            String _id = data.getString("_id");
            JSONObject source = data.getJSONObject("_source");
            bulkRequest.add(client.prepareIndex(indexName, typeName, _id).setSource(source.toString()));
        }
        executeBulk(bulkRequest);
    }

    /** Copies the fields listed in {@code indexBuildEles} that are present in {@code data}. */
    private JSONObject buildIndexJson(JSONObject data) {
        JSONObject indexJson = new JSONObject();
        for (String indexBuildEle : this.indexBuildEles) {
            if (data.containsKey(indexBuildEle)) {
                indexJson.put(indexBuildEle, data.get(indexBuildEle));
            }
        }
        return indexJson;
    }

    /**
     * Executes the bulk request and logs any per-item failures. An empty request is skipped,
     * because Elasticsearch rejects a bulk request that contains no actions.
     */
    private void executeBulk(BulkRequestBuilder bulkRequest) {
        if (bulkRequest.numberOfActions() == 0) {
            logger.info("no data to insert into es " + indexName);
            return;
        }
        BulkResponse bulkResponse = bulkRequest.execute().actionGet();
        if (bulkResponse.hasFailures()) {
            logger.error("insert data 2 es error " + indexName + ": " + bulkResponse.buildFailureMessage());
        }
    }

    /**
     * Checks whether an index exists.
     *
     * @param indexName index to look up
     * @return {@code true} if the index exists
     */
    public boolean isExistsIndex(String indexName){
        IndicesExistsResponse response = client.admin().indices()
                .exists(new IndicesExistsRequest().indices(new String[]{indexName})).actionGet();
        return response.isExists();
    }

    /**
     * Creates an index with default settings.
     *
     * @param indexName index to create
     * @return {@code true} if the creation was acknowledged; {@code false} if it was not or
     *     an {@code ElasticsearchException} was raised (logged)
     */
    public boolean createIndex(String indexName){
        try {
            CreateIndexResponse indexResponse = client
                    .admin()
                    .indices()
                    .prepareCreate(indexName)
                    .get();
            return indexResponse.isAcknowledged();
        } catch (ElasticsearchException e) {
            logger.error("create index error in es: " + indexName, e);
        }
        return false;
    }

    /**
     * Indexes a single record synchronously.
     *
     * <p>{@code data} must be a {@link JSONObject}. The document id comes from
     * {@code IdBuilder.generateId} and the document contains only the fields listed in
     * {@code indexBuildEles}. Invalid input is logged and the record is dropped.
     */
    @Override
    public void save(Object data) {
        if (this.indexBuildEles == null || this.indexBuildEles.isEmpty()) {
            logger.error("index fields are empty in es, no index need to save, info: " + data);
            return;
        }

        if (!(data instanceof JSONObject)) {
            logger.error("trans data to json error in es :" + data);
            return;
        }
        JSONObject json = (JSONObject) data;

        // build the document id
        String id64 = IdBuilder.generateId(this.sourceName, json, this.idBuilderEles);
        if (id64 == null || id64.trim().length() == 0) {
            logger.error("generate 64 bit id is null,please check: " + data);
            return;
        }

        JSONObject indexJson = buildIndexJson(json);
        if (indexJson.keySet().isEmpty()) {
            logger.info("no json fields, so no need to save");
            return;
        }

        logger.info("index info: "+indexJson);

        IndexResponse response = client.prepareIndex(this.indexName, this.typeName, id64).setSource(indexJson.toString()).get();
        if (response.status() != RestStatus.CREATED && response.status() != RestStatus.OK) {
            logger.error("index error in es, status is " + response.status().getStatus() + ", info: " + data);
        }
    }

    /** No-op: the client is static and shared by all instances, so it is not closed per instance. */
    @Override
    public void close() {
    }
}

  以上代码展示了与Quartz相关的整体流程。配置类、数据库初始化/加载类以及部分帮助类等细节代码未予展示,但Quartz的核心用法已可见一斑。如有问题,欢迎留言。

4. 集群模式

注意:上述代码默认使用Quartz集群模式,主流程加载的quartz.properties中的集群配置如下,可供参考。

#============================================================================
# Configure Main Scheduler Properties  
#============================================================================

org.quartz.scheduler.instanceName: OcpScheduler
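# In a cluster every node must have a unique instanceId (instanceName stays the same); AUTO lets Quartz generate one per node
org.quartz.scheduler.instanceId: AUTO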

org.quartz.scheduler.skipUpdateCheck: true

#============================================================================
# Configure ThreadPool  
#============================================================================
org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount: 50
org.quartz.threadPool.threadPriority: 5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true

#============================================================================
# Configure JobStore  
#============================================================================
org.quartz.jobStore.misfireThreshold: 120000

org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.useProperties: false
org.quartz.jobStore.tablePrefix: QRTZ_
org.quartz.jobStore.dataSource: ocpQzDs
org.quartz.jobStore.isClustered: true
org.quartz.jobStore.clusterCheckinInterval = 60000 

#============================================================================
# Configure Datasources  
#============================================================================
org.quartz.dataSource.ocpQzDs.driver: com.mysql.jdbc.Driver
org.quartz.dataSource.ocpQzDs.URL:jdbc:mysql://192.168.1.1:3306/test?useUnicode=true&characterEncoding=utf-8
org.quartz.dataSource.ocpQzDs.user: test  
org.quartz.dataSource.ocpQzDs.password: test
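# The property is maxConnections (plural); the singular key is ignored and the pool falls back to its default size.
# Quartz recommends roughly threadCount + 2 connections for a busy scheduler.
org.quartz.dataSource.ocpQzDs.maxConnections: 30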

#============================================================================
# Configure Plugins 
#============================================================================
org.quartz.plugin.shutdownHook.class: org.quartz.plugins.management.ShutdownHookPlugin
org.quartz.plugin.shutdownHook.cleanShutdown: true


#org.quartz.plugin.triggHistory.class: org.quartz.plugins.history.LoggingJobHistoryPlugin

 

相关标签: quartz

上一篇: h5+c3进阶(1)

下一篇: h5c3 3d部分