欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

JAVA中的NIO (New IO)

程序员文章站 2023-11-15 17:01:52
简介 标准的IO是基于字节流和字符流进行操作的,而JAVA中的NIO是基于Channel和Buffer进行操作的。 传统IO NIO 核心模块 NIO主要有三个核心部分:Selector、Channel、Buffer 数据总是从Channel读取到Buffer或者从Buffer写入到Channel中 ......

简介

标准的io是基于字节流和字符流进行操作的,而java中的nio是基于channel和buffer进行操作的。

传统io

graph tb; 字节流 --> inputstream; 字节流 --> outputstream; 字符流 --> reader; 字符流 --> writer;

nio

graph tb; a[channel] --> b[buffer..]; c[channel] --> d[buffer..]; e[channel] --> f[buffer..];

核心模块

nio主要有三个核心部分:selector、channel、buffer

数据总是从channel读取到buffer或者从buffer写入到channel中。

selector可以监听多个channel的多个事件。

graph tb; selector --> a[channel]; selector --> b[channel]; selector --> c[channel]; a --> e1[event...]; b --> e2[event...]; c --> e3[event...];

传统的io与channel的区别

1.传统的io是bio的,而channel是nio的。

*当流调用了read()、write()方法后会一直阻塞线程直到数据被读取或写入完毕。

2.传统io流是单向的,而channel是双向的。


channel

filechannel:从文件中进行读取

datagramchannel:可以通过udp协议在网络中进行数据的传输

socketchannel:可以通过tcp协议在网络中进行数据的传输

serversocketchannel:可以作为一个服务器监听连接

channel通用api:

read(buffer):将数据从channel读取到buffer中,读取完毕返回-1。

read(buffer []):将数据从channel读取到多个buffer中,仅当第一个buffer被写满后往第二个buffer中进行写入。

write(buffer):将buffer中的数据写入到channel中。

write(buffer[]):将多个buffer中的数据写入到channel中,仅当第一个buffer中的数据被读取完毕后再从第二个buffer中进行读取。

register(selector,interest):将channel注册到selector中,同时需要向selector传递要监听此channel的事件类型(注册到selector中的channel一定要非阻塞的)

configureblocking(boolean):设置channel是否为阻塞。

transferfrom(position,count,channel):将其他channel中的数据传输到当前channel中。

transferto(position,count,channel):将当前channel中的数据传输到其他channel中。

socketchannel api

open()静态方法:创建socketchannel。

connect(new inetsocketaddress(port))方法:连接服务器。

finishconnect()方法:判断是否已经与服务器建立连接。

serversocketchannel api

open()静态方法:创建serversocketchannel。

accept()方法:该方法会一直阻塞线程直到有新连接到达。

阻塞式与非阻塞式channel

正常情况下channel都是阻塞的,只有当调用了configureblocking(false)方法时channel才为非阻塞。

阻塞式channel的connect()、accept()、read()、write()方法都会阻塞线程,直到处理完毕。

非阻塞式channel的connect()、accept()、read()、write()方法都是异步的。

*当调用了非阻塞式channel的connect()方法后,需要使用finishconnect()方法判断是否已经与服务器建立连接。

*当调用了非阻塞式channel的accept()方法后,需要根据方法的返回值是否为null判断是否接收到新的连接。

*当调用了非阻塞式channel的read()方法后,需要根据方法的返回值是否大于0判断是否有读取到数据。

*在使用非阻塞式channel的write()方法时,需要借助while循环与hasremaining()方法保证buffer中的内容被全部写入。

*filechannel一定是阻塞的。

示例

public void testfilechannel() throws ioexception {
    randomaccessfile randomaccessfile = new randomaccessfile(new file("f:\\笔记\\nginx.txt"), "rw");
    filechannel filechannel = randomaccessfile.getchannel();
    bytebuffer bytebuffer = bytebuffer.allocate(64);
    int count = filechannel.read(bytebuffer);
    while (count != -1) {
        bytebuffer.flip();
        system.out.println(new string(arrays.copyofrange(bytebuffer.array(),0,bytebuffer.limit()),charset.forname("utf-8")));
        bytebuffer.clear();
        count = filechannel.read(bytebuffer);
    }
}

buffer

buffer是一块可以进行读写操作的内存(顺序存储结构)

bytebuffer:基于byte类型进行存储

charbuffer:基于char类型进行存储

doublebuffer:基于double类型进行存储

floatbuffer:基于float类型进行存储

intbuffer:基于int类型进行存储

longbuffer:基于long类型进行存储

shortbuffer:基于short类型进行存储

buffer的内部结构

1.capacity:表示buffer的容量

2.position:表示当前的位置(从0开始,最大值为capacity-1)

3.limit:在写模式中表示可以写入的个数(与capacity一样),在读模式中表示可以读取的个数。

JAVA中的NIO (New IO)

从写模式转换成读模式

limit设置为position+1,position设置为0。

从读模式转换成写模式

limit设置为capacity,position设置为0。

往buffer中写数据

1.将数据从channel读取到buffer中。

2.使用buffer的put()方法。

从buffer中读数据

1.将buffer中的数据写入到channel中。

2.使用buffer的get()方法

buffer通用api:

allocate(size)静态静态:初始化一个buffer。

flip():将buffer从写模式转换成读模式。

array():将buffer中的内容转换成数组(不受limit控制)

get():获取buffer中的内容。

hasremaining():判断buffer中是否还有未读的元素(limit - (postion+1) )

rewind():将positon设置为0。

clear():将limit设置为capacity,position设置为0。

compact():将所有未读的元素移动到buffer的起始处,position指向最后一个未读的元素的下一位,limit设置为capacity。

*clear()和compact()方法都可以理解成将buffer从读模式转换成写模式,区别在于compact()方法会保留未读取的元素。

mark():在当前position处打一个标记。

reset():将position恢复到标记处。

selector

selector用于监听多个channel的多个事件(单线程)

graph tb; selector --> a[channel]; selector --> b[channel]; selector --> c[channel]; a --> e1[connect]; b --> e2[accept]; c --> e3[connect]; c --> e4[read];

channel的事件类型

1.连接就绪:当socketchannel、datagramchannel成功与服务器建立连接时将会触发连接就绪事件。

2.接收就绪:当有连接到达服务器时将会触发接收就绪事件。

3.读就绪:当socketchannel、datagramchannel有数据可读时将会触发读就绪事件。

4.写就绪:当socketchannel、datagramchannel可以进行数据写入时将会触发写就绪事件。

selectionkey

selectionkey用于存储selector与channel之间的相关信息。

selectionkey中提供了四个常量分别代表channel的事件类型。

selectionkey.op_connect

selectionkey.op_accept

selectionkey.op_read

selectionkey.op_write

selectablechannel提供的register(selector,interest)方法用于将channel注册到selector中,同时需要向selector传递要监听此channel的事件类型,当要监听的事件类型不止一个时可以使用或运算,当将channel注册到selector后会返回selectionkey实例,用于存储selector与此channel之间的相关信息。

selectionkey api:

interestops()方法:返回selector监听此channel的事件类型。

readyops()方法:返回此channel目前就绪的事件。

isacceptable():判断channel是否接收就绪。

isconnectable():判断channel是否连接就绪。

isreadable():判断channel是否读就绪。

iswriteable():判断channel是否写就绪。

channel():返回具体的channel实例。

selector():返回selector实例。

attach():往selectionkey中添加一个附加对象。

attachment():返回保存在selectionkey中的附加对象。

selector api:

open()静态方法:创建一个selector。

select()方法:该方法会一直阻塞线程直到所监听的channel有事件就绪,返回就绪的channel个数(只会返回新就绪的channel个数)

selectedkeys()方法:返回就绪的channel对应的selectionkey。

*当channel就绪的事件处理完毕后,需要手动删除selectionkey集合中该channel对应的selectionkey,当该channel再次有事件就绪时会自动加入到selectionkey集合中。

非阻塞式channel与selector

非阻塞式channel一般与selector配合使用

当selector监听到serversocketchannel接收就绪时,那么此时可以立即调用serversocketchannel的accept()方法获取新连接。

当selector监听到socketchannel读就绪时,那么此时可以立即调用socketchannel的read()方法进行数据的读取。

非阻塞式服务器

/**
 * @author: zhuang haotang
 * @date: 2019/10/26 16:35
 * @description:
 */
public class server {

    public void start() throws ioexception {
        selector selector = selector.open();
        serversocketchannel serversocketchannel = createnioserversocketchannel();
        system.out.println("start nio server and bind port 8888");
        serversocketchannel.register(selector, selectionkey.op_accept);
        int ready = selector.select();
        while (ready > 0) {
            system.out.println("ready channel count " + ready);
            set<selectionkey> selectionkeyset = selector.selectedkeys();
            for (iterator<selectionkey> iterator = selectionkeyset.iterator(); iterator.hasnext(); ) {
                selectionkey selectionkey = iterator.next();
                if (selectionkey.isacceptable()) {
                    system.out.println("acceptable");
                    accepthandler(selectionkey);
                } else if (selectionkey.isreadable()) {
                    system.out.println("readable");
                    readhandler(selectionkey);
                }
                iterator.remove();
            }
            ready = selector.select();
        }
    }

    private serversocketchannel createnioserversocketchannel() throws ioexception {
        serversocketchannel serversocketchannel = serversocketchannel.open();
        serversocketchannel.bind(new inetsocketaddress(inetaddress.getlocalhost(), 8888));
        serversocketchannel.configureblocking(false);
        return serversocketchannel;
    }

    private void accepthandler(selectionkey selectionkey) throws ioexception {
        selector selector = selectionkey.selector();
        serversocketchannel serversocketchannel = (serversocketchannel) selectionkey.channel();
        socketchannel socketchannel = serversocketchannel.accept();
        socketchannel.configureblocking(false);
        socketchannel.register(selector, selectionkey.op_read);
        system.out.println("accept client connection " + socketchannel.getlocaladdress());
    }

    private void readhandler(selectionkey selectionkey) throws ioexception {
        socketchannel socketchannel = (socketchannel) selectionkey.channel();
        bytebuffer bytebuffer = bytebuffer.allocate(100);
        int num = socketchannel.read(bytebuffer);
        if(num == -1){ // 连接已断开
            system.out.println("client "+socketchannel.getlocaladdress() + " disconnection");
            socketchannel.close();
            return;
        }
        bytebuffer.flip();
        while (bytebuffer.hasremaining()) {
            byte b = bytebuffer.get();
            system.out.println((char) b);
        }
    }

    public static void main(string[] args) throws ioexception {
        server server = new server();
        server.start();
    }

}

*一个channel不会同时有多个事件就绪,以事件为单位。

*当客户端断开连接,那么将会触发读就绪,并且channel的read()方法返回-1,表示连接已断开,服务器应该要做出处理,关闭这个连接。

客户端

/**
 * @auther: zhuang haotang
 * @date: 2019/10/26 16:36
 * @description:
 */
public class client {

    public static void main(string[] args) throws ioexception, interruptedexception {
        socketchannel socketchannel = socketchannel.open();
        socketchannel.connect(new inetsocketaddress(inetaddress.getlocalhost(),8888));

        string message = "today is sunday";
        bytebuffer bytebuffer = bytebuffer.allocate(message.getbytes().length);
        bytebuffer.put(message.getbytes());
        bytebuffer.flip();
        socketchannel.write(bytebuffer);
        thread.sleep(5000);
    }

}

运行结果

JAVA中的NIO (New IO)


reactor模式

reactor有三种模式

1.reactor单线程模式
2.reactor多线程模式
3.主从reactor多线程模式

*reactor模式是在nio下实现的。

reactor单线程模式

JAVA中的NIO (New IO)

1.单线程的事件分化器,同时这个线程需要处理接收、读、写就绪事件。

/**
 * @author: zhuang haotang
 * @date: 2019/10/26 16:35
 * @description:
 */
public class reactorsinglethreadserver {

    private void start() throws ioexception {
        selector selector = selector.open();
        serversocketchannel serversocketchannel = createnioserversocketchannel();
        system.out.println("start nio server and bind port 8888");
        serversocketchannel.register(selector, selectionkey.op_accept);
        int ready = selector.select();
        while (ready > 0) {
            system.out.println("ready channel count " + ready);
            set<selectionkey> selectionkeyset = selector.selectedkeys();
            for (iterator<selectionkey> iterator = selectionkeyset.iterator(); iterator.hasnext(); ) {
                selectionkey selectionkey = iterator.next();
                if (selectionkey.isacceptable()) {
                    system.out.println("acceptable");
                    accepthandler(selectionkey);
                } else if (selectionkey.isreadable()) {
                    system.out.println("readable");
                    readhandler(selectionkey);
                }
                iterator.remove();
            }
            ready = selector.select();
        }
    }

    private serversocketchannel createnioserversocketchannel() throws ioexception {
        serversocketchannel serversocketchannel = serversocketchannel.open();
        serversocketchannel.bind(new inetsocketaddress(inetaddress.getlocalhost(), 8888));
        serversocketchannel.configureblocking(false);
        return serversocketchannel;
    }

    private void accepthandler(selectionkey selectionkey) throws ioexception {
        selector selector = selectionkey.selector();
        serversocketchannel serversocketchannel = (serversocketchannel) selectionkey.channel();
        socketchannel socketchannel = serversocketchannel.accept();
        socketchannel.configureblocking(false);
        socketchannel.register(selector, selectionkey.op_read);
        system.out.println("accept client connection " + socketchannel.getlocaladdress());
    }

    private void readhandler(selectionkey selectionkey) throws ioexception {
        socketchannel socketchannel = (socketchannel) selectionkey.channel();
        bytebuffer bytebuffer = bytebuffer.allocate(100);
        int num = socketchannel.read(bytebuffer);
        if (num == -1) {
            system.out.println("client " + socketchannel.getlocaladdress() + " disconnection");
            socketchannel.close();
            return;
        }
        bytebuffer.flip();
        while (bytebuffer.hasremaining()) {
            byte b = bytebuffer.get();
            system.out.println((char) b);
        }
    }

    public static void main(string[] args) throws ioexception {
        reactorsinglethreadserver server = new reactorsinglethreadserver();
        server.start();
    }

}

reactor多线程模式

JAVA中的NIO (New IO)

1.单线程的事件分发器。

2.具体事件类型的handler线程池。

3.业务线程池。

/**
 * @author: zhuang haotang
 * @date: 2019-10-28 17:00
 * @description:
 */
public class reactormultithreadserver {

    private threadpoolexecutor eventhandlerpool = new threadpoolexecutor(10, 50, 2, timeunit.minutes, new arrayblockingqueue<runnable>(200), new threadpoolexecutor.callerrunspolicy());

    private void start() throws ioexception {
        selector selector = selector.open();
        serversocketchannel serversocketchannel = createnioserversocketchannel();
        system.out.println("start nio server and bind port 8888");
        serversocketchannel.register(selector, selectionkey.op_accept);
        selector.select();
        for (;;) {
            set<selectionkey> selectionkeyset = selector.selectedkeys();
            for (iterator<selectionkey> iterator = selectionkeyset.iterator(); iterator.hasnext(); ) {
                final selectionkey selectionkey = iterator.next();
                if (selectionkey.isacceptable()) {
                    system.out.println("acceptable");
                    eventhandlerpool.submit(new runnable() {
                        @override
                        public void run() {
                            try {
                                accepthandler(selectionkey);
                            } catch (ioexception e) {
                                e.printstacktrace();
                            }
                        }
                    });
                } else if (selectionkey.isreadable()) {
                    system.out.println("readable");
                    eventhandlerpool.submit(new runnable() {
                        @override
                        public void run() {
                            readhandler(selectionkey);
                        }
                    });
                }
                iterator.remove();
            }
//            thread.sleep(10); // 没找到好方案,留一些时间给register
            selector.select();
        }
    }

    private serversocketchannel createnioserversocketchannel() throws ioexception {
        serversocketchannel serversocketchannel = serversocketchannel.open();
        serversocketchannel.bind(new inetsocketaddress(inetaddress.getlocalhost(), 8888));
        serversocketchannel.configureblocking(false);
        return serversocketchannel;
    }

    private void accepthandler(selectionkey selectionkey) throws ioexception {
        selector selector = selectionkey.selector();
        serversocketchannel serversocketchannel = (serversocketchannel) selectionkey.channel();
        socketchannel socketchannel = serversocketchannel.accept();
        if (socketchannel != null) {
            socketchannel.configureblocking(false);
            selector.wakeup(); // 往selector注册channel时,selector要处于非阻塞状态
            socketchannel.register(selector, selectionkey.op_read);
            system.out.println("accept client connection " + socketchannel.getlocaladdress());
        }
    }

    private void readhandler(selectionkey selectionkey) {
        socketchannel socketchannel = (socketchannel) selectionkey.channel();
        bytebuffer bytebuffer = bytebuffer.allocate(100);
        try {
            int num = socketchannel.read(bytebuffer);
            if (num == -1) {
                system.out.println("client " + socketchannel.getlocaladdress() + " disconnection");
                socketchannel.close(); // 底层有些逻辑
                return;
            }
            bytebuffer.flip();
            while (bytebuffer.hasremaining()) {
                byte b = bytebuffer.get();
                system.out.println((char) b);
            }
        } catch (exception e) {
            system.out.println("由于连接关闭导致并发线程读取异常");
        }
    }

    public static void main(string[] args) throws ioexception {
        reactormultithreadserver reactorserver = new reactormultithreadserver();
        reactorserver.start();
    }

}

主从reactor多线程模式

JAVA中的NIO (New IO)

1.使用两个单线程的事件分发器。

第一个事件分发器只负责监听serversocketchannel的接收就绪事件,同时serversocketchannel接收到的连接要注册到第二个事件分发器中。
第二个事件分发器只负责监听socketchannel的读、写就绪事件。

2.具体事件类型的handler线程池。

3.业务线程池。

/**
 * @author: zhuang haotang
 * @date: 2019-10-28 17:00
 * @description:
 */
public class mainsubreactormultithreadserver {

    private threadpoolexecutor eventhandlerpool = new threadpoolexecutor(10, 50, 2, timeunit.minutes, new arrayblockingqueue<runnable>(200), new threadpoolexecutor.callerrunspolicy());

    private void start() throws ioexception {
        final selector mainselector = selector.open();
        final selector subselector = selector.open();

        new thread(new runnable() {
            @override
            public void run() {
                try {
                    startmainselector(mainselector, subselector);
                } catch (ioexception e) {
                    e.printstacktrace();
                }
            }
        }).start();

        new thread(new runnable() {
            @override
            public void run() {
                try {
                    startsubselector(subselector);
                } catch (ioexception e) {
                    e.printstacktrace();
                }
            }
        }).start();

    }

    /**
     * 第一个事件分发器,用于监听serversocketchannel的接收就绪事件
     */
    private void startmainselector(selector mainselector, final selector subselector) throws ioexception {
        serversocketchannel serversocketchannel = createnioserversocketchannel();
        system.out.println("start nio server and bind port 8888");
        serversocketchannel.register(mainselector, selectionkey.op_accept);
        mainselector.select();
        for (; ; ) {
            final set<selectionkey> selectionkeyset = mainselector.selectedkeys();
            final selectionkey selectionkey = iterables.getonlyelement(selectionkeyset);
            if (selectionkey.isacceptable()) {
                system.out.println("acceptable");
                eventhandlerpool.submit(new runnable() {
                    @override
                    public void run() {
                        try {
                            accepthandler(selectionkey, subselector);
                        } catch (ioexception e) {
                            e.printstacktrace();
                        }
                    }
                });
                selectionkeyset.clear();
            }
            mainselector.select();
        }
    }

    /**
     * 第二个事件分发器,用于监听sockchannel的读写就绪事件
     */
    private void startsubselector(selector subselector) throws ioexception {
        subselector.select();
        for (; ; ) {
            set<selectionkey> selectionkeyset = subselector.selectedkeys();
            for (iterator<selectionkey> iterator = selectionkeyset.iterator(); iterator.hasnext(); ) {
                final selectionkey selectionkey = iterator.next();
                if (selectionkey.isreadable()) {
                    system.out.println("readable");
                    eventhandlerpool.submit(new runnable() {
                        @override
                        public void run() {
                            readhandler(selectionkey);
                        }
                    });
                    iterator.remove();
                }
            }
//            thread.sleep(10); // 没找到好方案,留一些时间给register
            subselector.select();
        }
    }

    private serversocketchannel createnioserversocketchannel() throws ioexception {
        serversocketchannel serversocketchannel = serversocketchannel.open();
        serversocketchannel.bind(new inetsocketaddress(inetaddress.getlocalhost(), 8888));
        serversocketchannel.configureblocking(false);
        return serversocketchannel;
    }

    private void accepthandler(selectionkey selectionkey, selector subselector) throws ioexception {
        serversocketchannel serversocketchannel = (serversocketchannel) selectionkey.channel();
        socketchannel socketchannel = serversocketchannel.accept();
        if (socketchannel != null) {
            socketchannel.configureblocking(false);
            subselector.wakeup(); // 往selector注册channel时,selector要处于非阻塞状态
            socketchannel.register(subselector, selectionkey.op_read);
            system.out.println("accept client connection " + socketchannel.getlocaladdress() + " and register to subselector");
        }
    }

    private void readhandler(selectionkey selectionkey) {
        socketchannel socketchannel = (socketchannel) selectionkey.channel();
        bytebuffer bytebuffer = bytebuffer.allocate(100);
        try {
            int num = socketchannel.read(bytebuffer);
            if (num == -1) {
                system.out.println("client " + socketchannel.getlocaladdress() + " disconnection");
                socketchannel.close(); // 底层有些逻辑
                return;
            }
            bytebuffer.flip();
            while (bytebuffer.hasremaining()) {
                byte b = bytebuffer.get();
                system.out.println((char) b);
            }
        } catch (exception e) {
            system.out.println("由于连接关闭导致并发线程读取异常");
        }
    }

    public static void main(string[] args) throws ioexception {
        mainsubreactormultithreadserver reactorserver = new mainsubreactormultithreadserver();
        reactorserver.start();
    }

}

通用客户端

/**
 * @author: zhuang haotang
 * @date: 2019/10/26 16:36
 * @description:
 */
public class client {

    public static void main(string[] args) throws ioexception, interruptedexception {
        socketchannel socketchannel = socketchannel.open();
        socketchannel.connect(new inetsocketaddress(inetaddress.getlocalhost(), 8888));
        string message = "today is sunday";
        bytebuffer bytebuffer = bytebuffer.allocate(message.getbytes().length);
        bytebuffer.put(message.getbytes());
        bytebuffer.flip();
        socketchannel.write(bytebuffer);
        thread.sleep(5000);
        bytebuffer bytebuffer1 = bytebuffer.allocate("wo".getbytes().length).put("wo".getbytes());
        bytebuffer1.flip();
        socketchannel.write(bytebuffer1);

        bytebuffer receivebuffer = bytebuffer.allocate(1024);
        while (true) {
            socketchannel.read(receivebuffer);
            receivebuffer.flip();
            while (receivebuffer.hasremaining()) {
                system.out.println((char)receivebuffer.get());
            }
            receivebuffer.clear();
        }
    }

}

*主线程不需要等待具体事件类型的handler处理完毕,直接异步返回,那么将会导致事件重复就绪,程序做出相应的控制即可。

*当有连接到达服务器时,将会触发接收就绪事件,那么主线程将会不停的向线程池中提交任务,直到某个线程接收了连接,此时将会停止接收就绪,其他线程接收到的连接为null。

*当channel有数据可读时,将会触发读就绪,那么主线程将会不停的向线程池提交任务,直到某个线程读取完毕,此时将会停止读就绪,其他线程读取到的个数为0。

*当客户端断开连接时,将会触发读就绪,那么主线程将会不停的向线程池提交任务,直到某个线程关闭连接,此时将会停止读就绪

一般不会直接去使用java nio,只是通过java nio学习他的设计思想,如果要想搭建nio服务器那么应该使用netty等nio框架。


关于bio和nio的选择

bio即同步并阻塞,线程会进入阻塞状态,如果并发连接数只有几百,那么创建几百个线程去处理是没有任何问题的,这种方式更加简单高效。

但是如果并发连接数达到几万,那么显然创建几万个线程去处理是不可行的,系统承受不了这个负荷,此时应该使用nio,即同步非阻塞,利用更少的线程去做更多的事情。

java nio就是使用nio(同步非阻塞),使用io多路复用的select模型。

JAVA中的NIO (New IO)

*不管客户端有多少个并发连接和请求,服务端总是可以利用更少的线程去处理(单线程事件分发器 和 具体事件类型的handler线程池)