欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

java-非阻塞异步通信-NIO初探

程序员文章站 2022-07-12 19:20:25
...

java的NIO为非阻塞式的Socket通信提供了以下类:
Selector类
SelectableChannel类
SelectionKey
以下为一个实现非阻塞式通信的简单实例:
服务器端

package noBlock;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.Channel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;

public class NServer {
    //用于检测所有channel状态的selector
    private Selector selector = null;
    private Charset charset = Charset.forName("UTF-8");
    public void init() throws IOException{
        //通过 open()方法创建一个selector 实例,该方法将使用系统默认的selector 返回新的selector
        selector = Selector.open();
        //通过open 方法打开一个未绑定的ServerSocketChannel 实例
        //打开一个监听通道
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        InetSocketAddress isa = new InetSocketAddress("127.0.0.1",30000);
        //将该ServerScoketChannel 绑定到指定IP 地址
        //监听IP地址为127.0.0.1,端口号为30000发来的消息
        serverSocketChannel.socket().bind(isa);
        //设置ServerSocket 以非阻塞方式工作
        serverSocketChannel.configureBlocking(false);
        //将 serverSocketChannel 注册到指定 Selector 对象
        serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);

        while(selector.select() > 0){
            //依次处理selector 上的每个已选择的selectionKey
            for(SelectionKey sk:selector.selectedKeys()){
                //从selector 上的已选择key集中删除正在处理的 SelectionKey
                selector.selectedKeys().remove(sk);
                if(sk.isAcceptable()){
                    //调用accept方法接受连接,产生服务器端对应的SocketChannel
                    SocketChannel sc= serverSocketChannel.accept();
                    //设置采用非阻塞模式
                    sc.configureBlocking(false);
                    //将该SocketChannel 也注册到selector
                    sc.register(selector,SelectionKey.OP_READ);
                    //将sk 对应的Channel设置成准备接受其他请求
                    sk.interestOps(SelectionKey.OP_ACCEPT);
                }
                if(sk.isReadable()){
                    //获取该SelectionKey 对应的Channel ,该channel中有可读的数据
                    SocketChannel sc = (SocketChannel) sk.channel();
                    //定义准备执行数据的ByteBuffer
                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                    String content="";
                    try {
                        while (sc.read(byteBuffer)>0){
                            byteBuffer.flip();
                            content += charset.decode(byteBuffer);
                        }
                        //打印从该sk 对应的Channel 里读取到的数据
                        System.out.println("==========="+content);
                        //将sk 对应的Channel 设置成下一次读取
                        sk.interestOps(SelectionKey.OP_READ);
                    } catch (Exception e) {
                        // TODO Auto-generated catch block
                        sk.cancel();
                        if (sk.channel()!=null) {
                            sk.channel().close();

                        }
                    }
                    if (content.length()>0) {
                        for (SelectionKey key :selector.keys()) {
                            //获取该key 对应的channel
                            Channel targetChannel = key.channel();
                            if (targetChannel instanceof SocketChannel) {
                                //将独到的内容写入该Channel 中
                                SocketChannel dest = (SocketChannel) targetChannel;
                                dest.write(charset.encode(content));
                            }
                        }
                    }
                }
            }
        }
    }
    public static void main(String[] args) throws IOException {
        new NServer().init();
    }
}

客户端:

package noBlock;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Scanner;



public class NClient {
    private Selector selector =null;
    private Charset charset = Charset.forName("UTF-8");
    private SocketChannel sChannel = null;
    public void init() throws IOException{
        selector = Selector.open();
        InetSocketAddress isa = new InetSocketAddress("127.0.0.1",30000);
        sChannel = SocketChannel.open(isa);
        sChannel.configureBlocking(false);
        sChannel.register(selector, SelectionKey.OP_READ);
        new ClientThread().start();
        Scanner scanner = new Scanner(System.in);
        while(scanner.hasNext()){
            String line = scanner.nextLine();
            sChannel.write(charset.encode(line));
        }
    }
    private class ClientThread extends Thread
    {
        public void run(){
            try {
                while(selector.select()>0){
                    for(SelectionKey sKey :selector.selectedKeys()){
                        selector.selectedKeys().remove(sKey);
                        if(sKey.isReadable()){
                            SocketChannel sc =(SocketChannel) sKey.channel();
                            ByteBuffer buffer =ByteBuffer.allocate(1024);
                            String content ="";
                            while(sc.read(buffer)>0){
                                sc.read(buffer);
                                buffer.flip();
                                content+=charset.decode(buffer);
                            }
                            System.out.println("聊天信息" +content);
                            sKey.interestOps(SelectionKey.OP_READ);
                        }
                    }
                }
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) throws IOException {
        new NClient().init();
    }
}



SelectionKey 的四种操作

OP_READ

OP_ACCEPT
服务端监听,并注册OP_ACCEPT事件后,就已准备好接受客户端的连接了
例如:serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
以及:sk.interestOps(SelectionKey.OP_ACCEPT);
interestOps( ops)以一个位图为参数,指示了应该监听信道上的哪些操作
OP_CONNECT
当客户端调用connect()并注册OP_CONNECT事件后,连接操作就会就绪。
OP_WRITE
写就绪相对有一点特殊,一般来说,你不应该注册写事件。写操作的就绪条件为底层缓冲区有空闲空间,而写缓冲区绝大部分时间都是有空闲空间的,所以当你注册写事件后,写操作一直是就绪的,选择处理线程全占用整个CPU资源。所以,只有当你确实有数据要写时再注册写操作,并在写完以后马上取消注册。

当有数据在写时,将数据写到缓冲区中,并注册写事件。

public void write(byte[] data) throws IOException {  
    writeBuffer.put(data);  
    key.interestOps(SelectionKey.OP_WRITE);  
}  

注册写事件后,写操作就绪,这时将之前写入缓冲区的数据写入通道,并取消注册。

channel.write(writeBuffer);  
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);  

在网上搜索有关NIO信息时,发现名为natty的异步通信框架,先记录下来。

相关标签: 异步 通信