Thread Pooled Server trong Java

Table of contents

Thread Pooled server là một kiểu thiết kế server thuộc kiểu multithreaded, tuy nhiên thay vì tạo mới các thread cho mỗi request đến, kiểu thiết kế này sử dụng một pool các thread để tái sử dụng. Để phân biệt các kiểu thiết kế này, các bạn có thể tham khảo trong chuỗi bài viết này.

1. Pool và Thread Pool?

Trong Java hay trong các ngôn ngữ tương tự, khái niệm Pool thường được sử dụng như một hồ chứa các đối tượng có thể tái sử dụng. Ví dụ Thread Pool là một pool chứa các threads hay Connection Pool là một pool chứa các connections. Việc sử dụng pool giúp ta có thể quản lý các đối tượng tạo ra, tái sử dụng lại đối tượng này, cũng như có thể giải phóng khi không dùng, tạo mới khi thiếu. Và hơn hết có thể giúp ta cấu hình các thông số như giới hạn số lượng tạo ra, hạn chế việc tạo lại, …

Trong hình dưới đây minh hoạ về thread pool. Mỗi thread pool sẽ có 2 vấn đề cần quan tâm:

  • Queue chứa các task, hay job
  • Pool chứa các threads
ThreadPool

1.1. Queue trong Thread Pool

Queue là một hàng đợi chứa các tác vụ (task) hay công việc (job) mà các thread cần thực hiện. Với Queue, chúng ta có thể cấu hình một số thông số sau:

  • Kích thước queue: giới hạn về kích thước queue có thể chứa
  • Cách queue lưu trữ: thông thường là FIFO (First In First Out), cũng có thể sử dụng Priority (có độ ưu tiên cho các job)

Khi phía ứng dụng gửi các job sang thread pool, các job này được đẩy vào một hàng đợi queue để chờ xử lý. Thứ tự xử lý các job dựa trên cách mà queue lưu trữ, thông thường là FIFO. Các thread trong pool được chọn ra để xử lý lần lượt các job từ queue này.

1.2. Pool chứa các thread

Pool như đã đề cập là một nơi chứa các thread. Một số cấu hình cho một pool như sau:

  • Lượng thread tối đa được active
  • Lượng thread tối đa ở trạng thái idle (khi không có job)
  • Lượng thread tối thiẻu ở trạn thái idle (khi không có job)
  • Lượng thread khi khởi tạo (tạo sẵn các thread lúc start)
  • Kiểu thread pool được sử dụng (fix, dynamic, cache, scheduler, …)

Khi queue trống, các thread trong pool ở trạng thái idle. Khi queue nhận job, thread pool chọn ra một thread rỗi để xử lý một job, đến khi job hoàn tất, thread được giải phóng và trả lại cho pool.

2. Thread Pooled Server

Thread Pooled Server sử dụng cơ chế multithreaded server kết hợp với việc sử dụng một thread pool để quản lý các thread dùng để handle các connection từ client.

Việc sử dụng thread pool giúp tránh tạo quá nhiều thread khi có lượng lớn connection xảy ra đồng thời (concurrency), cũng như tái sử dụng các thread, tránh gây lãng phí khi tạo mơis và huỷ thread liên tục.

Dưới đây là một ví dụ minh hoạ hiện thực một threadpooled server:

3. Hiện thực một threadpooled server

Dưới đây là một hiện thực đơn giản cho threadpooled server, xem mã nguồn ví dụ tại đây.

/*
 * License from https://github.com/wearenodev
 */
package wearenodev.java.examples.thread;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 *
 * @author harisk
 */
public class ThreadPooledServer implements Runnable {

    /**
     * Worker Thread
     */
    public class RunnableWorker implements Runnable {

        private final Socket clientSocket;

        public RunnableWorker(Socket clientSocket) {
            this.clientSocket = clientSocket;
        }

        @Override
        public void run() {
            try {
                InputStream input = clientSocket.getInputStream();
                OutputStream output = clientSocket.getOutputStream();

                long time = System.currentTimeMillis();
                byte[] responseDocument = ("<html><body>"
                        + "SingleThreadedServer time: "
                        + time
                        + "</body></html>").getBytes("UTF-8");

                byte[] responseHeader = ("HTTP/1.1 200 OK\r\n"
                        + "Content-Type: text/html; charset=UTF-8\r\n"
                        + "Content-Length: " + responseDocument.length
                        + "\r\n\r\n").getBytes("UTF-8");

                output.write(responseHeader);
                output.write(responseDocument);

                output.close();
                input.close();

                System.out.println("Request processed: " + time);

            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private int serverPort;
    private ServerSocket serverSocket;
    private boolean isStopped = false;
    private final ExecutorService threadPool;

    public ThreadPooledServer(int port, int poolSize) {
        this.serverPort = port;
        /**
         * Initial new thread pool with fix size
         */
        this.threadPool = Executors.newFixedThreadPool(poolSize);
    }

    private synchronized boolean isStopped() {
        return this.isStopped;
    }

    public synchronized void stop() {
        this.isStopped = true;
        try {
            this.serverSocket.close();

        } catch (IOException e) {
            throw new RuntimeException("Error on stop server", e);
        }
    }

    private void openServerSocket() {
        try {
            this.serverSocket = new ServerSocket(this.serverPort);
            System.out.println("Server is running on port " + this.serverPort);

        } catch (IOException e) {
            throw new RuntimeException("Cannot open port " + this.serverPort, e);
        }
    }

    @Override
    public void run() {

        openServerSocket();

        while (!isStopped()) {
            Socket clientSocket = null;
            try {
                clientSocket = this.serverSocket.accept();

            } catch (IOException e) {
                if (isStopped()) {
                    System.out.println("Server stopped");
                    return;
                }
                throw new RuntimeException("Error on accept client connection", e);

            }

            /**
             * Push job to threadpool, threadpool will select a free thread
             * to handle this job, if all threads're busy, job will be in queue
             * and wait until exist a free thread
             */
            this.threadPool.execute(
                    new RunnableWorker(clientSocket)
            );
        }

        /**
         * Shutdown threadpool
         */
        this.threadPool.shutdown();

        System.out.println("Server stopped");
    }

    public static void main(String[] args) {

        int poolSize = 10;
        ThreadPooledServer server = new ThreadPooledServer(8080, poolSize);

        /**
         * Run server in single thread
         */
        new Thread(server).start();

        /**
         * Waiting 20s before stopping server
         */
        try {
            Thread.sleep(20 * 1000);

        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }

        /**
         * Stop server and exit processR
         */
        server.stop();
        System.exit(0);
    }
}

Trong ví dụ trên, ta sử dụng một ExecutorService trong Java như một thread pool, khởi tạo đơn giản bằng Executors.newFixedThreadPool(poolSize), giúp tạo một thread pool với kích thước được fix là poolSize

/// ...

    private int serverPort;
    private ServerSocket serverSocket;
    private boolean isStopped = false;
    private final ExecutorService threadPool;

    public ThreadPooledServer(int port, int poolSize) {
        this.serverPort = port;
        /**
         * Initial new thread pool with fix size
         */
        this.threadPool = Executors.newFixedThreadPool(poolSize);
    }

/// ...

Khi nhận được request từ client, ta đưa request này cho thread pool xử lý, tức request được lưu trữ trên queue của thread pool:

/// ...

        while (!isStopped()) {
            Socket clientSocket = null;
            try {
                clientSocket = this.serverSocket.accept();

            } catch (IOException e) {
                if (isStopped()) {
                    System.out.println("Server stopped");
                    return;
                }
                throw new RuntimeException("Error on accept client connection", e);

            }

            /**
             * Push job to threadpool, threadpool will select a free thread
             * to handle this job, if all threads're busy, job will be in queue
             * and wait until exist a free thread
             */
            this.threadPool.execute(
                    new RunnableWorker(clientSocket)
            );
        }

/// ...

Việc tuỳ chỉnh Thread Pool như thế nào là dựa vào tính chất của service mà ta cần xử lý. Trên đây chỉ là một minh hoạ đơn giản. Các bạn có thể tìm hiểu thêm nhiều hơn về Thread pool trong bài viết này.

Tìm hiểu thêm về: