欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

java实现的redis分布式锁

程序员文章站 2022-07-12 16:16:19
...

业务场景多了,就应该把场景代码工具化,减少重复代码;趁周六在这里总结一下java实现的redis分布式锁代码;

 

使用的技术点:

1、redis函数setnx;

2、redis监控函数watch;

3、String.intern在jvm的内存操作,和String.intern的替代方案:guava;

4、@FunctionalInterface函数接口和lambda表达式的应用。

5、RedisTemplate;尽量使用RedisTemplate操作redis;直接操作jedispool人多的时候会乱。

 

 

reidis锁操作类

 

package com.framework.cache;

import java.util.Random;

import com.framework.cache.execption.RedisExecption;

/**
 * redis锁操作类
 * @author lyq
 *
 */
public class RedisLock {
	public static final String LOCKED = "TRUE";
	public static final long MILLI_NANO_CONVERSION = 1000000L;
	public static final long DEFAULT_TIME_OUT = 1000L;
	public static final Random RANDOM = new Random();
	public static final int EXPIRE = 1200;
	private String key;
	private String lockId;

	private boolean locked = false;

	public RedisLock(String key) {
		lockId = System.currentTimeMillis()+"";
		this.key = String.format("lock:%s", key);
	}



	/**
	 *  加锁
	 * @param expireSeconds 超时时间,单位:秒
	 * @return 获取锁成功返回true;超时返回false;其它报异常
	 */
	public boolean lock(int expireSeconds) {

		long nano = System.nanoTime();
		long timeout=  expireSeconds * 1000000000L;
		try {
			// 当前任务超时的时候,可能会被其它任务抢到锁,该任务会误删其它任务的锁;所以加入判断。
			if (RedisHelper.getExpire(this.key) == -1) {
				RedisHelper.expire(this.key, expireSeconds);
			}
			while (System.nanoTime() - nano < timeout) {
				if (RedisHelper.setnx(this.key, lockId)) {
					RedisHelper.expire(this.key, expireSeconds);
					this.locked = true;
					return this.locked;
				}
				Thread.sleep(3L, RANDOM.nextInt(500));
			}
		} catch (Exception e) {
			throw new RedisExecption("Locking error", e);
		}
		return false;
	}

	public boolean lock() {
		return lock(2);
	}

	/**
	 * 解锁
	 */
	@SuppressWarnings("unchecked")
	public void unlock() {
		if (this.locked) {
			String _lockId = this.lockId;
			
			RedisHelper.getRedisTemplate().execute(connection->{
				byte[] keyByte = RedisHelper.serialize(key);
				//开启watch之后,如果key的值被修改,则事务失败,exec方法返回null
				connection.watch(keyByte);
				String startTime = RedisHelper.unserialize(connection.get(keyByte), String.class);
				if (_lockId.equals(startTime)) { // 当前任务超时的时候,可能会被其它任务抢到锁,该任务会误删其它任务的锁;所以加入判断。
					connection.multi();
					connection.del(keyByte);
					connection.exec();
				}
				connection.unwatch();
				return true;
			},true);
			
		}
	}
	
	public String getLockKey(){
		return this.key;
	}

}

 

 

 

 

redis对外帮助类

 

package com.framework.cache;

import java.lang.reflect.Type;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;

import com.framework.cache.execption.RedisExecption;
import com.framework.cache.execption.RedisLockTimeException;
import com.framework.cache.lock.LockCallback;
import com.framework.utils.NumUtil;
import com.google.common.collect.Interner;
import com.google.common.collect.Interners;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

import redis.clients.jedis.JedisPoolConfig;

/**
 * redis对外静态帮助类
 * @author lyq
 *
 */
public class RedisHelper {
	public static RedisTemplate redisTemplate = null;

	private static Interner<Object> pool = Interners.newWeakInterner();

	private static final Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd hh:mm:ss").create();

	private static final String IGNORE_EXEC_KEY = "_ignoreExec";

	/**
	 * 加锁执行
	 * @param callBack 回调函数
	 * @param lockKey 
	 * @param expireSeconds 执行超时时间,单位:秒
	 * @return 返回callback回调函数执行结果
	 */
	public static <T> T lockExec(LockCallback<T> callBack, String lockKey, Integer expireSeconds) {
		/**
		 * redisLock其实就是规定时间内一直循环网络请求redis资源;
		 * 为了防止并发的时候,太多线程循环抢redis锁;导致资源不足,需要对字符串加锁;
		 * 字符加锁,需要使用intern;而1.7以后,String.intern的对象会一直存在jvm内存里面,大量使用会导致内存异常;替代方案就是使用guava的Interners.newWeakInterner()代替String.intern;
		 * synchronized不支持过期,如果需要加过期时间,则使用ReentrantLock 的 tryLock(long timeout, TimeUnit unit)方法。
                 */
		synchronized (pool.intern(lockKey)) {
			RedisLock lock = new RedisLock(lockKey);
			try {
				if(lock.lock(expireSeconds)){
					return callBack.exec();
				}else{
					throw new RedisLockTimeException(lockKey,expireSeconds);
				}
			} catch (Exception e) {
				throw new RedisExecption(e);
			} finally {
				lock.unlock();
			}
		}
	}



	
	/**
	 * 获取超时时间
	 * @param key 
	 * @return 单位秒
	 */
	public static Long getExpire(String key){
		byte[] rawKey = RedisHelper.serialize(key);
		return (Long) redisTemplate.execute(connection -> connection.ttl(rawKey), true);
	}
	
	/**
	 * 设置超时时间
	 * @param key
	 * @param seconds 单位秒
	 * @return
	 */
	public static Boolean expire(String key,Integer seconds){
		byte[] rawKey = RedisHelper.serialize(key);
		return (Boolean) redisTemplate.execute(connection -> connection.expire(rawKey,seconds), true);
	}
	


	/**
	 * 尝试加锁并执行操作 如果加锁失败直接返回 如果加锁成功,则指定锁时间
	 *
	 * @param action
	 *            待执行的动作
	 * @param key
	 *            加锁的key
	 * @param expireSeconds
	 *            锁超时时间,单位秒
	 * @param <T>
	 *            泛型参数
	 * @return
	 */
	public static <T> T mutexExec(LockCallback<T> action, String key, int expireSeconds) {

		if (setnx(key + IGNORE_EXEC_KEY, "TRUE", expireSeconds)) {
			try {
				return action.exec();
			} catch (Exception e) {
				throw new RedisExecption("ignoreExec error", e);
			} finally {
				del(key + IGNORE_EXEC_KEY);
			}
		}
		return null;
	}



/**
 * 设置值成功返回true,key已存在则设置值失败,返回false
 * @param key
 * @param value
 * @param expireSeconds,设置值过期时间,即使设置值失败,也会一直覆盖过期是时间
 * @return
 * @throws RedisExecption
 */
	@SuppressWarnings({ "unchecked" })
	public static boolean setnx(final String key, final Object value, final Integer expireSeconds)
			throws RedisExecption {
		if (StringUtils.isEmpty(key) || value == null) {
			throw new RedisExecption("key或value不能为空");
		}
		return (Boolean) redisTemplate.execute(connection-> {
				try {
					Boolean result = false;
					connection.multi();
					setnx(key, value, expireSeconds, connection);
					List<Object> list = connection.exec();
					if (CollectionUtils.isNotEmpty(list)) {
						result = (Boolean) list.get(0);
					}
					return result;
				} catch (Exception e) {
					// connection.discard();
					throw new RuntimeException(e);
				}
			

		}, true);
	}

	/**
	 * 设置值成功返回true,key已存在则设置值失败,返回false
	 * @param key
	 * @param value
	 * @return
	 * @throws RedisExecption
	 */
	@SuppressWarnings({ "unchecked" })
	public static boolean setnx(final String key, final Object value) throws RedisExecption {
		if (StringUtils.isEmpty(key) || value == null) {
			throw new RedisExecption("key或value不能为空");
		}
		return (Boolean) redisTemplate.execute(connection-> {
				try {
					Boolean result = false;
					connection.multi();
					 setnx(key, value, -1, connection);
					List<Object> list = connection.exec();
					if (CollectionUtils.isNotEmpty(list)) {
						result = (Boolean) list.get(0);
					}
					return result;
				} catch (Exception e) {
					// connection.discard();
					throw new RuntimeException(e);
				}

		}, true);
	}
	/**
	 * 事务执行,不返回
	 * @param key
	 * @param value
	 * @param expireSeconds
	 * @param connection
	 */
	public static void setnx(final String key, final Object value, final Integer expireSeconds,
			final RedisConnection connection) {
		byte[] byteKey = serialize(key);

		connection.setNX(byteKey, serialize(value));
		if (NumUtil.intValue(expireSeconds) != -1) {
			connection.expire(byteKey, expireSeconds);
		}
	}

	/**
	 * 删除键
	 * @param key
	 * @return
	 * @throws RedisExecption
	 */
	@SuppressWarnings("unchecked")
	public static Long del(final String key) throws RedisExecption {
		return (Long) redisTemplate.execute(connection-> del(key, connection), true);
	}

	public static Long del(final String key, final RedisConnection connection) {
		try {
			return connection.del(serialize(key));
		} catch (Exception e) {
			throw new RuntimeException(e);
		}
	}

	/**
	 * 序列化
	 * @param value
	 * @return
	 */
	public static byte[] serialize(Object value) {
		String valStr = getJsonStr(value);
		return redisTemplate.getStringSerializer().serialize(valStr);
	}

	/**
	 * 返序列化
	 * @param bytes
	 * @param clz
	 * @return
	 */
	public static <T> T unserialize(byte[] bytes, Class<T> clz) {
		// String valStr = getJsonStr);

		Object json = redisTemplate.getStringSerializer().deserialize(bytes);
		if (json == null) {
			return null;
		}
		if (clz == null || clz.getName().equals("java.lang.String")) {
			return (T) json;
		} else {
			return gson.fromJson(json.toString(), clz);
		}
	}

	/**
	 * private static java.lang.reflect.Type imgType = new TypeToken<ArrayList
	 * <T>>() {}.getType();
	 *
	 * @param bytes
	 * @param type
	 * @return
	 */
	public static <T> T unserialize(byte[] bytes, Type type) {
		// String valStr = getJsonStr);

		Object json = redisTemplate.getStringSerializer().deserialize(bytes);
		if (json == null) {
			return null;
		}
		if (type == null) {
			return (T) json;
		} else {
			return gson.fromJson(json.toString(), type);
		}
	}



	private static String getJsonStr(Object value) {
		String valStr = null;
		if (value == null) {
			valStr = null;
		}

		if (value instanceof String) {
			valStr = value.toString();
		} else if (value instanceof Number) {
			valStr = value + "";
		} else {
			valStr = gson.toJson(value);
		}
		return valStr;
	}

	@SuppressWarnings("rawtypes")
	public static RedisTemplate getRedisTemplate() {
		return redisTemplate;
	}

	@SuppressWarnings({ "rawtypes", "static-access" })
	public void setRedisTemplate(RedisTemplate redisTemplate) {
		this.redisTemplate = redisTemplate;
	}
	

}

 

 

 

 

 

redis异常处理类

 

package com.framework.cache.execption;

import java.io.PrintWriter;
import java.io.StringWriter;

/**
 * redis异常类,redis执行异常报错
 * @author lyq
 *
 */
public class RedisExecption extends RuntimeException {
private static final long serialVersionUID = 155L;
//private Logger log =  Logger.getLogger(this.getClass());
	public RedisExecption() {
	}

	public RedisExecption(String message) {
		super(message);
	}

	public RedisExecption(Throwable cause) {
		super(cause);
	}

	public RedisExecption(String message, Throwable cause) {
		super(message, cause);
	}
	
	/** 
	  * 以字符串形式返回异常堆栈信息 
	  * @param e 
	  * @return 异常堆栈信息字符串 
	  */ 
	public static String getStackTrace(Exception e) { 
	  StringWriter writer = new StringWriter(); 
	  e.printStackTrace(new PrintWriter(writer,true)); 
	  return writer.toString(); 
	} 
}

 

 

分布式锁超时异常

 

package com.framework.cache.execption;

/**
 * 分布式锁超时报错
 * @author lyq
 *
 */
public class RedisLockTimeException extends RedisExecption {

	/**
	 * 超时报错
	 * @param expireSeconds 单位秒
	 */
	private static final long serialVersionUID = 1L;

	public RedisLockTimeException(String lockKey,Integer expireSeconds) {
		super(String.format("分布式锁 ‘%s’等待超时: %ss",lockKey,expireSeconds));
	}
}

 

 分布式锁测试类

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;

import com.framework.cache.RedisHelper;

import redis.clients.jedis.JedisPoolConfig;

public class RedisTest {
	public static void main(String[] args) {
		RedisTemplate t = new RedisTemplate();

		// InputStream is =
		// RedisHelper.class.getClassLoader().getResourceAsStream("redis.properties");
		String hostname = "";
		String password = "";
		String port = "";
		// try {
		// prop.load(is);
		

		// } catch (IOException e) {
		// // TODO Auto-generated catch block
		// e.printStackTrace();
		// }

		/*** 热备start ***/
		// String sentinelHost = "localhost";
		// int sentinelport = 26379;
		// RedisSentinelConfiguration sentinelConfiguration = new
		// RedisSentinelConfiguration();
		// sentinelConfiguration.setMaster("mymaster");
		// sentinelConfiguration.sentinel(sentinelHost, sentinelport);
		// JedisConnectionFactory connectionFactory = new
		// JedisConnectionFactory(sentinelConfiguration);
		/*** 热备end ***/

		/*** 单机start ***/
		// hostname = "120.25.226.230";
		hostname = "127.0.0.1";
		password = "";
		
		port = "6379";
		JedisConnectionFactory connectionFactory = new JedisConnectionFactory();
		connectionFactory.setPassword(password);
		connectionFactory.setDatabase(5);
		connectionFactory.setHostName(hostname);
		connectionFactory.setPort(Integer.parseInt(port));
		/*** 单机end ***/
		JedisPoolConfig config = new redis.clients.jedis.JedisPoolConfig();
		config.setMaxTotal(5000);
		config.setMaxIdle(200);
		config.setMaxWaitMillis(5000);
		config.setTestOnBorrow(true);
		connectionFactory.afterPropertiesSet();
		t.setConnectionFactory(connectionFactory);
		connectionFactory.setPoolConfig(config);
		t.afterPropertiesSet();
		RedisHelper.redisTemplate = t;

	
		/************************************
		 * 分布式锁测试 start
		 *****************************************/

		ThreadPoolExecutor executor = null;
		// 有限队列,缓存30个请求等待,线程池默认策略,缓存超出上限报错
		BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(30);
		// 初始化30个线程,最多30个线程
		executor = new ThreadPoolExecutor(30, 30, 10, TimeUnit.SECONDS, workQueue, new ThreadFactory() {

			public Thread newThread(Runnable r) {
				Thread thread = new Thread(r);
				thread.setName("redis-lock-thread");
				return thread;
			}
		});

		
		CyclicBarrier barrier = new CyclicBarrier(30);
		for (Integer i = 0; i < 30; i++) {
			final int _i = i;
			executor.execute(() -> {
				try {
					barrier.await();
				} catch (Exception e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
                        //锁等待10秒超时
				RedisHelper.lockExec(() -> {
					System.out.println(_i + "=====");
					try {
						Thread.sleep(2000);
					} catch (Exception e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
					return 2;
				}, "test",10);
			});

		}
		/************************************
		 * 分布式锁测试 end
		 *****************************************/

	
	}
}

 

 

相关标签: redis

上一篇: BigDecimalUtil

下一篇: kubeadm部署k8s