欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

etcd分布式锁实现 博客分类: 分布式

程序员文章站 2024-03-22 20:39:22
...

引入maven依赖:

  <dependency>
	    <groupId>com.coreos</groupId>
	    <artifactId>jetcd-core</artifactId>
	    <version>0.0.2</version>
	</dependency>

 

分布式锁实现:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.coreos.jetcd.Client;
import com.coreos.jetcd.Lease;
import com.coreos.jetcd.Lock;
import com.coreos.jetcd.data.ByteSequence;

/**
 * Etcd Java客户端 Jetcd提供的Lock客户端实现分布式锁
 */
public class EtcdDistributedLock {
	
	private static EtcdDistributedLock lock = null;
	private static Object mutex = new Object();
	private Client client; // etcd客户端
	private Lock lockClient; // etcd分布式锁客户端
	private Lease leaseClient; // etcd租约客户端

	private EtcdDistributedLock() {
		super();
		// 创建Etcd客户端,本例中Etcd集群只有一个节点
		this.client = Client.builder().endpoints("http://localhost:2379").build();
		this.lockClient = client.getLockClient();
		this.leaseClient = client.getLeaseClient();
	}

	/**
	 * 单例
	 */
	public static EtcdDistributedLock getInstance() {
		synchronized (mutex) { // 互斥锁
			if (null == lock) {
				lock = new EtcdDistributedLock();
			}
		}
		return lock;
	}

	/**
	 * 加锁操作,需要注意的是,本例中没有加入重试机制,加锁失败将直接返回。
	 * @param lockName: 针对某一共享资源(数据、文件等)制定的锁名
	 * @param TTL: Time To Live,租约有效期,一旦客户端崩溃,可在租约到期后自动释放锁
	 * @return LockResult
	 */
	public LockResult lock(String lockName, long TTL) {
		LockResult lockResult = new LockResult();
		
		/* 1.准备阶段 */
		// 创建一个定时任务作为“心跳”,保证等待锁释放期间,租约不失效;
		// 同时,一旦客户端发生故障,心跳便会停止,锁也会因租约过期而被动释放,避免死锁
		ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();

		// 初始化返回值lockResult
		lockResult.setIsLockSuccess(false);
		lockResult.setService(service);

		// 记录租约ID,初始值设为 0L
		Long leaseId = 0L;

		/* 2.创建租约 */
		try {
			// 创建一个租约,租约有效期为TTL,实际应用中根据具体业务确定
			leaseId = leaseClient.grant(TTL).get().getID();
			lockResult.setLeaseId(leaseId);

			// 启动定时任务续约,心跳周期和初次启动延时计算公式如下,可根据实际业务制定
			long period = TTL - TTL / 5;
			service.scheduleAtFixedRate(new KeepAliveTask(leaseClient, leaseId), period, period, TimeUnit.SECONDS);
		} catch (InterruptedException | ExecutionException e) {
			System.err.println("[error]: Create lease failed:" + e);
			return lockResult;
		}
		System.out.println(System.currentTimeMillis() + "|[  lock]: "+Thread.currentThread().getName()+" start to lock.");

		/* 3.加锁操作 */
		// 执行加锁操作,并为锁对应的key绑定租约
		try {
			lockClient.lock(ByteSequence.fromString(lockName), leaseId).get();
		} catch (InterruptedException | ExecutionException e1) {
			System.err.println("[error]: lock failed:" + e1);
			return lockResult;
		}
		System.out.println(System.currentTimeMillis() + "|[  lock]: "+Thread.currentThread().getName()+" lock successfully.");

		lockResult.setIsLockSuccess(true);
		return lockResult;
	}

	/**
	 * 解锁操作,释放锁、关闭定时任务、解除租约
	 * 
	 * @param lockName:锁名
	 * @param lockResult:加锁操作返回的结果
	 */
	public void unLock(String lockName, LockResult lockResult) {
		System.err.println(System.currentTimeMillis() + "|[unlock]: "+Thread.currentThread().getName()+" start to unlock.");
		try {
			// 释放锁
			lockClient.unlock(ByteSequence.fromString(lockName)).get();
			// 关闭定时任务
			lockResult.getService().shutdown();
			// 删除租约
			if (lockResult.getLeaseId() != 0L) {
				leaseClient.revoke(lockResult.getLeaseId());
			}
		} catch (InterruptedException | ExecutionException e) {
			System.err.println("[error]: unlock failed: " + e);
		}

		System.err.println(System.currentTimeMillis() + "|[unlock]: "+Thread.currentThread().getName()+" unlock successfully.");
	}

	/**
	 * 在等待其它客户端释放锁期间,通过心跳续约,保证自己的锁对应租约不会失效
	 */
	static class KeepAliveTask implements Runnable {
		private Lease leaseClient;
		private long leaseId;

		KeepAliveTask(Lease leaseClient, long leaseId) {
			this.leaseClient = leaseClient;
			this.leaseId = leaseId;
		}

		@Override
		public void run() {
			// 续约一次
			leaseClient.keepAliveOnce(leaseId);
		}
	}

	/**
	 * 该class用于描述加锁的结果,同时携带解锁操作所需参数
	 */
	static class LockResult {
		private boolean isLockSuccess;
		private long leaseId;
		private ScheduledExecutorService service;

		LockResult() {
			super();
		}

		public void setIsLockSuccess(boolean isLockSuccess) {
			this.isLockSuccess = isLockSuccess;
		}

		public void setLeaseId(long leaseId) {
			this.leaseId = leaseId;
		}

		public void setService(ScheduledExecutorService service) {
			this.service = service;
		}

		public boolean getIsLockSuccess() {
			return this.isLockSuccess;
		}

		public long getLeaseId() {
			return this.leaseId;
		}

		public ScheduledExecutorService getService() {
			return this.service;
		}
	}

	/**
	 * 测试分布式锁
	 * @param args
	 */
	public static void main(String[] args) {
		// 模拟分布式场景下,多个进程 “抢锁”
		for (int i = 0; i < 10; i++) {
			new MyThread().start();
		}
	}

	static class MyThread extends Thread {
		@Override
		public void run() {
			String lockName = "/lock/mylock"; // 分布式锁名称
			
			// 1. 加锁
			LockResult lockResult = getInstance().lock(lockName, 30);
			if (lockResult.getIsLockSuccess()) { // 获得了锁
				try {
					Thread.sleep(10000); // sleep 10秒,模拟执行相关业务
				} catch (InterruptedException e) {
					System.out.println("[error]:" + e);
				}
			}

			// 2. 解锁
			getInstance().unLock(lockName, lockResult);
		}
	}
}