欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

详解免费开源的DotNet任务调度组件Quartz.NET(.NET组件介绍之五)

程序员文章站 2022-06-03 15:06:44
很多的软件项目中都会使用到定时任务、定时轮询数据库同步,定时邮件通知等功能。.net framework具有“内置”定时器功能,通过system.timers.timer类...

很多的软件项目中都会使用到定时任务、定时轮询数据库同步,定时邮件通知等功能。.net framework具有“内置”定时器功能,通过system.timers.timer类。在使用timer类需要面对的问题:计时器没有持久化机制;计时器具有不灵活的计划(仅能设置开始时间和重复间隔,没有基于日期,时间等);计时器不使用线程池(每个定时器一个线程);计时器没有真正的管理方案 - 你必须编写自己的机制,以便能够记住,组织和检索任务的名称等。

如果需要在.net实现定时器的功能,可以尝试使用以下这款开源免费的组件quartz.net组件。目前quartz.net版本为3.0,修改了原来的一些问题:修复由于线程本地存储而不能与adojobstore协同工作的调度器信令;线程局部状态完全删除;quartz.serializer.type是必需的,即使非序列化ramjobstore正在使用;json序列化错误地称为序列化回调。

一.quart.net概述: 

quartz是一个作业调度系统,可以与任何其他软件系统集成或一起使用。作业调度程序是一个系统,负责在执行预处理程序时执行(或通知)其他软件组件 - 确定(调度)时间到达。quartz是非常灵活的,并且包含多个使用范例,可以单独使用或一起使用,以实现您所需的行为,并使您能够以您的项目看起来最“自然”的方式编写代码。组件的使用非常轻便,并且需要非常少的设置/配置 - 如果您的需求相对基础,它实际上可以使用“开箱即用”。quartz是容错的,并且可以在系统重新启动之间保留(记住)您的预定作业。尽管quartz对于在给定的时间表上简单地运行某些系统进程非常有用,但当您学习如何使用quartz来驱动应用程序的业务流程时,quartz的全部潜能可以实现。

 quartz是作为一个小的动态链接库(.dll文件)分发的,它包含所有的核心quartz功能。 此功能的主要接口(api)是调度程序接口。 它提供简单的操作,如调度/非调度作业,启动/停止/暂停调度程序。如果你想安排你自己的软件组件执行,他们必须实现简单的job接口,它包含方法execute()。 如果希望在计划的触发时间到达时通知组件,则组件应实现triggerlistener或joblistener接口。主要的quartz'进程'可以在您自己的应用程序或独立应用程序(使用远程接口)中启动和运行。

二.quartz.net主体类和方法解析:

1.stdschedulerfactory类:创建quartzscheduler实例。

  /// <summary>
    /// 返回此工厂生成的调度程序的句柄。
    /// </summary>
    /// <remarks>
    ///如果<see cref =“initialize()”/>方法之一没有先前调用,然后是默认(no-arg)<see cref =“initialize()”/>方法将被这个方法调用。
    /// </remarks>
    public virtual ischeduler getscheduler()
    {
      if (cfg == null)
      {
        initialize();
      }

      schedulerrepository schedrep = schedulerrepository.instance;

      ischeduler sched = schedrep.lookup(schedulername);

      if (sched != null)
      {
        if (sched.isshutdown)
        {
          schedrep.remove(schedulername);
        }
        else
        {
          return sched;
        }
      }

      sched = instantiate();

      return sched;
    }
public interface ischedulerfactory
  {
    /// <summary>
    /// returns handles to all known schedulers (made by any schedulerfactory
    /// within this app domain.).
    /// </summary>
    icollection<ischeduler> allschedulers { get; }

    /// <summary>
    /// returns a client-usable handle to a <see cref="ischeduler" />.
    /// </summary>
    ischeduler getscheduler();

    /// <summary>
    /// returns a handle to the scheduler with the given name, if it exists.
    /// </summary>
    ischeduler getscheduler(string schedname);
  }

2.jobdetailimpl:传递给定作业实例的详细信息属性。

/// <summary>
    /// 获取或设置与<see cref =“ijob”/>相关联的<see cref =“jobdatamap”/>。
    /// </summary>
    public virtual jobdatamap jobdatamap
    {
      get
      {
        if (jobdatamap == null)
        {
          jobdatamap = new jobdatamap();
        }
        return jobdatamap;
      }

      set { jobdatamap = value; }
    }

3.jobkey:键由名称和组组成,名称必须是唯一的,在组内。 如果只指定一个组,则默认组将使用名称。

  [serializable]
  public sealed class jobkey : key<jobkey>
  {
    public jobkey(string name) : base(name, null)
    {
    }

    public jobkey(string name, string group) : base(name, group)
    {
    }

    public static jobkey create(string name)
    {
      return new jobkey(name, null);
    }

    public static jobkey create(string name, string group)
    {
      return new jobkey(name, group);
    }
  }

4.stdschedulerfactory.initialize():

/// <summary> 
    /// 使用初始化<see cref =“ischedulerfactory”/>
     ///给定键值集合对象的内容。
    /// </summary>
    public virtual void initialize(namevaluecollection props)
    {
      cfg = new propertiesparser(props);
      validateconfiguration();
    }

    protected virtual void validateconfiguration()
    {
      if (!cfg.getbooleanproperty(propertycheckconfiguration, true))
      {
        // should not validate
        return;
      }

      // determine currently supported configuration keys via reflection
      list<string> supportedkeys = new list<string>();
      list<fieldinfo> fields = new list<fieldinfo>(gettype().getfields(bindingflags.static | bindingflags.public | bindingflags.flattenhierarchy));
      // choose constant string fields
      fields = fields.findall(field => field.fieldtype == typeof (string));

      // read value from each field
      foreach (fieldinfo field in fields)
      {
        string value = (string) field.getvalue(null);
        if (value != null && value.startswith(configurationkeyprefix) && value != configurationkeyprefix)
        {
          supportedkeys.add(value);
        }
      }

      // now check against allowed
      foreach (string configurationkey in cfg.underlyingproperties.allkeys)
      {
        if (!configurationkey.startswith(configurationkeyprefix) || configurationkey.startswith(configurationkeyprefixserver))
        {
          // don't bother if truly unknown property
          continue;
        }

        bool ismatch = false;
        foreach (string supportedkey in supportedkeys)
        {
          if (configurationkey.startswith(supportedkey, stringcomparison.invariantculture))
          {
            ismatch = true;
            break;
          }
        }
        if (!ismatch)
        {
          throw new schedulerconfigexception("unknown configuration property '" + configurationkey + "'");
        }
      }

    }

三.quartz.net的基本应用:

 下面提供一些较为通用的任务处理代码:

1.任务处理帮助类:

 /// <summary>
  /// 任务处理帮助类
  /// </summary>
  public class quartzhelper
  {
    public quartzhelper() { }

    public quartzhelper(string quartzserver, string quartzport)
    {
      server = quartzserver;
      port = quartzport;
    }

    /// <summary>
    /// 锁对象
    /// </summary>
    private static readonly object obj = new object();

    /// <summary>
    /// 方案
    /// </summary>
    private const string scheme = "tcp";

    /// <summary>
    /// 服务器的地址
    /// </summary>
    public static string server { get; set; }

    /// <summary>
    /// 服务器的端口
    /// </summary>
    public static string port { get; set; }

    /// <summary>
    /// 缓存任务所在程序集信息
    /// </summary>
    private static readonly dictionary<string, assembly> assemblydict = new dictionary<string, assembly>();

    /// <summary>
    /// 程序调度
    /// </summary>
    private static ischeduler _scheduler;

    /// <summary>
    /// 初始化任务调度对象
    /// </summary>
    public static void initscheduler()
    {
      try
      {
        lock (obj)
        {
          if (_scheduler != null) return;
          //配置文件的方式,配置quartz实例
          ischedulerfactory schedulerfactory = new stdschedulerfactory();
          _scheduler = schedulerfactory.getscheduler();
        }
      }
      catch (exception ex)
      {
        throw new exception(ex.message);
      }
    }

    /// <summary>
    /// 启用任务调度
    /// 启动调度时会把任务表中状态为“执行中”的任务加入到任务调度队列中
    /// </summary>
    public static void startscheduler()
    {
      try
      {
        if (_scheduler.isstarted) return;
        //添加全局监听
        _scheduler.listenermanager.addtriggerlistener(new customtriggerlistener(), groupmatcher<triggerkey>.anygroup());
        _scheduler.start();

        //获取所有执行中的任务
        list<taskmodel> listtask = taskhelper.getalltasklist().tolist();

        if (listtask.count > 0)
        {
          foreach (taskmodel taskutil in listtask)
          {
            try
            {
              schedulejob(taskutil);
            }
            catch (exception e)
            {
             throw new exception(taskutil.taskname,e);
            }
          }
        }       
      }
      catch (exception ex)
      {
        throw new exception(ex.message);
      }
    }

    /// <summary>
    /// 启用任务
    /// <param name="task">任务信息</param>
    /// <param name="isdeleteoldtask">是否删除原有任务</param>
    /// <returns>返回任务trigger</returns>
    /// </summary>
    public static void schedulejob(taskmodel task, bool isdeleteoldtask = false)
    {
      if (isdeleteoldtask)
      {
        //先删除现有已存在任务
        deletejob(task.taskid.tostring());
      }
      //验证是否正确的cron表达式
      if (validexpression(task.cronexpressionstring))
      {
        ijobdetail job = new jobdetailimpl(task.taskid.tostring(), getclassinfo(task.assemblyname, task.classname));
        //添加任务执行参数
        job.jobdatamap.add("taskparam", task.taskparam);

        crontriggerimpl trigger = new crontriggerimpl
        {
          cronexpressionstring = task.cronexpressionstring,
          name = task.taskid.tostring(),
          description = task.taskname
        };
        _scheduler.schedulejob(job, trigger);
        if (task.status == taskstatus.stop)
        {
          jobkey jk = new jobkey(task.taskid.tostring());
          _scheduler.pausejob(jk);
        }
        else
        {
          list<datetime> list = getnextfiretime(task.cronexpressionstring, 5);
          foreach (var time in list)
          {
            loghelper.writelog(time.tostring(cultureinfo.invariantculture));
          }
        }
      }
      else
      {
        throw new exception(task.cronexpressionstring + "不是正确的cron表达式,无法启动该任务!");
      }
    }


    /// <summary>
    /// 初始化 远程quartz服务器中的,各个scheduler实例。
    /// 提供给远程管理端的后台,用户获取scheduler实例的信息。
    /// </summary>
    public static void initremotescheduler()
    {
      try
      {
        namevaluecollection properties = new namevaluecollection
        {
          ["quartz.scheduler.instancename"] = "examplequartzscheduler",
          ["quartz.scheduler.proxy"] = "true",
          ["quartz.scheduler.proxy.address"] =string.format("{0}://{1}:{2}/quartzscheduler", scheme, server, port)
        };

        ischedulerfactory sf = new stdschedulerfactory(properties);

        _scheduler = sf.getscheduler();
      }
      catch (exception ex)
      {
        throw new exception(ex.stacktrace);
      }
    }

    /// <summary>
    /// 删除现有任务
    /// </summary>
    /// <param name="jobkey"></param>
    public static void deletejob(string jobkey)
    {
      try
      {
        jobkey jk = new jobkey(jobkey);
        if (_scheduler.checkexists(jk))
        {
          //任务已经存在则删除
          _scheduler.deletejob(jk);
          
        }
      }
      catch (exception ex)
      {
        throw new exception(ex.message);
      }
    }

   

    /// <summary>
    /// 暂停任务
    /// </summary>
    /// <param name="jobkey"></param>
    public static void pausejob(string jobkey)
    {
      try
      {
        jobkey jk = new jobkey(jobkey);
        if (_scheduler.checkexists(jk))
        {
          //任务已经存在则暂停任务
          _scheduler.pausejob(jk);
        }
      }
      catch (exception ex)
      {
        throw new exception(ex.message);
      }
    }

    /// <summary>
    /// 恢复运行暂停的任务
    /// </summary>
    /// <param name="jobkey">任务key</param>
    public static void resumejob(string jobkey)
    {
      try
      {
        jobkey jk = new jobkey(jobkey);
        if (_scheduler.checkexists(jk))
        {
          //任务已经存在则暂停任务
          _scheduler.resumejob(jk);
        }
      }
      catch (exception ex)
      {
       throw new exception(ex.message);
      }
    }

    /// <summary> 
    /// 获取类的属性、方法 
    /// </summary> 
    /// <param name="assemblyname">程序集</param> 
    /// <param name="classname">类名</param> 
    private static type getclassinfo(string assemblyname, string classname)
    {
      try
      {
        assemblyname = filehelper.getabsolutepath(assemblyname + ".dll");
        assembly assembly = null;
        if (!assemblydict.trygetvalue(assemblyname, out assembly))
        {
          assembly = assembly.loadfrom(assemblyname);
          assemblydict[assemblyname] = assembly;
        }
        type type = assembly.gettype(classname, true, true);
        return type;
      }
      catch (exception ex)
      {
        throw new exception(ex.message);
      }
    }

    /// <summary>
    /// 停止任务调度
    /// </summary>
    public static void stopschedule()
    {
      try
      {
        //判断调度是否已经关闭
        if (!_scheduler.isshutdown)
        {
          //等待任务运行完成
          _scheduler.shutdown(true);
        }
      }
      catch (exception ex)
      {
        throw new exception(ex.message);
      }
    }

    /// <summary>
    /// 校验字符串是否为正确的cron表达式
    /// </summary>
    /// <param name="cronexpression">带校验表达式</param>
    /// <returns></returns>
    public static bool validexpression(string cronexpression)
    {
      return cronexpression.isvalidexpression(cronexpression);
    }

    /// <summary>
    /// 获取任务在未来周期内哪些时间会运行
    /// </summary>
    /// <param name="cronexpressionstring">cron表达式</param>
    /// <param name="numtimes">运行次数</param>
    /// <returns>运行时间段</returns>
    public static list<datetime> getnextfiretime(string cronexpressionstring, int numtimes)
    {
      if (numtimes < 0)
      {
        throw new exception("参数numtimes值大于等于0");
      }
      //时间表达式
      itrigger trigger = triggerbuilder.create().withcronschedule(cronexpressionstring).build();
      ilist<datetimeoffset> dates = triggerutils.computefiretimes(trigger as ioperabletrigger, null, numtimes);
      list<datetime> list = new list<datetime>();
      foreach (datetimeoffset dtf in dates)
      {
        list.add(timezoneinfo.converttimefromutc(dtf.datetime, timezoneinfo.local));
      }
      return list;
    }


    public static object currenttasklist()
    {
      throw new notimplementedexception();
    }

    /// <summary>
    /// 获取当前执行的task 对象
    /// </summary>
    /// <param name="context"></param>
    /// <returns></returns>
    public static taskmodel gettaskdetail(ijobexecutioncontext context)
    {
      taskmodel task = new taskmodel();

      if (context != null)
      {

        task.taskid = guid.parse(context.trigger.key.name);
        task.taskname = context.trigger.description;
        task.recentruntime = datetime.now;
        task.taskparam = context.jobdetail.jobdatamap.get("taskparam") != null ? context.jobdetail.jobdatamap.get("taskparam").tostring() : "";
      }
      return task;
    }
  }

2.设置执行中的任务:

public class taskbll
  {
    private readonly taskdal _dal = new taskdal();

    /// <summary>
    /// 获取任务列表
    /// </summary>
    /// <param name="pageindex"></param>
    /// <param name="pagesize"></param>
    /// <returns></returns>
    public pageof<taskmodel> gettasklist(int pageindex, int pagesize)
    {
      return _dal.gettasklist(pageindex, pagesize);
    }

    /// <summary>
    /// 读取数据库中全部的任务
    /// </summary>
    /// <returns></returns>
    public list<taskmodel> getalltasklist()
    {
      return _dal.getalltasklist();
    }

    /// <summary>
    /// 
    /// </summary>
    /// <param name="taskid"></param>
    /// <returns></returns>
    public taskmodel getbyid(string taskid)
    {
      throw new notimplementedexception();
    }

    /// <summary>
    /// 删除任务
    /// </summary>
    /// <param name="taskid"></param>
    /// <returns></returns>
    public bool deletebyid(string taskid)
    {
      return _dal.updatetaskstatus(taskid, -1);
    }

    /// <summary>
    /// 修改任务
    /// </summary>
    /// <param name="taskid"></param>
    /// <param name="status"></param>
    /// <returns></returns>
    public bool updatetaskstatus(string taskid, int status)
    {
      return _dal.updatetaskstatus(taskid, status);
    }

    /// <summary>
    /// 修改任务的下次启动时间
    /// </summary>
    /// <param name="taskid"></param>
    /// <param name="nextfiretime"></param>
    /// <returns></returns>
    public bool updatenextfiretime(string taskid, datetime nextfiretime)
    {
      return _dal.updatenextfiretime(taskid, nextfiretime);
    }

    /// <summary>
    /// 根据任务id 修改 上次运行时间
    /// </summary>
    /// <param name="taskid"></param>
    /// <param name="recentruntime"></param>
    /// <returns></returns>
    public bool updaterecentruntime(string taskid, datetime recentruntime)
    {
      return _dal.updaterecentruntime(taskid, recentruntime);
    }

    /// <summary>
    /// 根据任务id 获取任务
    /// </summary>
    /// <param name="taskid"></param>
    /// <returns></returns>
    public taskmodel gettaskbyid(string taskid)
    {
      return _dal.gettaskbyid(taskid);
    }

    /// <summary>
    /// 添加任务
    /// </summary>
    /// <param name="task"></param>
    /// <returns></returns>
    public bool add(taskmodel task)
    {
      return _dal.add(task);
    }

    /// <summary>
    /// 修改任务
    /// </summary>
    /// <param name="task"></param>
    /// <returns></returns>
    public bool edit(taskmodel task)
    {
      return _dal.edit(task);
    }
  }

3.任务实体:

/// <summary>
  /// 任务实体
  /// </summary>
  public class taskmodel
  {
    /// <summary>
    /// 任务id
    /// </summary>
    public guid taskid { get; set; }

    /// <summary>
    /// 任务名称
    /// </summary>
    public string taskname { get; set; }

    /// <summary>
    /// 任务执行参数
    /// </summary>
    public string taskparam { get; set; }

    /// <summary>
    /// 运行频率设置
    /// </summary>
    public string cronexpressionstring { get; set; }

    /// <summary>
    /// 任务运频率中文说明
    /// </summary>
    public string cronremark { get; set; }

    /// <summary>
    /// 任务所在dll对应的程序集名称
    /// </summary>
    public string assemblyname { get; set; }

    /// <summary>
    /// 任务所在类
    /// </summary>
    public string classname { get; set; }

    public taskstatus status { get; set; }

    /// <summary>
    /// 任务创建时间
    /// </summary>
    public datetime? createdtime { get; set; }

    /// <summary>
    /// 任务修改时间
    /// </summary>
    public datetime? modifytime { get; set; }

    /// <summary>
    /// 任务最近运行时间
    /// </summary>
    public datetime? recentruntime { get; set; }

    /// <summary>
    /// 任务下次运行时间
    /// </summary>
    public datetime? nextfiretime { get; set; }

    /// <summary>
    /// 任务备注
    /// </summary>
    public string remark { get; set; }

    /// <summary>
    /// 是否删除
    /// </summary>
    public int isdelete { get; set; }
  }

 4.配置文件:

 # you can configure your scheduler in either <quartz> configuration section
# or in quartz properties file
# configuration section has precedence

quartz.scheduler.instancename = examplequartzscheduler

# configure thread pool info
quartz.threadpool.type = quartz.simpl.simplethreadpool, quartz
quartz.threadpool.threadcount = 10
quartz.threadpool.threadpriority = normal

# job initialization plugin handles our xml reading, without it defaults are used
# quartz.plugin.xml.type = quartz.plugin.xml.xmlschedulingdataprocessorplugin, quartz
# quartz.plugin.xml.filenames = ~/quartz_jobs.xml

# export this server to remoting context
quartz.scheduler.exporter.type = quartz.simpl.remotingschedulerexporter, quartz
quartz.scheduler.exporter.port = 555
quartz.scheduler.exporter.bindname = quartzscheduler
quartz.scheduler.exporter.channeltype = tcp
quartz.scheduler.exporter.channelname = httpquartz

四.总结:

 在项目中比较多的使用到定时任务的功能,今天的介绍的组件可以很好的完成一些定时任务的要求。这篇文章主要是作为引子,简单的介绍了组件的背景和组件的使用方式,如果项目中需要使用,可以进行更加深入的了解。