欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

NIO例子

程序员文章站 2022-10-03 17:15:58
client向server发送questionId,server随机得到1个答案,然后把questionId和答案发送给clientAnswerServerpublic class AnswerServer { private static AtomicLong atomicLong = new AtomicLong(); public static final String[] answers = {"A","B","C","D"}; public stat.....

client向server发送questionId,server随机得到1个答案,然后把questionId和答案发送给client

 

AnswerServer 

public class AnswerServer {

    private static AtomicLong atomicLong = new AtomicLong();

    public static final String[] answers = {"A","B","C","D"};

    public static final String RES_CONNECTION_OK = "OK";

    static Random random = new Random(3);

    private Selector selector;

    public AnswerServer(){
        try{
            this.init();
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    public void init() throws Exception{
        //打开1个io多路复用器
        selector = Selector.open();

        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        //绑定服务端口
        serverSocketChannel.socket().bind(new InetSocketAddress(Constant.SERVER.IP,Constant.SERVER.PORT));

        System.out.println("answer server linten port="+Constant.SERVER.PORT);

        //对于服务端来说,一定要先注册一个OP_ACCEPT事件用来响应客户端的请求连接
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

    }

    public void start() throws Exception{

        while (true) {
            this.selector.select();
            System.out.println("the coming keys="+this.selector.selectedKeys());

            Iterator<SelectionKey> iterator = this.selector.selectedKeys().iterator();

            while(iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                SocketChannel socketChannel = null;
                int type = 0;
                try {
                    if (key.isAcceptable()) {
                        //这里key.channel是ServerSocketChannel
                        type = SelectionKey.OP_ACCEPT;
                        this.accept(key);
                    } else if (key.isValid() && key.isReadable()) {
                        type = SelectionKey.OP_READ;
                        socketChannel = (SocketChannel) key.channel();
                        this.read(socketChannel, key);
                    }
                }catch (Exception e) {
                    key.cancel();
                    if(socketChannel!=null){
                        socketChannel.socket().close();
                        socketChannel.close();
                    }
                    System.out.println("exception at type="+type);
                    e.printStackTrace();
                }

            }

        }
    }

    private void accept(SelectionKey key) throws Exception {
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel();
        SocketChannel socketChannel = serverSocketChannel.accept();
        socketChannel.configureBlocking(false);
        System.out.println("开始注册读时间");

        socketChannel.register(this.selector,SelectionKey.OP_READ);

        this.sayAcceptOk(socketChannel);

    }

    private void sayAcceptOk(SocketChannel socketChannel) throws Exception{
        ByteBuffer byteBuffer = ByteBuffer.wrap(AnswerServer.RES_CONNECTION_OK.getBytes());
        System.out.println("AnswerServer发送连接成功!");
        socketChannel.write(byteBuffer);
    }

    private void read(SocketChannel channel,SelectionKey key) throws Exception {
        System.out.println(atomicLong.incrementAndGet()+",开始读数据 key="+key);
        // 先得到题目

        ByteBuffer buffer = ByteBuffer.allocate(50);

        while (channel.isOpen() && channel.read(buffer) != -1) {
           
            if (buffer.position() > 0) {
                System.out.println("数据读完了-----------------------position="+buffer.position());
                break;
            }
        }
        if (buffer.position() == 0) {
            System.out.println("没有数据了-----------------------");
            return; // 如果没数据了, 则不继续后面的处理
        }

        String questionId = ByteBufferUtil.toString(buffer);
        System.out.println("========read questionId :" + questionId);

        String answer = answers[random.nextInt(4)];

        String response = "questionId="+questionId+",answerId="+answer;

        ByteBuffer responseBuffer = ByteBuffer.wrap(response.getBytes());
        channel.write(responseBuffer);

    }

    public static void main(String[] args) throws Exception{
        new AnswerServer().start();
    }

}

AnswerNIOClient

public class AnswerNIOClient implements MyRunnable {

    private Selector selector;
    SocketChannel socketChannel;
    private AtomicLong atomicLong = new AtomicLong();

    public AnswerNIOClient() throws Exception {
        this.init();
    }


    @Override
    public void init() throws Exception {
        selector = Selector.open();
        socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
//        System.out.println("end init method");
    }

    @Override
    public void start() throws Exception {
//        System.out.println("call start method");
        try {
            this.doConnect();
//            System.out.println("after call doConnect method");
        }catch (Exception e){
            System.out.println("连接失败!");
            e.printStackTrace();
            System.exit(-1);
        }


        while (true) {
            selector.select();
            try {

                Iterator<SelectionKey> iterator = this.selector.selectedKeys().iterator();

                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();

                    SocketChannel sc = (SocketChannel) key.channel();

                    System.out.println("isTrue=" + (sc==socketChannel));
                    System.out.println("socketChannel=" + socketChannel);

                    if(key.isConnectable()) {
                        if (sc.finishConnect()) {
                                System.out.println("==========finishConnect==================================");
                            sc.register(selector, SelectionKey.OP_READ);
                            this.doWrite(socketChannel);
                        } else {
                            //连接失败 进程退出
                            System.exit(1);
                        }

                    } else if (key.isValid() && key.isReadable()) {
                        this.doRead(sc, key);
                    } else if (key.isValid() && key.isWritable()) {
                        this.doWrite(sc);
                    }

                }

            } catch (Exception e) {
                e.printStackTrace();
            }
        }

    }

    private void doConnect() throws Exception {
        //通过ip和端口号连接到服务器
        boolean connected = this.socketChannel.connect(new InetSocketAddress(Constant.SERVER.IP,Constant.SERVER.PORT));
        System.out.println("doConnect="+connected);
        if(connected){
            //向多路复用器注册可读事件
            socketChannel.register(this.selector,SelectionKey.OP_READ);
        } else {
            //若连接服务器失败,则向多路复用器注册连接事件
            socketChannel.register(this.selector, SelectionKey.OP_CONNECT);
        }

    }

    private void doRead(SocketChannel socketChannel,SelectionKey key) throws Exception {
        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
        socketChannel.read(readBuffer);
        String content = ByteBufferUtil.toString(readBuffer);
        System.out.println("服务器响应! 问题及答案:"+content);

        socketChannel.register(this.selector,SelectionKey.OP_WRITE);

    }


    private void doWrite(SocketChannel socketChannel) throws Exception {
        Long questionId = getQuestionId();

        ByteBuffer byteBuffer = ByteBuffer.wrap(String.valueOf(questionId).getBytes());
        socketChannel.write(byteBuffer);
        System.out.println("AnswerNIOClient 发送查询答案! questionId="+questionId);

        socketChannel.register(this.selector,SelectionKey.OP_READ);

    }


    private Long getQuestionId() throws Exception{
        TimeUnit.SECONDS.sleep(3);
        Long questionId = atomicLong.incrementAndGet();
        return questionId;


    }

    public static void main(String[] args) throws Exception{
        new AnswerNIOClient().start();
    }

}

AnswerBIOClient
public class AnswerBIOClient {

    Socket socket = new Socket();

    private AtomicLong atomicLong = new AtomicLong();

    private boolean isConnectionOk = false;

    public AnswerBIOClient(){

        try{
            init();
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    public void init() throws Exception{
        socket.connect(new InetSocketAddress(Constant.SERVER.IP,Constant.SERVER.PORT));
        if(socket.isConnected()) {
            System.out.println("AnswerBIOClient连接成功! 服务端端口="+socket.getPort()+" 客户端端口="+socket.getLocalPort());
        }
    }

    public void start() throws Exception{
        Runnable readRunnable = ()->{
            try {
                read();
            }catch (Exception e){
                System.out.println("readRunnable exception "+e.getMessage());
                e.printStackTrace();
            }
        };

        Runnable writeRunnable = ()->{
            try {
                write();
            }catch (Exception e){
                System.out.println("readRunnable exception "+e.getMessage());
                e.printStackTrace();
            }
        };

        Thread readThread = new Thread(readRunnable,"readThread");
        Thread writeThread = new Thread(writeRunnable,"writeThread");

        readThread.start();
        writeThread.start();

    }

    private void read1() throws Exception{
        while(true) {
            //服务器返回需要加\n
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            System.out.println("等待读数据");
            String line = null;
            while((line=bufferedReader.readLine())!=null) {
                System.out.println("AnswerBIOClient read server data =" + line);
            }

        }

    }

    private void read() throws Exception{

        while(true) {
            System.out.println("等待读数据");

            byte[] bytes = new byte[512];
            int readLength = socket.getInputStream().read(bytes);
            System.out.println("readLength="+readLength);
            while (readLength!=-1) {
                String result = new String(bytes,0,readLength);
                if(!isConnectionOk){
                    isConnectionOk = result.equals(AnswerServer.RES_CONNECTION_OK);
                    System.out.println("result="+result+" isConnectionOk="+isConnectionOk+" o:"+result.equals(AnswerServer.RES_CONNECTION_OK));
                }
            }

        }

    }

    private void write() throws Exception{

        while (true) {
            if (isConnectionOk) {
                TimeUnit.SECONDS.sleep(5);
                Long questionId = atomicLong.incrementAndGet();
                System.out.println("客户端开始发送 questionId=" + questionId);
                socket.getOutputStream().write(String.valueOf(questionId).getBytes());
                socket.getOutputStream().flush();
            }
        }

    }

    public static void main(String[] args) throws Exception{

        new AnswerBIOClient().start();

    }

}

 

 

 

 

 

 

 

 

 

 

 

本文地址:https://blog.csdn.net/kq1983/article/details/107495341

相关标签: nio