您现在的位置是: 首页  >  IT编程


程序员文章站 2022-05-13 23:46:06
解决方法:在执行的任务方法前加上Mutex特性即可,如果作业未完成,新作业开启的话,新作业会放入计划中的作业队列中,直到前面的作业完成。 必须使用Hangfire.Pro.Redis 和 Hangfire.SqlServer 作为数据库。 参考:https://github.com/Hangfire ......


必须使用hangfire.pro.redis 和 hangfire.sqlserver 作为数据库。


  public async task downloadvideo()



using system;
using system.collections.generic;
using system.linq;
using hangfire.common;
using hangfire.states;
using hangfire.storage;

namespace hangfire.pro
    /// <summary>
    /// represents a background job filter that helps to disable concurrent execution
    /// without causing worker to wait as in <see cref="hangfire.disableconcurrentexecutionattribute"/>.
    /// </summary>
    public class mutexattribute : jobfilterattribute, ielectstatefilter, iapplystatefilter
        private static readonly timespan distributedlocktimeout = timespan.fromminutes(1);

        private readonly string _resource;

        public mutexattribute(string resource)
            _resource = resource;
            retryinseconds = 15;

        public int retryinseconds { get; set; }
        public int maxattempts { get; set; }

        public void onstateelection(electstatecontext context)
            // we are intercepting transitions to the processed state, that is performed by
            // a worker just before processing a job. during the state election phase we can
            // change the target state to another one, causing a worker not to process the
            // backgorund job.
            if (context.candidatestate.name != processingstate.statename ||
                context.backgroundjob.job == null)

            // this filter requires an extended set of storage operations. it's supported
            // by all the official storages, and many of the community-based ones.
            var storageconnection = context.connection as jobstorageconnection;
            if (storageconnection == null)
                throw new notsupportedexception("this version of storage doesn't support extended methods. please try to update to the latest version.");

            string blockedby;

                // distributed lock is needed here only to prevent a race condition, when another 
                // worker picks up a background job with the same resource between get and set 
                // operations.
                // there will be no race condition, when two or more workers pick up background job
                // with the same id, because state transitions are protected with distributed lock
                // themselves.
                using (acquiredistributedsetlock(context.connection, context.backgroundjob.job.args))
                    // resource set contains a background job id that acquired a mutex for the resource.
                    // we are getting only one element to see what background job blocked the invocation.
                    var range = storageconnection.getrangefromset(

                    blockedby = range.count > 0 ? range[0] : null;

                    // we should permit an invocation only when the set is empty, or if current background
                    // job is already owns a resource. this may happen, when the localtransaction succeeded,
                    // but outer transaction was failed.
                    if (blockedby == null || blockedby == context.backgroundjob.id)
                        // we need to commit the changes inside a distributed lock, otherwise it's 
                        // useless. so we create a local transaction instead of using the 
                        // context.transaction property.
                        var localtransaction = context.connection.createwritetransaction();

                        // add the current background job identifier to a resource set. this means
                        // that resource is owned by the current background job. identifier will be
                        // removed only on failed state, or in one of final states (succeeded or
                        // deleted).
                        localtransaction.addtoset(getresourcekey(context.backgroundjob.job.args), context.backgroundjob.id);

                        // invocation is permitted, and we did all the required things.
            catch (distributedlocktimeoutexception)
                // we weren't able to acquire a distributed lock within a specified window. this may
                // be caused by network delays, storage outages or abandoned locks in some storages.
                // since it is required to expire abandoned locks after some time, we can simply
                // postpone the invocation.
                context.candidatestate = new scheduledstate(timespan.fromseconds(retryinseconds))
                    reason = "couldn't acquire a distributed lock for mutex: timeout exceeded"


            // background job execution is blocked. we should change the target state either to 
            // the scheduled or to the deleted one, depending on current retry attempt number.
            var currentattempt = context.getjobparameter<int>("mutexattempt") + 1;
            context.setjobparameter("mutexattempt", currentattempt);

            context.candidatestate = maxattempts == 0 || currentattempt <= maxattempts
                ? createscheduledstate(blockedby, currentattempt)
                : createdeletedstate(blockedby);

        public void onstateapplied(applystatecontext context, iwriteonlytransaction transaction)
            if (context.backgroundjob.job == null) return;

            if (context.oldstatename == processingstate.statename)
                using (acquiredistributedsetlock(context.connection, context.backgroundjob.job.args))
                    var localtransaction = context.connection.createwritetransaction();
                    localtransaction.removefromset(getresourcekey(context.backgroundjob.job.args), context.backgroundjob.id);


        public void onstateunapplied(applystatecontext context, iwriteonlytransaction transaction)

        private static deletedstate createdeletedstate(string blockedby)
            return new deletedstate
                reason = $"execution was blocked by background job {blockedby}, all attempts exhausted"

        private istate createscheduledstate(string blockedby, int currentattempt)
            var reason = $"execution is blocked by background job {blockedby}, retry attempt: {currentattempt}";

            if (maxattempts > 0)
                reason += $"/{maxattempts}";

            return new scheduledstate(timespan.fromseconds(retryinseconds))
                reason = reason

        private idisposable acquiredistributedsetlock(istorageconnection connection, ienumerable<object> args)
            return connection.acquiredistributedlock(getdistributedlockkey(args), distributedlocktimeout);

        private string getdistributedlockkey(ienumerable<object> args)
            return $"extension:job-mutex:lock:{getkeyformat(args, _resource)}";

        private string getresourcekey(ienumerable<object> args)
            return $"extension:job-mutex:set:{getkeyformat(args, _resource)}";

        private static string getkeyformat(ienumerable<object> args, string keyformat)
            return string.format(keyformat, args.toarray());