Thread-Per-Message 设计模式实现多用户的网络聊天

x33g5p2x  于2022-04-29 转载在 其他  
字(2.7k)|赞(0)|评价(0)|浏览(305)

一 点睛

Thread-Per-Message 模式在网络通信中的使用非常广泛,比如网络聊天程序,在服务端,每一个连接到服务端的连接都将创建一个独立的线程进行处理,当客户端的连接数超过了服务端的最大受理能力时,客户端将被存放到排队队列中。

二 实战

1 服务端程序

服务端用于接收来自客户端的连接,并且与之进行 TCP 通信交互,当服务端接收到每一次的客户端连接后便会给线程池提交一个任务用于与客户端进行交互,进而提高并发响应能力。 

package concurrent.networkchar;

import concurrent.threadpool.BasicThreadPool;
import concurrent.threadpool.ThreadPool;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

public class ChatServer {
    // 服务端端口
    private final int port;

    // 线程池
    private ThreadPool threadPool;

    // 服务端 Socket
    private ServerSocket serverSocket;

    public ChatServer(int port) {
        this.port = port;
    }

    public ChatServer() {
        this(13312);
    }

    public void startServer() throws IOException {
        // 创建线程池,初始化一个线程池
        this.threadPool = new BasicThreadPool(1, 4, 2, 1000);
        this.serverSocket = new ServerSocket(port);
        this.serverSocket.setReuseAddress(true);
        System.out.println("Char server is started and listen at listen at port: " + port);
        this.listen();
    }

    private void listen() throws IOException {
        for (; ; ) {
            // accept 是阻塞方法,当有新的连接进入时才返回,并且返回的是客户端的连接
            Socket client = serverSocket.accept();
            this.threadPool.execute(new ClientHandler(client));
        }
    }

    public static void main(String[] args) {
        ChatServer chatServer = new ChatServer();
        try {
            chatServer.startServer();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

2 响应客户端请求

package concurrent.networkchar;

import java.io.*;
import java.net.Socket;

/**
* @className: ClientHandler
* @description: 受理客户端请求
* @date: 2022/4/27
* @author: cakin
*/
public class ClientHandler implements Runnable {
    // 客户端的 socket 连接
    private final Socket socket;

    // 客户端id
    private final String clientIdentity;

    public ClientHandler(Socket socket) {
        this.socket = socket;
        this.clientIdentity = socket.getInetAddress().getHostAddress() + ":" + socket.getPort();
    }

    @Override
    public void run() {
        try {
            this.chat();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void chat() throws IOException {
        BufferedReader bufferedReader = wrap2Reader(this.socket.getInputStream());
        PrintStream printSteam = wrap2Print(this.socket.getOutputStream());
        String received;
        while ((received = bufferedReader.readLine()) != null) {
            // 将客户端发送的消息输出到控制台
            System.out.printf("client:%s-message%s\n", clientIdentity, received);
            if (received.equals("quit")) {
                // 如果客户端发送了 quit 指令,则断开与客户端的连接
                write2Client(printSteam, "client will close");
                socket.close();
                break;
            }
            // 向客户端发送消息
            write2Client(printSteam, "Server:" + received);
        }
    }

    // 将输入字节流封装成 BufferedReader 缓冲字符流
    private BufferedReader wrap2Reader(InputStream inputStream) {
        return new BufferedReader(new InputStreamReader(inputStream));
    }

    // 将输出字符流封装成 PrintStream
    private PrintStream wrap2Print(OutputStream outputStream) {
        return new PrintStream(outputStream);
    }

    // 该方法主要用于向客户端发送消息
    private void write2Client(PrintStream print, String message) {
        print.println(message);
        print.flush();
    }
}

三 说明

当接收到新的客户端请求时,会为每一个客户端连接创建一个线程与客户端进行交互,当客户端的连接个数超过线程池的最大数时,客户端虽然可以成功接入服务端,但是会进入阻塞队列。

相关文章