# TCP 编程

# 要求

  • 请编写一个群聊的程序,包括服务端程序和客户端程序。
  • 服务端功能:收到某客户端的信息,将消息在控制台输出,然后,发给其他另外的客户端。
  • 客户端功能:每隔 5 秒发送一条信息给服务端。然后接收服务器转发过来的消息,并在控制台输出。

# 客户端

  • 由于每隔 5 秒发送一条信息给服务端,所以考虑采用串行结构,每隔五秒发送固定字符,再进行读取
  • 也可以采用多线程,一个线程负责读取,一个线程负责发送
  • 由于 DataOutputStream 需要关闭流才能进行发送,而一旦关闭了流也就关闭了 Socket,会抛出 Socket is closed 异常,所以采用 PrintWriter 而非 DataOutputStream,服务端同理
import java.io.*;
import java.net.InetAddress;
import java.net.Socket;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
public class Client {
    public static void main(String[] args) throws Exception {
        // 定义时间格式类
        DateFormat normal=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        // 定义 Socket 指向本机 8001 端口
        Socket socket = new Socket(InetAddress.getByName("127.0.0.1"), 8001);
        // 定义输入流
        InputStream inputStream = socket.getInputStream();
        // 定义缓冲字符输入流
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
        // 定义输出流
        OutputStream outputStream = socket.getOutputStream();
        // 定义缓冲字符输出流
        PrintWriter printWriter=new PrintWriter(outputStream);
        while (true) {
            int localPort = socket.getLocalPort();
            String msg=localPort + "port send a message to server";
            // 向输出流中写入数据 
            printWriter.println(msg);
            // 刷新缓冲区,向服务端发送信息 
            printWriter.flush();
            // 如果输入流中有数据,则读取
            if (bufferedReader.ready()) {
                System.out.print(normal.format(new Date())+" Receive from Server:");
                System.out.println(bufferedReader.readLine());
            }
            try {
                Thread.sleep(5000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

# 服务端

  • 服务端相比于客户端更为复杂,首先考虑使用线程池来进行多个 Client 连接
  • 为了实现群发效果,需要使用 List 记录当前处于连接状态的所有 Socket,在群发过程中进行逐个遍历,同时更新 List 去除失去连接的 Socket,使用 CopyOnWriteArrayList 防止线程不同步
import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
public class Server {
    public static void main(String[] args) throws Exception {
        // 定义时间格式化类
        DateFormat normal=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        // 定义固定大小线程池
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);
        // 定义 ServerSocket 绑定 8001 端口
        ServerSocket serverSocket = new ServerSocket(8001);
        // 定义列表存入当前处于连接中的 Socket,便于群发
        // 使用 CopyOnWriteArrayList 防止线程不同步
        List<Socket> sockets = new CopyOnWriteArrayList<Socket>();
        while (true) {
            System.out.println(normal.format(new Date())+" Waiting...");
            // 等待连接
            Socket socket = serverSocket.accept();
            //socket 加入列表
            sockets.add(socket);
            System.out.println(normal.format(new Date())+" Connecting successfully " + socket.getInetAddress() + ":" + socket.getPort());
            // 构造线程处理 socket
            executor.execute(new SingleServer(sockets, socket));
        }
    }
}
class SingleServer implements Runnable {
    List<Socket> sockets;
    Socket socket;
    public SingleServer(List<Socket> sockets, Socket socket) {
        this.sockets = sockets;
        this.socket = socket;
    }
    @Override
    public void run() {
        // 定义时间格式化类
        DateFormat normal=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        try {
            // 定义输入流
            InputStream inputStream = socket.getInputStream();
            // 定义缓冲字符输入流
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
            while (true) {
                // 读取输入流数据
                System.out.println(normal.format(new Date())+" Receive from " + socket.getInetAddress() + ":" + socket.getPort() +"  Message:"+ bufferedReader.readLine());
                synchronized (sockets) {
                    Iterator<Socket> iterator=sockets.iterator();
                    // 逐个遍历 List
                    while(iterator.hasNext()) {
                        Socket soc=iterator.next();
                        if (soc.isConnected()) {
                            // 如果不是发送端 Socket, 则向其群发
                            if (soc != socket) {
                                OutputStream outputStream = soc.getOutputStream();
                                PrintWriter printWriter = new PrintWriter(outputStream);
                                printWriter.println(bufferedReader.readLine());
                                printWriter.flush();
                            }
                        } else {
                            // 去除失去连接的 Socket
                            iterator.remove();
                        }
                    }
                }
                Thread.sleep(100);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

# NIO 编程

# 要求

  • 请基于 NIO (第 6 章第六节的 NIO,非 AIO) 编写一个群聊的程序,包括服务端程序和客户端程序。
  • 服务端功能:只用一个线程,收到某客户端的信息,将消息在控制台输出,然后,发给其他另外的客户端。
  • 客户端功能:每隔 5 秒发送一条信息给服务端。然后接收服务器转发过来的消息,并在控制台输出。

# 客户端

  • 和 TCP 编程不同,客户端的 Selector 轮询需要不停地进行,而发送消息需要每隔 5 秒进行,无法进行串行编写,所以需要创建一个新线程进行发送消息
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;
public class Client {
    public static void main(String[] args) throws Exception {
        // 定义 Selector
        Selector selector = Selector.open();
        // 定义 SocketChannel
        SocketChannel socketChannel = SocketChannel.open();
        // 设置非阻塞模式
        socketChannel.configureBlocking(false);
        // 连接服务端并注册事件
        if (socketChannel.connect(new InetSocketAddress("127.0.0.1", 8001))) {
            socketChannel.register(selector, SelectionKey.OP_READ);
        } else {
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
        }
        // 构造新线程向服务端发送消息
        new Thread(new DoWrite(socketChannel)).start();
        // 选择器轮询
        while (true) {
            try {
                //1000ms 响应时间
                selector.select(1000);
                // 获得感兴趣事件的 SelectionKey
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                // 逐个遍历
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    try {
                        // 处理
                        handleInput(selector, key);
                    } catch (Exception e) {
                        if (key != null) {
                            key.cancel();
                            if (key.channel() != null) {
                                key.channel().close();
                            }
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    private static void handleInput(Selector selector, SelectionKey key) throws Exception {
        // 定义时间格式化类
        DateFormat normal = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        if (key.isValid()) {
            // 获得 Key 的 SocketChannel
            SocketChannel socketChannel = (SocketChannel) key.channel();
            // 连接状态 (连接中)
            if (key.isConnectable()) {
                // 如果连接成功则向 Selector 注册 READ 事件
                if (socketChannel.finishConnect()) {
                    socketChannel.register(selector, SelectionKey.OP_READ);
                }
            }
            // 可读状态 (收到服务端的数据)
            if (key.isReadable()) {
                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                // 读取 socketChannel 中的数据
                int readBytes = socketChannel.read(byteBuffer);
                if (readBytes > 0) {
                    // 写入 buffer-> 读取 buffer
                    byteBuffer.flip();
                    byte[] bytes = new byte[byteBuffer.remaining()];
                    byteBuffer.get(bytes);
                    String message = new String(bytes, "UTF-8");
                    System.out.println(normal.format(new Date()) + " Receive from Server:" + message);
                } else if (readBytes < 0) {
                    key.cancel();
                    socketChannel.close();
                }
            }
        }
    }
}
class DoWrite implements Runnable{
    private SocketChannel socketChannel;
    public DoWrite(SocketChannel socketChannel) {
        this.socketChannel = socketChannel;
    }
    @Override
    public void run() {
        while(true) {
            try {
                // 先休眠 5 秒防止未成功连接
                Thread.sleep(5000);
                int localPort=socketChannel.socket().getLocalPort();
                String message=localPort + "port send a message to server";
                byte[] str=message.getBytes(StandardCharsets.UTF_8);
                ByteBuffer buffer=ByteBuffer.allocate(str.length);
                buffer.put(str);
                // 写入 buffer-> 读取 buffer
                buffer.flip();
                // 向 SocketChannel 写入数据
                socketChannel.write(buffer);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

# 服务端

  • 和客户端类似,故不再赘述
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;
public class Server {
    public static void main(String[] args) throws Exception {
        int port = 8001;
        // 定义 Selector
        Selector selector = Selector.open();
        // 定义 ServerSocketChannel
        ServerSocketChannel socketChannel = ServerSocketChannel.open();
        // 设置非阻塞模式
        socketChannel.configureBlocking(false);
        //ServerSocketChannel 绑定端口
        socketChannel.socket().bind(new InetSocketAddress(port), 1024);
        // 注册事件为 OP_ACCEPT
        socketChannel.register(selector, SelectionKey.OP_ACCEPT);
        // 选择器轮询
        while (true) {
            System.out.println("Waiting...");
            //1000ms 响应时间
            selector.select(1000);
            // 获得感兴趣事件的 SelectionKey
            Set<SelectionKey> selectionKeySet = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeySet.iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                try {
                    // 处理
                    handleInput(selector, key);
                } catch (Exception e) {
                    if (key != null) {
                        key.cancel();
                        if (key.channel() != null) {
                            key.channel().close();
                        }
                    }
                }
            }
            try {
                Thread.sleep(500);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    public static void handleInput(Selector selector, SelectionKey key) throws IOException {
        DateFormat normal = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        if (key.isValid()) {
            // 接收状态 (收到客户端的连接请求)
            if (key.isAcceptable()) {
                ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
                // 接收客户端的连接
                SocketChannel socketChannel = serverSocketChannel.accept();
                // 设置非阻塞模式
                socketChannel.configureBlocking(false);
                // 注册事件为 READ
                socketChannel.register(selector, SelectionKey.OP_READ);
                System.out.println(normal.format(new Date())+" Connecting successfully " + socketChannel.socket().getInetAddress() + ":" + socketChannel.socket().getPort());
            }
            // 可读状态 (收到客户端的数据)
            if (key.isReadable()) {
                SocketChannel socketChannel = (SocketChannel) key.channel();
                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                // 读取 socketChannel 中的数据
                int readBytes = socketChannel.read(byteBuffer);
                if (readBytes > 0) {
                    // 写入 buffer-> 读取 buffer
                    byteBuffer.flip();
                    byte[] bytes = new byte[byteBuffer.remaining()];
                    byteBuffer.get(bytes);
                    String message = new String(bytes, "UTF-8");
                    System.out.println(normal.format(new Date()) + " Receive from " + socketChannel.socket().getInetAddress() + ":" + socketChannel.socket().getPort() + "  Message:" + message);
                    // 群发操作
                    broadcastClient(message, selector, socketChannel);
                } else if (readBytes < 0) {
                    key.cancel();
                    socketChannel.close();
                }
            }
        }
    }
    private static void broadcastClient(String message, Selector selector, SocketChannel socketChannel) throws IOException {
        // 获得所有连接中的 key
        Set<SelectionKey> keys = selector.keys();
        Iterator<SelectionKey> iterator = keys.iterator();
        while (iterator.hasNext()) {
            SelectionKey key = iterator.next();
            // 如果不是发送端 Socket, 则向其群发
            if (key.channel() != socketChannel && key.channel() instanceof SocketChannel) {
                ((SocketChannel) key.channel()).write(Charset.forName("UTF-8").encode(message));
            }
        }
    }
}