欢迎您访问程序员文章站,本站旨在为大家提供、分享程序员计算机编程知识!
您现在的位置是: 首页

Java IO、NIO、Netty 三种方式实现简单聊天功能 博客分类: java基础 io nio netty 聊天 

程序员文章站 2024-03-18 22:46:40
...

 

netty和nio的比较:

http://news.cnblogs.com/n/205413/ 

 

一:首先是Java IO:

 

Server: 

 

package com.tch.test.chat.io;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Blocking-I/O chat server listening on port 8888.
 *
 * <p>Each accepted socket is handed to a {@link ClientSocketHandler} running on a cached
 * thread pool. Every line received from any client is broadcast to all currently
 * registered clients, including the sender.
 */
public class Server {

	/** Source of the numeric id given to each accepted connection. */
	private static AtomicInteger counter = new AtomicInteger(0);
	/** Writers of the currently connected clients, keyed by connection id. */
	private static Map<Integer, PrintWriter> clients = new ConcurrentHashMap<>();
	/** Executes one handler task per connection. */
	private static ExecutorService pool = Executors.newCachedThreadPool();
	
	/**
	 * Accepts connections forever and dispatches each one to the pool.
	 *
	 * @param args unused
	 * @throws Throwable if the server socket cannot be opened or accepting fails
	 */
	public static void main(String[] args) throws Throwable {
		ServerSocket serverSocket = new ServerSocket(8888);
		try {
			while(true){
				Socket socket = serverSocket.accept();
				System.out.println("a client connected ...");
				pool.execute(new ClientSocketHandler(socket));
			}
		} finally{
			serverSocket.close();
		}
	}
	
	/**
	 * Reads lines from one client and broadcasts them until the client disconnects.
	 */
	private static class ClientSocketHandler implements Runnable{
		private final Socket socket;
		private final Integer id;

		/**
		 * @param socket the accepted client connection; closed by {@link #run()} when it ends
		 */
		public ClientSocketHandler(Socket socket){
			this.socket = socket;
			id = counter.incrementAndGet();
		}

		@Override
		public void run() {
			try {
				BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
				PrintWriter out = new PrintWriter(socket.getOutputStream());
				clients.put(id, out);
				System.out.println("client num:" + clients.size());
				String msg = null;
				while((msg = in.readLine()) != null){
					System.out.println("receive msg '" + msg + "' from client-" + id);
					// sendBack2Client flushes every registered writer, this client's included.
					sendBack2Client("cllient-" + id + " said : " + msg);
				}
			} catch (IOException e) {
				e.printStackTrace();
			} finally {
				// Unregister before closing so no further broadcast targets this writer,
				// then release the socket; otherwise both leak for every disconnect.
				clients.remove(id);
				try {
					socket.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}
	}
	
	/**
	 * Writes {@code msg} as one line to every registered client and flushes each writer.
	 *
	 * @param msg the text to broadcast
	 */
	private static void sendBack2Client(String msg){
		for(PrintWriter client : clients.values()){
			client.println(msg);
			client.flush();
		}
	}
	
}

 

Client:

 

package com.tch.test.chat.io;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

/**
 * Console chat client for the blocking-I/O server on {@code localhost:8888}.
 *
 * <p>Lines typed on standard input are sent to the server; lines received from the
 * server are printed by a separate thread.
 */
public class Client {

	/**
	 * Connects, starts the printing thread, and forwards console input until end of input.
	 *
	 * @param args unused
	 * @throws Throwable if connecting or console/socket I/O fails
	 */
	public static void main(String[] args) throws Throwable {
		Socket connection = new Socket("localhost", 8888);
		try {
			BufferedReader fromServer = new BufferedReader(new InputStreamReader(connection.getInputStream()));
			PrintWriter toServer = new PrintWriter(connection.getOutputStream());
			Thread printer = new Thread(new ServerResponseHandler(fromServer));
			printer.start();
			BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
			for (String typed = console.readLine(); typed != null; typed = console.readLine()) {
				toServer.println(typed);
				toServer.flush();
			}
		} finally {
			connection.close();
		}
	}

	/**
	 * Prints every line read from the given reader until it reports end of stream.
	 */
	private static class ServerResponseHandler implements Runnable {
		BufferedReader in;

		/**
		 * @param in reader over the server's side of the connection
		 */
		public ServerResponseHandler(BufferedReader in) {
			this.in = in;
		}

		@Override
		public void run() {
			try {
				for (String received = in.readLine(); received != null; received = in.readLine()) {
					System.out.println(received);
				}
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}

}

 

 

二:使用Java NIO:

 

Server:

 

package com.tch.test.chat.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

/**
 * Selector-based chat server listening on port 7878.
 *
 * <p>Bytes read from any client are written unchanged to every connected client,
 * including the sender. A client that reaches end of stream, or whose channel
 * fails on read or write, is closed and dropped without stopping the server.
 */
public class NioServer {

	/** Channels of the currently connected clients; only touched by the selector loop. */
	private List<SocketChannel> clients = new ArrayList<SocketChannel>();
	private static Selector selector;
	
	static{
		try {
			selector = Selector.open();
		} catch (IOException e) {
			// Fail fast: continuing with a null selector only defers the failure to an NPE.
			throw new ExceptionInInitializerError(e);
		}
	}
	
	/**
	 * @param args unused
	 * @throws Exception if the listening channel cannot be set up or selection fails
	 */
	public static void main(String[] args) throws Exception {
		new NioServer().start();
	}
	
	/** Runs the select loop forever, dispatching accept and read events. */
	private void start() throws Exception{
		initSeverSocketChannel();
		while(true){
			if(selector.select() == 0){
				continue;
			}
			Set<SelectionKey> selectionKeys = selector.selectedKeys();
			Iterator<SelectionKey> iterator = selectionKeys.iterator();
			while(iterator.hasNext()){
				SelectionKey selectionKey = iterator.next();
				iterator.remove();
				// A channel closed earlier in this pass leaves a cancelled key behind;
				// querying its readiness would throw CancelledKeyException.
				if(!selectionKey.isValid()){
					continue;
				}
				if(selectionKey.isAcceptable()){
					acceptClient(selectionKey);
				}else if(selectionKey.isReadable()){
					readMsg(selectionKey);
				}
			}
		}
	}
	
	/** Opens the non-blocking listening channel on port 7878 and registers it for accepts. */
	private void initSeverSocketChannel() throws Exception{
		ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
		serverSocketChannel.bind(new InetSocketAddress(7878));
		serverSocketChannel.configureBlocking(false);
		serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
	}
	
	/**
	 * Accepts a pending connection and registers it for read events.
	 *
	 * @param selectionKey key of the listening channel
	 */
	private void acceptClient(SelectionKey selectionKey) throws Exception{
		ServerSocketChannel serverSocketChannel = (ServerSocketChannel)selectionKey.channel();
		SocketChannel socketChannel = serverSocketChannel.accept();
		if(socketChannel == null){
			// Non-blocking accept returns null when no connection is actually pending.
			return;
		}
		socketChannel.configureBlocking(false);
		socketChannel.register(selector, SelectionKey.OP_READ);
		clients.add(socketChannel);
		System.out.println("a client connected ...");
	}
	
	/**
	 * Reads up to 1024 bytes from the ready client and broadcasts them.
	 *
	 * @param selectionKey key of the client channel that became readable
	 */
	private void readMsg(SelectionKey selectionKey) throws Exception{
		SocketChannel socketChannel = (SocketChannel)selectionKey.channel();
		ByteBuffer buffer = ByteBuffer.allocate(1024);
		int count;
		try {
			count = socketChannel.read(buffer);
		} catch (IOException e) {
			// A failing peer must not take the whole server down; treat it as a disconnect.
			e.printStackTrace();
			count = -1;
		}
		if(count < 0){
			// End of stream: without closing, the key stays readable and the loop spins.
			clients.remove(socketChannel);
			closeClient(socketChannel);
			return;
		}
		if(count == 0){
			return;
		}
		buffer.flip();
		broadcast(buffer);
	}
	
	/**
	 * Writes the buffer's content to every connected client, dropping clients whose write fails.
	 *
	 * @param buffer flipped buffer holding the bytes to send; rewound after each client
	 */
	private void broadcast(ByteBuffer buffer){
		Iterator<SocketChannel> targets = clients.iterator();
		while(targets.hasNext()){
			SocketChannel target = targets.next();
			try {
				// NOTE(review): this retries in a tight loop while a non-blocking write
				// accepts zero bytes; kept from the original, consider OP_WRITE instead.
				while(buffer.hasRemaining()){
					target.write(buffer);
				}
			} catch (IOException e) {
				e.printStackTrace();
				targets.remove();
				closeClient(target);
			}
			buffer.rewind();
		}
	}
	
	/** Closes a client channel (which also cancels its selection key) and logs the disconnect. */
	private void closeClient(SocketChannel channel){
		try {
			channel.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
		System.out.println("a client disconnected ...");
	}
	
}

 

Client:

 

package com.tch.test.chat.nio;

import java.awt.GridLayout;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

import javax.swing.JButton;
import javax.swing.JFrame;
import javax.swing.JTextArea;
import javax.swing.JTextField;

/**
 * Swing chat client for the NIO server on {@code localhost:7878}.
 *
 * <p>The window holds a text area for received messages, a text field for input and a
 * send button. The calling thread runs a selector loop that appends incoming bytes
 * (decoded as UTF-8) to the text area; the button listener writes the typed text to
 * the server.
 */
public class NioClient extends JFrame{

	private static final long serialVersionUID = 1L;
	private JTextArea area = new JTextArea();
	private JTextField textField = new JTextField();
	private JButton button = new JButton("Send Message");
	private static Selector selector;
	
	static{
		try {
			selector = Selector.open();
		} catch (IOException e) {
			// Fail fast: continuing with a null selector only defers the failure to an NPE.
			throw new ExceptionInInitializerError(e);
		}
	}

	/**
	 * @param args unused
	 * @throws Exception if connecting to the server or selecting fails
	 */
	public static void main(String[] args) throws Exception {
		NioClient client = new NioClient();
		client.start();
	}
	
	/** Shows the window, connects, then processes read events forever. */
	private void start() throws Exception{
		
		initFrame();
		
		initSocketChannel();
		while(true){
			int ready = selector.select();
			if(ready > 0){
				Set<SelectionKey> selectionKeys = selector.selectedKeys();
				Iterator<SelectionKey> iterator = selectionKeys.iterator();
				while(iterator.hasNext()){
					SelectionKey selectionKey = iterator.next();
					iterator.remove();
					if(selectionKey.isValid() && selectionKey.isReadable()){
						readMsg(selectionKey);
					}
				}
			}
		}
	}
	
	/**
	 * Reads up to 1024 bytes from the server and appends them to the text area.
	 *
	 * @param selectionKey key of the server connection that became readable
	 */
	private void readMsg(SelectionKey selectionKey) throws Exception{
		ByteBuffer buffer = ByteBuffer.allocate(1024);
		SocketChannel socketChannel = (SocketChannel)selectionKey.channel();
		int count = socketChannel.read(buffer);
		if(count < 0){
			// Server closed the connection. Without this the key stays readable forever
			// and the loop spins, appending empty lines to the text area.
			selectionKey.cancel();
			socketChannel.close();
			return;
		}
		if(count == 0){
			return;
		}
		buffer.flip();
		// NOTE(review): this touches a Swing component from the selector thread;
		// consider SwingUtilities.invokeLater. A multi-byte character split across
		// two reads will also decode incorrectly.
		area.setText(area.getText().trim()+"\n"+new String(buffer.array(),0,buffer.limit(),"utf-8"));
	}
	
	/** Lays out the three components in a 3x1 grid and shows the window. */
	private void initFrame(){
		setBounds(200, 200, 300, 400);
		setLayout(new GridLayout(3, 1));
		add(area);
		add(textField);
		add(button);
		setDefaultCloseOperation(EXIT_ON_CLOSE);
		setVisible(true);
	}
	
	/** Connects to the server, registers for reads and wires the send button. */
	private void initSocketChannel() throws Exception{
		SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 7878));
		socketChannel.configureBlocking(false);
		socketChannel.register(selector, SelectionKey.OP_READ);
		button.addActionListener(new MyActionListener(socketChannel));
	}
	
	/**
	 * Sends the text field's content to the server when the button is pressed.
	 */
	private class MyActionListener implements ActionListener{
		
		private SocketChannel socketChannel;
		
		/**
		 * @param socketChannel connected channel to write outgoing messages to
		 */
		public MyActionListener(SocketChannel socketChannel){
			this.socketChannel = socketChannel;
		}
		@Override
		public void actionPerformed(ActionEvent event) {
			try {
				String message = textField.getText();
				if(message == null || message.trim().isEmpty()){
					System.out.println("empty message");
					return;
				}
				textField.setText("");
				// Wrap the encoded bytes directly: a fixed 1024-byte buffer overflowed
				// (and silently dropped the message) for longer input.
				ByteBuffer buffer = ByteBuffer.wrap(message.getBytes("utf-8"));
				while(buffer.hasRemaining()){
					socketChannel.write(buffer);
				}
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}
	
}

 

 

三:最后是使用netty(参考jar包里面example里面的例子):

 

pom.xml添加依赖:

 

		<dependency>
			<groupId>io.netty</groupId>
			<artifactId>netty-all</artifactId>
			<version>4.0.36.Final</version>
		</dependency>

 

Server:

 

/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package com.tch.test.chat.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.example.telnet.TelnetServer;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

/**
 * Simple chat server modified from {@link TelnetServer}.
 *
 * <p>Binds to {@link #PORT} and installs {@link MyServerChannelInitializer} on every
 * accepted connection. No SSL handler is configured here.
 */
public final class NettyServer {

    private static final int PORT = 8992;

    /**
     * Starts the server and blocks until its listening channel is closed, then
     * shuts both event loop groups down.
     *
     * @param args unused
     * @throws Exception if binding or waiting on the channel fails
     */
    public static void main(String[] args) throws Exception {
        EventLoopGroup acceptGroup = new NioEventLoopGroup(1);
        EventLoopGroup ioGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(acceptGroup, ioGroup);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.handler(new LoggingHandler(LogLevel.INFO));
            bootstrap.childHandler(new MyServerChannelInitializer());

            // Bind, wait for the bind to complete, then wait for the channel to close.
            bootstrap.bind(PORT).sync().channel().closeFuture().sync();
        } finally {
            acceptGroup.shutdownGracefully();
            ioGroup.shutdownGracefully();
        }
    }
}
/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package com.tch.test.chat.netty;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * Populates the {@link ChannelPipeline} of each newly accepted server-side channel
 * with a line-based text codec followed by {@link MyServerHandler}.
 */
public class MyServerChannelInitializer extends ChannelInitializer<SocketChannel> {

    /**
     * Installs, in order: a delimiter-based frame decoder (line delimiters, maximum
     * frame length 8192), a string decoder, a string encoder and the chat handler.
     *
     * @param ch the channel being initialised
     */
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
          // Text line codec.
          .addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()))
          .addLast(new StringDecoder())
          .addLast(new StringEncoder())
          // Business logic.
          .addLast(new MyServerHandler());
    }
}

 

/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package com.tch.test.chat.netty;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.net.UnknownHostException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Handles a server-side channel: greets each new connection, relays every received
 * line to all connected channels, and closes a connection that sends {@code bye}.
 */
public class MyServerHandler extends SimpleChannelInboundHandler<String> {

	/** Source of the user number assigned to each connection. */
	private static AtomicInteger counter = new AtomicInteger(0);
	/** Currently active channels mapped to their user number; shared by all handler instances. */
	private static Map<Channel, Integer> channels = new ConcurrentHashMap<Channel, Integer>();

    /**
     * Registers the new channel under a fresh user number and sends it a greeting.
     */
    @Override
    public void channelActive(final ChannelHandlerContext ctx) throws UnknownHostException {
    	Integer channelNum = counter.incrementAndGet();
    	channels.put(ctx.channel(), channelNum);
    	ctx.writeAndFlush("Hello user-" + channelNum + "\r\n");
    }

    /**
     * Unregisters the channel once it is no longer active. Without this the static
     * map keeps every closed channel forever and broadcasts keep writing to them.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        channels.remove(ctx.channel());
        super.channelInactive(ctx);
    }

    /**
     * Relays {@code msg} to every registered channel, labelling it with the sender's
     * user number for other channels and with {@code [you]} for the sender itself.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg one decoded line of text, without its line delimiter
     */
    @Override
    public void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        Channel sender = ctx.channel();
        // The sender's number is the same for every recipient; look it up once.
        Integer senderNum = channels.get(sender);
        for (Map.Entry<Channel, Integer> entry : channels.entrySet()) {
        	Channel c = entry.getKey();
            if (c != sender) {
                c.writeAndFlush("user-" + senderNum + " said: " + msg + '\n');
            } else {
                c.writeAndFlush("[you] said: " + msg + '\n');
            }
        }

        // Close the connection if the client has sent 'bye'.
        if ("bye".equalsIgnoreCase(msg)) {
            ctx.close();
        }
    }

    /**
     * Prints the failure and closes the affected channel.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

 

 

Client:

 

/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package com.tch.test.chat.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.example.telnet.TelnetClient;

import java.io.BufferedReader;
import java.io.InputStreamReader;

/**
 * Simple console chat client modified from {@link TelnetClient}.
 *
 * <p>Connects to {@link #HOST}:{@link #PORT} and sends each line typed on standard
 * input to the server. No SSL handler is configured here.
 */
public final class NettyClient {

	private static final String HOST = "127.0.0.1";
	private static final int PORT = 8992;

    /**
     * Runs the client until standard input ends or the user types {@code bye}.
     *
     * @param args unused
     * @throws Exception if connecting, reading the console or waiting on a future fails
     */
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.handler(new MyClientChannelInitializer());

            // Connect and wait for the attempt to complete.
            Channel channel = bootstrap.connect(HOST, PORT).sync().channel();

            ChannelFuture pendingWrite = null;
            BufferedReader console = new BufferedReader(new InputStreamReader(System.in));
            String typed;
            while ((typed = console.readLine()) != null) {
                // Forward the typed line, terminated so the server's line decoder sees a frame.
                pendingWrite = channel.writeAndFlush(typed + "\r\n");

                // After 'bye', wait for the server to close the connection, then stop.
                if ("bye".equals(typed.toLowerCase())) {
                    channel.closeFuture().sync();
                    break;
                }
            }

            // Make sure the last write has completed before shutting down.
            if (pendingWrite != null) {
                pendingWrite.sync();
            }
        } finally {
            // The connection is closed automatically on shutdown.
            group.shutdownGracefully();
        }
    }
}

 

 

/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package com.tch.test.chat.netty;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * Populates the {@link ChannelPipeline} of the client channel with a line-based
 * text codec followed by {@link MyClientHandler}.
 */
public class MyClientChannelInitializer extends ChannelInitializer<SocketChannel> {

    /**
     * Installs, in order: a delimiter-based frame decoder (line delimiters, maximum
     * frame length 8192), a string decoder, a string encoder and the client handler.
     *
     * @param ch the channel being initialised
     */
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline stages = ch.pipeline();
        stages.addLast(
                // Text line codec.
                new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()),
                new StringDecoder(),
                new StringEncoder(),
                // Business logic.
                new MyClientHandler());
    }
}

 

 

/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package com.tch.test.chat.netty;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

/**
 * Handles a client-side channel by printing every received line to standard output.
 */
public class MyClientHandler extends SimpleChannelInboundHandler<String> {

    /**
     * Prints the failure and closes the channel.
     *
     * @param ctx   context of the channel on which the error occurred
     * @param cause the error that was raised
     */
    @Override
    public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * Prints one decoded line received from the server.
     *
     * @param ctx context of the channel the line arrived on (unused)
     * @param msg the decoded line
     */
    @Override
    public void channelRead0(final ChannelHandlerContext ctx, final String msg) throws Exception {
        System.out.println(msg);
    }
}

 

 

OK,以上就是不同的方式实现聊天。