# TCP 编程
# 要求
- 请编写一个群聊的程序,包括服务端程序和客户端程序。
- 服务端功能:收到某客户端的信息,将消息在控制台输出,然后,发给其他另外的客户端。
- 客户端功能:每隔 5 秒发送一条信息给服务端。然后接收服务器转发过来的消息,并在控制台输出。
# 客户端
- 由于每隔 5 秒发送一条信息给服务端,所以考虑采用串行结构,每隔五秒发送固定字符,再进行读取
- 也可以采用多线程,一个线程负责读取,一个线程负责发送
- 由于 DataOutputStream 需要关闭流才能进行发送,而一旦关闭了流也就关闭了 Socket,会抛出 Socket is closed 异常,所以采用 PrintWriter 而非 DataOutputStream,服务端同理
import java.io.*; | |
import java.net.InetAddress; | |
import java.net.Socket; | |
import java.text.DateFormat; | |
import java.text.SimpleDateFormat; | |
import java.util.Date; | |
public class Client { | |
public static void main(String[] args) throws Exception { | |
// 定义时间格式类 | |
DateFormat normal=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); | |
// 定义 Socket 指向本机 8001 端口 | |
Socket socket = new Socket(InetAddress.getByName("127.0.0.1"), 8001); | |
// 定义输入流 | |
InputStream inputStream = socket.getInputStream(); | |
// 定义缓冲字符输入流 | |
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream)); | |
// 定义输出流 | |
OutputStream outputStream = socket.getOutputStream(); | |
// 定义缓冲字符输出流 | |
PrintWriter printWriter=new PrintWriter(outputStream); | |
while (true) { | |
int localPort = socket.getLocalPort(); | |
String msg=localPort + "port send a message to server"; | |
// 向输出流中写入数据 | |
printWriter.println(msg); | |
// 刷新缓冲区,向服务端发送信息 | |
printWriter.flush(); | |
// 如果输入流中有数据,则读取 | |
if (bufferedReader.ready()) { | |
System.out.print(normal.format(new Date())+" Receive from Server:"); | |
System.out.println(bufferedReader.readLine()); | |
} | |
try { | |
Thread.sleep(5000); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
} | |
} | |
} |
# 服务端
- 服务端相比于客户端更为复杂,首先考虑使用线程池来进行多个 Client 连接
- 为了实现群发效果,需要使用 List 记录当前处于连接状态的所有 Socket,在群发过程中进行逐个遍历,同时更新 List 去除失去连接的 Socket,使用 CopyOnWriteArrayList 防止线程不同步
import java.io.*; | |
import java.net.ServerSocket; | |
import java.net.Socket; | |
import java.text.DateFormat; | |
import java.text.SimpleDateFormat; | |
import java.util.Date; | |
import java.util.Iterator; | |
import java.util.List; | |
import java.util.concurrent.CopyOnWriteArrayList; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.ThreadPoolExecutor; | |
public class Server { | |
public static void main(String[] args) throws Exception { | |
// 定义时间格式化类 | |
DateFormat normal=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); | |
// 定义固定大小线程池 | |
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(4); | |
// 定义 ServerSocket 绑定 8001 端口 | |
ServerSocket serverSocket = new ServerSocket(8001); | |
// 定义列表存入当前处于连接中的 Socket,便于群发 | |
// 使用 CopyOnWriteArrayList 防止线程不同步 | |
List<Socket> sockets = new CopyOnWriteArrayList<Socket>(); | |
while (true) { | |
System.out.println(normal.format(new Date())+" Waiting..."); | |
// 等待连接 | |
Socket socket = serverSocket.accept(); | |
//socket 加入列表 | |
sockets.add(socket); | |
System.out.println(normal.format(new Date())+" Connecting successfully " + socket.getInetAddress() + ":" + socket.getPort()); | |
// 构造线程处理 socket | |
executor.execute(new SingleServer(sockets, socket)); | |
} | |
} | |
} | |
class SingleServer implements Runnable { | |
List<Socket> sockets; | |
Socket socket; | |
public SingleServer(List<Socket> sockets, Socket socket) { | |
this.sockets = sockets; | |
this.socket = socket; | |
} | |
@Override | |
public void run() { | |
// 定义时间格式化类 | |
DateFormat normal=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); | |
try { | |
// 定义输入流 | |
InputStream inputStream = socket.getInputStream(); | |
// 定义缓冲字符输入流 | |
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream)); | |
while (true) { | |
// 读取输入流数据 | |
System.out.println(normal.format(new Date())+" Receive from " + socket.getInetAddress() + ":" + socket.getPort() +" Message:"+ bufferedReader.readLine()); | |
synchronized (sockets) { | |
Iterator<Socket> iterator=sockets.iterator(); | |
// 逐个遍历 List | |
while(iterator.hasNext()) { | |
Socket soc=iterator.next(); | |
if (soc.isConnected()) { | |
// 如果不是发送端 Socket, 则向其群发 | |
if (soc != socket) { | |
OutputStream outputStream = soc.getOutputStream(); | |
PrintWriter printWriter = new PrintWriter(outputStream); | |
printWriter.println(bufferedReader.readLine()); | |
printWriter.flush(); | |
} | |
} else { | |
// 去除失去连接的 Socket | |
iterator.remove(); | |
} | |
} | |
} | |
Thread.sleep(100); | |
} | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
} | |
} |
# NIO 编程
# 要求
- 请基于 NIO (第 6 章第六节的 NIO,非 AIO) 编写一个群聊的程序,包括服务端程序和客户端程序。
- 服务端功能:只用一个线程,收到某客户端的信息,将消息在控制台输出,然后,发给其他另外的客户端。
- 客户端功能:每隔 5 秒发送一条信息给服务端。然后接收服务器转发过来的消息,并在控制台输出。
# 客户端
- 和 TCP 编程不同,客户端的 Selector 轮询需要不停地进行,而发送消息需要每隔 5 秒进行,无法进行串行编写,所以需要创建一个新线程进行发送消息
import java.io.IOException; | |
import java.net.InetSocketAddress; | |
import java.nio.ByteBuffer; | |
import java.nio.channels.SelectionKey; | |
import java.nio.channels.Selector; | |
import java.nio.channels.SocketChannel; | |
import java.nio.charset.StandardCharsets; | |
import java.text.DateFormat; | |
import java.text.SimpleDateFormat; | |
import java.util.Date; | |
import java.util.Iterator; | |
import java.util.Set; | |
public class Client { | |
public static void main(String[] args) throws Exception { | |
// 定义 Selector | |
Selector selector = Selector.open(); | |
// 定义 SocketChannel | |
SocketChannel socketChannel = SocketChannel.open(); | |
// 设置非阻塞模式 | |
socketChannel.configureBlocking(false); | |
// 连接服务端并注册事件 | |
if (socketChannel.connect(new InetSocketAddress("127.0.0.1", 8001))) { | |
socketChannel.register(selector, SelectionKey.OP_READ); | |
} else { | |
socketChannel.register(selector, SelectionKey.OP_CONNECT); | |
} | |
// 构造新线程向服务端发送消息 | |
new Thread(new DoWrite(socketChannel)).start(); | |
// 选择器轮询 | |
while (true) { | |
try { | |
//1000ms 响应时间 | |
selector.select(1000); | |
// 获得感兴趣事件的 SelectionKey | |
Set<SelectionKey> selectionKeys = selector.selectedKeys(); | |
Iterator<SelectionKey> iterator = selectionKeys.iterator(); | |
// 逐个遍历 | |
while (iterator.hasNext()) { | |
SelectionKey key = iterator.next(); | |
iterator.remove(); | |
try { | |
// 处理 | |
handleInput(selector, key); | |
} catch (Exception e) { | |
if (key != null) { | |
key.cancel(); | |
if (key.channel() != null) { | |
key.channel().close(); | |
} | |
} | |
} | |
} | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
} | |
} | |
private static void handleInput(Selector selector, SelectionKey key) throws Exception { | |
// 定义时间格式化类 | |
DateFormat normal = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); | |
if (key.isValid()) { | |
// 获得 Key 的 SocketChannel | |
SocketChannel socketChannel = (SocketChannel) key.channel(); | |
// 连接状态 (连接中) | |
if (key.isConnectable()) { | |
// 如果连接成功则向 Selector 注册 READ 事件 | |
if (socketChannel.finishConnect()) { | |
socketChannel.register(selector, SelectionKey.OP_READ); | |
} | |
} | |
// 可读状态 (收到服务端的数据) | |
if (key.isReadable()) { | |
ByteBuffer byteBuffer = ByteBuffer.allocate(1024); | |
// 读取 socketChannel 中的数据 | |
int readBytes = socketChannel.read(byteBuffer); | |
if (readBytes > 0) { | |
// 写入 buffer-> 读取 buffer | |
byteBuffer.flip(); | |
byte[] bytes = new byte[byteBuffer.remaining()]; | |
byteBuffer.get(bytes); | |
String message = new String(bytes, "UTF-8"); | |
System.out.println(normal.format(new Date()) + " Receive from Server:" + message); | |
} else if (readBytes < 0) { | |
key.cancel(); | |
socketChannel.close(); | |
} | |
} | |
} | |
} | |
} | |
class DoWrite implements Runnable{ | |
private SocketChannel socketChannel; | |
public DoWrite(SocketChannel socketChannel) { | |
this.socketChannel = socketChannel; | |
} | |
@Override | |
public void run() { | |
while(true) { | |
try { | |
// 先休眠 5 秒防止未成功连接 | |
Thread.sleep(5000); | |
int localPort=socketChannel.socket().getLocalPort(); | |
String message=localPort + "port send a message to server"; | |
byte[] str=message.getBytes(StandardCharsets.UTF_8); | |
ByteBuffer buffer=ByteBuffer.allocate(str.length); | |
buffer.put(str); | |
// 写入 buffer-> 读取 buffer | |
buffer.flip(); | |
// 向 SocketChannel 写入数据 | |
socketChannel.write(buffer); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
} | |
} | |
} |
# 服务端
- 和客户端类似,故不再赘述
import java.io.IOException; | |
import java.net.InetSocketAddress; | |
import java.nio.ByteBuffer; | |
import java.nio.channels.SelectionKey; | |
import java.nio.channels.Selector; | |
import java.nio.channels.ServerSocketChannel; | |
import java.nio.channels.SocketChannel; | |
import java.nio.charset.Charset; | |
import java.text.DateFormat; | |
import java.text.SimpleDateFormat; | |
import java.util.Date; | |
import java.util.Iterator; | |
import java.util.Set; | |
public class Server { | |
public static void main(String[] args) throws Exception { | |
int port = 8001; | |
// 定义 Selector | |
Selector selector = Selector.open(); | |
// 定义 ServerSocketChannel | |
ServerSocketChannel socketChannel = ServerSocketChannel.open(); | |
// 设置非阻塞模式 | |
socketChannel.configureBlocking(false); | |
//ServerSocketChannel 绑定端口 | |
socketChannel.socket().bind(new InetSocketAddress(port), 1024); | |
// 注册事件为 OP_ACCEPT | |
socketChannel.register(selector, SelectionKey.OP_ACCEPT); | |
// 选择器轮询 | |
while (true) { | |
System.out.println("Waiting..."); | |
//1000ms 响应时间 | |
selector.select(1000); | |
// 获得感兴趣事件的 SelectionKey | |
Set<SelectionKey> selectionKeySet = selector.selectedKeys(); | |
Iterator<SelectionKey> iterator = selectionKeySet.iterator(); | |
while (iterator.hasNext()) { | |
SelectionKey key = iterator.next(); | |
iterator.remove(); | |
try { | |
// 处理 | |
handleInput(selector, key); | |
} catch (Exception e) { | |
if (key != null) { | |
key.cancel(); | |
if (key.channel() != null) { | |
key.channel().close(); | |
} | |
} | |
} | |
} | |
try { | |
Thread.sleep(500); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
} | |
} | |
public static void handleInput(Selector selector, SelectionKey key) throws IOException { | |
DateFormat normal = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); | |
if (key.isValid()) { | |
// 接收状态 (收到客户端的连接请求) | |
if (key.isAcceptable()) { | |
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); | |
// 接收客户端的连接 | |
SocketChannel socketChannel = serverSocketChannel.accept(); | |
// 设置非阻塞模式 | |
socketChannel.configureBlocking(false); | |
// 注册事件为 READ | |
socketChannel.register(selector, SelectionKey.OP_READ); | |
System.out.println(normal.format(new Date())+" Connecting successfully " + socketChannel.socket().getInetAddress() + ":" + socketChannel.socket().getPort()); | |
} | |
// 可读状态 (收到客户端的数据) | |
if (key.isReadable()) { | |
SocketChannel socketChannel = (SocketChannel) key.channel(); | |
ByteBuffer byteBuffer = ByteBuffer.allocate(1024); | |
// 读取 socketChannel 中的数据 | |
int readBytes = socketChannel.read(byteBuffer); | |
if (readBytes > 0) { | |
// 写入 buffer-> 读取 buffer | |
byteBuffer.flip(); | |
byte[] bytes = new byte[byteBuffer.remaining()]; | |
byteBuffer.get(bytes); | |
String message = new String(bytes, "UTF-8"); | |
System.out.println(normal.format(new Date()) + " Receive from " + socketChannel.socket().getInetAddress() + ":" + socketChannel.socket().getPort() + " Message:" + message); | |
// 群发操作 | |
broadcastClient(message, selector, socketChannel); | |
} else if (readBytes < 0) { | |
key.cancel(); | |
socketChannel.close(); | |
} | |
} | |
} | |
} | |
private static void broadcastClient(String message, Selector selector, SocketChannel socketChannel) throws IOException { | |
// 获得所有连接中的 key | |
Set<SelectionKey> keys = selector.keys(); | |
Iterator<SelectionKey> iterator = keys.iterator(); | |
while (iterator.hasNext()) { | |
SelectionKey key = iterator.next(); | |
// 如果不是发送端 Socket, 则向其群发 | |
if (key.channel() != socketChannel && key.channel() instanceof SocketChannel) { | |
((SocketChannel) key.channel()).write(Charset.forName("UTF-8").encode(message)); | |
} | |
} | |
} | |
} |