欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

java线程阻塞中断与LockSupport使用介绍

程序员文章站 2023-10-24 08:15:05
上周五和周末,工作忙里偷闲,在看java cocurrent中也顺便再温故了一下thread.interrupt和java 5之后的locksupport的实现。 在介绍之...
上周五和周末,工作忙里偷闲,在看java cocurrent中也顺便再温故了一下thread.interrupt和java 5之后的locksupport的实现。
在介绍之前,先抛几个问题。
thread.interrupt()方法和interruptedexception异常的关系?是由interrupt触发产生了interruptedexception异常?
thread.interrupt()会中断线程什么状态的工作? running or blocking?
一般thread编程需要关注interrupt中断不?一般怎么处理?可以用来做什么?
locksupport.park()和unpark(),与object.wait()和notify()的区别?
locksupport.park(object blocker)传递的blocker对象做什么用?
locksupport能响应thread.interrupt()事件不?会抛出interruptedexception异常?
thread.interrupt()处理是否有对应的回调函数?类似于钩子调用?
如果你都都能很明确的答上来了,说明你已经完全懂thread.interrupt,可以不用往下看那了。
那如果不清楚的,带着这几个问题,一起来梳理下。
thread的interrupt处理的几个方法:
public void interrupt() : 执行线程interrupt事件
public boolean isinterrupted() : 检查当前线程是否处于interrupt
public static boolean interrupted() : check当前线程是否处于interrupt,并重置interrupt信息。类似于resetandget()
理解:
1. 每个线程都有一个interrupt status标志位,用于表明当前线程是否处于中断状态
2. 一般调用thread.interrupt()会有两种处理方式
遇到一个低优先级的block状态时,比如object.wait(),object.sleep(),object.join()。它会立马触发一个unblock解除阻塞,并throw一个interruptedexception。
其他情况,thread.interrupt()仅仅只是更新了status标志位。然后你的工作线程通过thread.isinterrrupted()进行检查,可以做相应的处理,比如也throw interruptedexception或者是清理状态,取消task等。
在interrupt javadoc中描述:
java线程阻塞中断与LockSupport使用介绍 
最佳实践
ibm上有篇文章写的挺不错。java theory and practice: dealing with interruptedexception , 里面提到了interrupt处理的几条最佳实践。
don't swallow interrupts (别吃掉interrupt,一般是两种处理: 继续throw interruptedexception异常。 另一种就是继续设置thread.interupt()异常标志位,让更上一层去进行相应处理。
复制代码 代码如下:

public class taskrunner implements runnable {
private blockingqueue<task> queue;
public taskrunner(blockingqueue<task> queue) {
this.queue = queue;
}
public void run() {
try {
while (true) {
task task = queue.take(10, timeunit.seconds);
task.execute();
}
}
catch (interruptedexception e) {
// restore the interrupted status
thread.currentthread().interrupt();
}
}
}

复制代码 代码如下:

public class taskrunner implements runnable {
private blockingqueue<task> queue;
public taskrunner(blockingqueue<task> queue) {
this.queue = queue;
}
public void run() {
try {
while (true) {
task task = queue.take(10, timeunit.seconds);
task.execute();
}
}
catch (interruptedexception e) {
// restore the interrupted status
thread.currentthread().interrupt();
}
}
}

implementing cancelable tasks with interrupt (使用thread.interrupt()来设计和支持可被cancel的task)
复制代码 代码如下:

public class primeproducer extends thread {
private final blockingqueue<biginteger> queue;
primeproducer(blockingqueue<biginteger> queue) {
this.queue = queue;
}
public void run() {
try {
biginteger p = biginteger.one;
while (!thread.currentthread().isinterrupted())
queue.put(p = p.nextprobableprime());
} catch (interruptedexception consumed) {
/* allow thread to exit */
}
}
public void cancel() { interrupt(); } // 发起中断
}<span style="white-space: normal"> </span>

复制代码 代码如下:

public class primeproducer extends thread {
private final blockingqueue<biginteger> queue;
primeproducer(blockingqueue<biginteger> queue) {
this.queue = queue;
}
public void run() {
try {
biginteger p = biginteger.one;
while (!thread.currentthread().isinterrupted())
queue.put(p = p.nextprobableprime());
} catch (interruptedexception consumed) {
/* allow thread to exit */
}
}
public void cancel() { interrupt(); } // 发起中断
}<span style="white-space: normal"> </span>

注册interrupt处理事件(非正常用法)
一般正常的task设计用来处理cancel,都是采用主动轮询的方式检查thread.isinterrupt(),对业务本身存在一定的嵌入性,还有就是存在延迟,你得等到下一个检查点(谁知道下一个检查点是在什么时候,特别是进行一个socket.read时,遇到过一个httpclient超时的问题)。
来看一下,主动抛出interruptedexception异常的实现,借鉴于interruptiblechannel的设计,比较取巧。
复制代码 代码如下:

interface interruptable { // 定义可中断的接口
public void interrupt() throws interruptedexception;
}
abstract class interruptsupport implements interruptable {
private volatile boolean interrupted = false;
private interruptible interruptor = new interruptible() {
public void interrupt() {
interrupted = true;
interruptsupport.this.interrupt(); // 位置3
}
};
public final boolean execute() throws interruptedexception {
try {
blockedon(interruptor); // 位置1
if (thread.currentthread().isinterrupted()) { // 立马被interrupted
interruptor.interrupt();
}
// 执行业务代码
bussiness();
} finally {
blockedon(null); // 位置2
}
return interrupted;
}
public abstract void bussiness() ;
public abstract void interrupt();
// -- sun.misc.sharedsecrets --
static void blockedon(interruptible intr) { // package-private
sun.misc.sharedsecrets.getjavalangaccess().blockedon(thread.currentthread(), intr);
}
}

复制代码 代码如下:

interface interruptable { // 定义可中断的接口
public void interrupt() throws interruptedexception;
}
abstract class interruptsupport implements interruptable {
private volatile boolean interrupted = false;
private interruptible interruptor = new interruptible() {
public void interrupt() {
interrupted = true;
interruptsupport.this.interrupt(); // 位置3
}
};
public final boolean execute() throws interruptedexception {
try {
blockedon(interruptor); // 位置1
if (thread.currentthread().isinterrupted()) { // 立马被interrupted
interruptor.interrupt();
}
// 执行业务代码
bussiness();
} finally {
blockedon(null); // 位置2
}
return interrupted;
}
public abstract void bussiness() ;
public abstract void interrupt();
// -- sun.misc.sharedsecrets --
static void blockedon(interruptible intr) { // package-private
sun.misc.sharedsecrets.getjavalangaccess().blockedon(thread.currentthread(), intr);
}
}

代码说明,几个取巧的点:
位置1:利用sun提供的blockedon方法,绑定对应的interruptible事件处理钩子到指定的thread上。
位置2:执行完代码后,清空钩子。避免使用连接池时,对下一个thread处理事件的影响。
位置3:定义了interruptible事件钩子的处理方法,回调interruptsupport.this.interrupt()方法,子类可以集成实现自己的业务逻辑,比如sock流关闭等等。
使用:
复制代码 代码如下:

class interruptread extends interruptsupport {
private fileinputstream in;
@override
public void bussiness() {
file file = new file("/dev/urandom"); // 读取linux黑洞,永远读不完
try {
in = new fileinputstream(file);
byte[] bytes = new byte[1024];
while (in.read(bytes, 0, 1024) > 0) {
// thread.sleep(100);
// if (thread.interrupted()) {// 以前的interrupt检查方式
// throw new interruptedexception("");
// }
}
} catch (exception e) {
throw new runtimeexception(e);
}
}
public fileinputstream getin() {
return in;
}
@override
public void interrupt() {
try {
in.getchannel().close();
} catch (ioexception e) {
e.printstacktrace();
}
}
}
public static void main(string args[]) throws exception {
final interruptread test = new interruptread();
thread t = new thread() {
@override
public void run() {
long start = system.currenttimemillis();
try {
system.out.println("interruptread start!");
test.execute();
} catch (interruptedexception e) {
system.out.println("interruptread end! cost time : " + (system.currenttimemillis() - start));
e.printstacktrace();
}
}
};
t.start();
// 先让read执行3秒
thread.sleep(3000);
// 发出interrupt中断
t.interrupt();
}

复制代码 代码如下:

class interruptread extends interruptsupport {
private fileinputstream in;
@override
public void bussiness() {
file file = new file("/dev/urandom"); // 读取linux黑洞,永远读不完
try {
in = new fileinputstream(file);
byte[] bytes = new byte[1024];
while (in.read(bytes, 0, 1024) > 0) {
// thread.sleep(100);
// if (thread.interrupted()) {// 以前的interrupt检查方式
// throw new interruptedexception("");
// }
}
} catch (exception e) {
throw new runtimeexception(e);
}
}
public fileinputstream getin() {
return in;
}
@override
public void interrupt() {
try {
in.getchannel().close();
} catch (ioexception e) {
e.printstacktrace();
}
}
}
public static void main(string args[]) throws exception {
final interruptread test = new interruptread();
thread t = new thread() {
@override
public void run() {
long start = system.currenttimemillis();
try {
system.out.println("interruptread start!");
test.execute();
} catch (interruptedexception e) {
system.out.println("interruptread end! cost time : " + (system.currenttimemillis() - start));
e.printstacktrace();
}
}
};
t.start();
// 先让read执行3秒
thread.sleep(3000);
// 发出interrupt中断
t.interrupt();
}

jdk源码介绍:
1. sun提供的钩子可以查看system的相关代码, line : 1125
复制代码 代码如下:

sun.misc.sharedsecrets.setjavalangaccess(new sun.misc.javalangaccess(){
public sun.reflect.constantpool getconstantpool(class klass) {
return klass.getconstantpool();
}
public void setannotationtype(class klass, annotationtype type) {
klass.setannotationtype(type);
}
public annotationtype getannotationtype(class klass) {
return klass.getannotationtype();
}
public <e extends enum<e>>
e[] getenumconstantsshared(class<e> klass) {
return klass.getenumconstantsshared();
}
public void blockedon(thread t, interruptible b) {
t.blockedon(b);
}
});

复制代码 代码如下:

sun.misc.sharedsecrets.setjavalangaccess(new sun.misc.javalangaccess(){
public sun.reflect.constantpool getconstantpool(class klass) {
return klass.getconstantpool();
}
public void setannotationtype(class klass, annotationtype type) {
klass.setannotationtype(type);
}
public annotationtype getannotationtype(class klass) {
return klass.getannotationtype();
}
public <e extends enum<e>>
e[] getenumconstantsshared(class<e> klass) {
return klass.getenumconstantsshared();
}
public void blockedon(thread t, interruptible b) {
t.blockedon(b);
}
});

2. thread.interrupt()
复制代码 代码如下:

public void interrupt() {
if (this != thread.currentthread())
checkaccess();
synchronized (blockerlock) {
interruptible b = blocker;
if (b != null) {
interrupt0(); // just to set the interrupt flag
b.interrupt(); //回调钩子
return;
}
}
interrupt0();
}

复制代码 代码如下:

public void interrupt() {
if (this != thread.currentthread())
checkaccess();
synchronized (blockerlock) {
interruptible b = blocker;
if (b != null) {
interrupt0(); // just to set the interrupt flag
b.interrupt(); //回调钩子
return;
}
}
interrupt0();
}

更多
更多关于thread.stop,suspend,resume,interrupt的使用注意点,可以看一下sun的文档,比如http://download.oracle.com/javase/6/docs/technotes/guides/concurrency/threadprimitivedeprecation.html
最后来解答一下之前的几个问题:
问题1: thread.interrupt()方法和interruptedexception异常的关系?是由interrupt触发产生了interruptedexception异常?
答: thread.interrupt()只是在object.wait() .object.join(), object.sleep()几个方法会主动抛出interruptedexception异常。而在其他的的block常见,只是通过设置了thread的一个标志位信息,需要程序自我进行处理。
复制代码 代码如下:

if (thread.interrupted()) // clears interrupted status!
throw new interruptedexception();

复制代码 代码如下:

if (thread.interrupted()) // clears interrupted status!
throw new interruptedexception();

问题2:thread.interrupt()会中断线程什么状态的工作? running or blocking?
答:thread.interrupt设计的目的主要是用于处理线程处于block状态,比如wait(),sleep()状态就是个例子。但可以在程序设计时为支持task cancel,同样可以支持running状态。比如object.join()和一些支持interrupt的一些nio channel设计。
问题3: 一般thread编程需要关注interrupt中断不?一般怎么处理?可以用来做什么?
答: interrupt用途: unblock操作,支持任务cancel, 数据清理等。
问题4: locksupport.park()和unpark(),与object.wait()和notify()的区别?
答:
1. 面向的主体不一样。locksuport主要是针对thread进进行阻塞处理,可以指定阻塞队列的目标对象,每次可以指定具体的线程唤醒。object.wait()是以对象为纬度,阻塞当前的线程和唤醒单个(随机)或者所有线程。
2. 实现机制不同。虽然locksuport可以指定monitor的object对象,但和object.wait(),两者的阻塞队列并不交叉。可以看下测试例子。object.notifyall()不能唤醒locksupport的阻塞thread.
问题5: locksupport.park(object blocker)传递的blocker对象做什么用?
答: 对应的blcoker会记录在thread的一个parkblocker属性中,通过jstack命令可以非常方便的监控具体的阻塞对象.
复制代码 代码如下:

public static void park(object blocker) {
thread t = thread.currentthread();
setblocker(t, blocker); // 设置thread.parkblocker属性的值
unsafe.park(false, 0l);
setblocker(t, null); // 清除thread.parkblocker属性的值
}

复制代码 代码如下:

public static void park(object blocker) {
thread t = thread.currentthread();
setblocker(t, blocker); // 设置thread.parkblocker属性的值
unsafe.park(false, 0l);
setblocker(t, null); // 清除thread.parkblocker属性的值
}

具体locksupport的javadoc描述也比较清楚,可以看下:
java线程阻塞中断与LockSupport使用介绍 
问题6: locksupport能响应thread.interrupt()事件不?会抛出interruptedexception异常?
答:能响应interrupt事件,但不会抛出interruptedexception异常。针对locksupport对thread.interrupte支持,也先看一下javadoc中的描述:
java线程阻塞中断与LockSupport使用介绍 
相关测试代码
复制代码 代码如下:

package com.agapple.cocurrent;
import java.io.file;
import java.io.fileinputstream;
import java.lang.reflect.field;
import java.util.concurrent.timeunit;
import java.util.concurrent.locks.locksupport;
public class locksupporttest {
private static locksupporttest blocker = new locksupporttest();
public static void main(string args[]) throws exception {
locksupporttest();
parktest();
interruptparktest();
interruptsleeptest();
interruptwaittest();
}
/**
* locksupport.park对象后,尝试获取thread.blocker对象,调用其single唤醒
*
* @throws exception
*/
private static void locksupporttest() throws exception {
thread t = dotest(new testcallback() {
@override
public void callback() throws exception {
// 尝试sleep 5s
system.out.println("blocker");
locksupport.park(blocker);
system.out.println("wakeup now!");
}
@override
public string getname() {
return "locksupporttest";
}
});
t.start(); // 启动读取线程
thread.sleep(150);
synchronized (blocker) {
field field = thread.class.getdeclaredfield("parkblocker");
field.setaccessible(true);
object fblocker = field.get(t);
system.out.println(blocker == fblocker);
thread.sleep(100);
system.out.println("notifyall");
blocker.notifyall();
}
}
/**
* 尝试去中断一个object.wait(),会抛出对应的interruptedexception异常
*
* @throws interruptedexception
*/
private static void interruptwaittest() throws interruptedexception {
final object obj = new object();
thread t = dotest(new testcallback() {
@override
public void callback() throws exception {
// 尝试sleep 5s
obj.wait();
system.out.println("wakeup now!");
}
@override
public string getname() {
return "interruptwaittest";
}
});
t.start(); // 启动读取线程
thread.sleep(2000);
t.interrupt(); // 检查下在park时,是否响应中断
}
/**
* 尝试去中断一个thread.sleep(),会抛出对应的interruptedexception异常
*
* @throws interruptedexception
*/
private static void interruptsleeptest() throws interruptedexception {
thread t = dotest(new testcallback() {
@override
public void callback() throws exception {
// 尝试sleep 5s
thread.sleep(5000);
system.out.println("wakeup now!");
}
@override
public string getname() {
return "interruptsleeptest";
}
});
t.start(); // 启动读取线程
thread.sleep(2000);
t.interrupt(); // 检查下在park时,是否响应中断
}
/**
* 尝试去中断一个locksupport.park(),会有响应但不会抛出interruptedexception异常
*
* @throws interruptedexception
*/
private static void interruptparktest() throws interruptedexception {
thread t = dotest(new testcallback() {
@override
public void callback() {
// 尝试去park 自己线程
locksupport.parknanos(blocker, timeunit.seconds.tonanos(5));
system.out.println("wakeup now!");
}
@override
public string getname() {
return "interruptparktest";
}
});
t.start(); // 启动读取线程
thread.sleep(2000);
t.interrupt(); // 检查下在park时,是否响应中断
}
/**
* 尝试去中断一个locksupport.unpark(),会有响应
*
* @throws interruptedexception
*/
private static void parktest() throws interruptedexception {
thread t = dotest(new testcallback() {
@override
public void callback() {
// 尝试去park 自己线程
locksupport.park(blocker);
system.out.println("wakeup now!");
}
@override
public string getname() {
return "parktest";
}
});
t.start(); // 启动读取线程
thread.sleep(2000);
locksupport.unpark(t);
t.interrupt();
}
public static thread dotest(final testcallback call) {
return new thread() {
@override
public void run() {
file file = new file("/dev/urandom"); // 读取linux黑洞
try {
fileinputstream in = new fileinputstream(file);
byte[] bytes = new byte[1024];
while (in.read(bytes, 0, 1024) > 0) {
if (thread.interrupted()) {
throw new interruptedexception("");
}
system.out.println(bytes[0]);
thread.sleep(100);
long start = system.currenttimemillis();
call.callback();
system.out.println(call.getname() + " callback finish cost : "
+ (system.currenttimemillis() - start));
}
} catch (exception e) {
e.printstacktrace();
}
}
};
}
}
interface testcallback {
public void callback() throws exception;
public string getname();
}

复制代码 代码如下:

package com.agapple.cocurrent;
import java.io.file;
import java.io.fileinputstream;
import java.lang.reflect.field;
import java.util.concurrent.timeunit;
import java.util.concurrent.locks.locksupport;
public class locksupporttest {
private static locksupporttest blocker = new locksupporttest();
public static void main(string args[]) throws exception {
locksupporttest();
parktest();
interruptparktest();
interruptsleeptest();
interruptwaittest();
}
/**
* locksupport.park对象后,尝试获取thread.blocker对象,调用其single唤醒
*
* @throws exception
*/
private static void locksupporttest() throws exception {
thread t = dotest(new testcallback() {
@override
public void callback() throws exception {
// 尝试sleep 5s
system.out.println("blocker");
locksupport.park(blocker);
system.out.println("wakeup now!");
}
@override
public string getname() {
return "locksupporttest";
}
});
t.start(); // 启动读取线程
thread.sleep(150);
synchronized (blocker) {
field field = thread.class.getdeclaredfield("parkblocker");
field.setaccessible(true);
object fblocker = field.get(t);
system.out.println(blocker == fblocker);
thread.sleep(100);
system.out.println("notifyall");
blocker.notifyall();
}
}
/**
* 尝试去中断一个object.wait(),会抛出对应的interruptedexception异常
*
* @throws interruptedexception
*/
private static void interruptwaittest() throws interruptedexception {
final object obj = new object();
thread t = dotest(new testcallback() {
@override
public void callback() throws exception {
// 尝试sleep 5s
obj.wait();
system.out.println("wakeup now!");
}
@override
public string getname() {
return "interruptwaittest";
}
});
t.start(); // 启动读取线程
thread.sleep(2000);
t.interrupt(); // 检查下在park时,是否响应中断
}
/**
* 尝试去中断一个thread.sleep(),会抛出对应的interruptedexception异常
*
* @throws interruptedexception
*/
private static void interruptsleeptest() throws interruptedexception {
thread t = dotest(new testcallback() {
@override
public void callback() throws exception {
// 尝试sleep 5s
thread.sleep(5000);
system.out.println("wakeup now!");
}
@override
public string getname() {
return "interruptsleeptest";
}
});
t.start(); // 启动读取线程
thread.sleep(2000);
t.interrupt(); // 检查下在park时,是否响应中断
}
/**
* 尝试去中断一个locksupport.park(),会有响应但不会抛出interruptedexception异常
*
* @throws interruptedexception
*/
private static void interruptparktest() throws interruptedexception {
thread t = dotest(new testcallback() {
@override
public void callback() {
// 尝试去park 自己线程
locksupport.parknanos(blocker, timeunit.seconds.tonanos(5));
system.out.println("wakeup now!");
}
@override
public string getname() {
return "interruptparktest";
}
});
t.start(); // 启动读取线程
thread.sleep(2000);
t.interrupt(); // 检查下在park时,是否响应中断
}
/**
* 尝试去中断一个locksupport.unpark(),会有响应
*
* @throws interruptedexception
*/
private static void parktest() throws interruptedexception {
thread t = dotest(new testcallback() {
@override
public void callback() {
// 尝试去park 自己线程
locksupport.park(blocker);
system.out.println("wakeup now!");
}
@override
public string getname() {
return "parktest";
}
});
t.start(); // 启动读取线程
thread.sleep(2000);
locksupport.unpark(t);
t.interrupt();
}
public static thread dotest(final testcallback call) {
return new thread() {
@override
public void run() {
file file = new file("/dev/urandom"); // 读取linux黑洞
try {
fileinputstream in = new fileinputstream(file);
byte[] bytes = new byte[1024];
while (in.read(bytes, 0, 1024) > 0) {
if (thread.interrupted()) {
throw new interruptedexception("");
}
system.out.println(bytes[0]);
thread.sleep(100);
long start = system.currenttimemillis();
call.callback();
system.out.println(call.getname() + " callback finish cost : "
+ (system.currenttimemillis() - start));
}
} catch (exception e) {
e.printstacktrace();
}
}
};
}
}
interface testcallback {
public void callback() throws exception;
public string getname();
}

最后
发觉文章越写越长,那就索性发到了论坛,大家一起讨论下.毕竟文章中描述的都是一些使用层面的东东,并没有从操作系统或者sun native实现上去介绍thread的一些机制,熟悉这块的大牛门也可以出来发表下高见.