您现在的位置是: 首页

etcd分布式锁实现 博客分类: 分布式

程序员文章站 2024-03-22 20:39:22





import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.coreos.jetcd.Client;
import com.coreos.jetcd.Lease;
import com.coreos.jetcd.Lock;
import com.coreos.jetcd.data.ByteSequence;

 * Etcd Java客户端 Jetcd提供的Lock客户端实现分布式锁
public class EtcdDistributedLock {
	private static EtcdDistributedLock lock = null;
	private static Object mutex = new Object();
	private Client client; // etcd客户端
	private Lock lockClient; // etcd分布式锁客户端
	private Lease leaseClient; // etcd租约客户端

	private EtcdDistributedLock() {
		// 创建Etcd客户端,本例中Etcd集群只有一个节点
		this.client = Client.builder().endpoints("http://localhost:2379").build();
		this.lockClient = client.getLockClient();
		this.leaseClient = client.getLeaseClient();

	 * 单例
	public static EtcdDistributedLock getInstance() {
		synchronized (mutex) { // 互斥锁
			if (null == lock) {
				lock = new EtcdDistributedLock();
		return lock;

	 * 加锁操作,需要注意的是,本例中没有加入重试机制,加锁失败将直接返回。
	 * @param lockName: 针对某一共享资源(数据、文件等)制定的锁名
	 * @param TTL: Time To Live,租约有效期,一旦客户端崩溃,可在租约到期后自动释放锁
	 * @return LockResult
	public LockResult lock(String lockName, long TTL) {
		LockResult lockResult = new LockResult();
		/* 1.准备阶段 */
		// 创建一个定时任务作为“心跳”,保证等待锁释放期间,租约不失效;
		// 同时,一旦客户端发生故障,心跳便会停止,锁也会因租约过期而被动释放,避免死锁
		ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();

		// 初始化返回值lockResult

		// 记录租约ID,初始值设为 0L
		Long leaseId = 0L;

		/* 2.创建租约 */
		try {
			// 创建一个租约,租约有效期为TTL,实际应用中根据具体业务确定
			leaseId = leaseClient.grant(TTL).get().getID();

			// 启动定时任务续约,心跳周期和初次启动延时计算公式如下,可根据实际业务制定
			long period = TTL - TTL / 5;
			service.scheduleAtFixedRate(new KeepAliveTask(leaseClient, leaseId), period, period, TimeUnit.SECONDS);
		} catch (InterruptedException | ExecutionException e) {
			System.err.println("[error]: Create lease failed:" + e);
			return lockResult;
		System.out.println(System.currentTimeMillis() + "|[  lock]: "+Thread.currentThread().getName()+" start to lock.");

		/* 3.加锁操作 */
		// 执行加锁操作,并为锁对应的key绑定租约
		try {
			lockClient.lock(ByteSequence.fromString(lockName), leaseId).get();
		} catch (InterruptedException | ExecutionException e1) {
			System.err.println("[error]: lock failed:" + e1);
			return lockResult;
		System.out.println(System.currentTimeMillis() + "|[  lock]: "+Thread.currentThread().getName()+" lock successfully.");

		return lockResult;

	 * 解锁操作,释放锁、关闭定时任务、解除租约
	 * @param lockName:锁名
	 * @param lockResult:加锁操作返回的结果
	public void unLock(String lockName, LockResult lockResult) {
		System.err.println(System.currentTimeMillis() + "|[unlock]: "+Thread.currentThread().getName()+" start to unlock.");
		try {
			// 释放锁
			// 关闭定时任务
			// 删除租约
			if (lockResult.getLeaseId() != 0L) {
		} catch (InterruptedException | ExecutionException e) {
			System.err.println("[error]: unlock failed: " + e);

		System.err.println(System.currentTimeMillis() + "|[unlock]: "+Thread.currentThread().getName()+" unlock successfully.");

	 * 在等待其它客户端释放锁期间,通过心跳续约,保证自己的锁对应租约不会失效
	static class KeepAliveTask implements Runnable {
		private Lease leaseClient;
		private long leaseId;

		KeepAliveTask(Lease leaseClient, long leaseId) {
			this.leaseClient = leaseClient;
			this.leaseId = leaseId;

		public void run() {
			// 续约一次

	 * 该class用于描述加锁的结果,同时携带解锁操作所需参数
	static class LockResult {
		private boolean isLockSuccess;
		private long leaseId;
		private ScheduledExecutorService service;

		LockResult() {

		public void setIsLockSuccess(boolean isLockSuccess) {
			this.isLockSuccess = isLockSuccess;

		public void setLeaseId(long leaseId) {
			this.leaseId = leaseId;

		public void setService(ScheduledExecutorService service) {
			this.service = service;

		public boolean getIsLockSuccess() {
			return this.isLockSuccess;

		public long getLeaseId() {
			return this.leaseId;

		public ScheduledExecutorService getService() {
			return this.service;

	 * 测试分布式锁
	 * @param args
	public static void main(String[] args) {
		// 模拟分布式场景下,多个进程 “抢锁”
		for (int i = 0; i < 10; i++) {
			new MyThread().start();

	static class MyThread extends Thread {
		public void run() {
			String lockName = "/lock/mylock"; // 分布式锁名称
			// 1. 加锁
			LockResult lockResult = getInstance().lock(lockName, 30);
			if (lockResult.getIsLockSuccess()) { // 获得了锁
				try {
					Thread.sleep(10000); // sleep 10秒,模拟执行相关业务
				} catch (InterruptedException e) {
					System.out.println("[error]:" + e);

			// 2. 解锁
			getInstance().unLock(lockName, lockResult);