Multithreaded Server trong Java

Table of contents

Multithreaded server hay server đa luồng là kiểu thiết kế server sử dụng luồng chính để nhận request từ client và sử dụng các worker thread để xử lý các request. Để phân biệt các kiểu server này, các bạn có thể quay về bài viết trước tại đây. Bài viết này sẽ giới thiệu về cách hiện thực một server đa luồng đơn giản trong Java.

Multithreaded server

Server đa luồng là một thiết kế server sử dụng các worker thread để xử lý request từ client, khác với singlethreaded server khi singlethreaded sử dụng chính thread nhận request để handle các connection, khiến server bị blocked trong khoảng thời gian xử lý.

Như trong hình dưới đây, với mỗi client connect đến server, server thực hiện forward connection đó sang một thread mới để xử lý, tránh việc bị block trên thread chính của server.

MultiThreadedServer

Hiện thực một multithreaded server

Việc hiện thực multithreaded server được dựa trên singlethreaded server đã được trình bày trong bài viết trước, điều khác biệt giữa 2 cách hiện thực chính là thay vì thực hiện xử lý request từ client ngay trên thread chính, ta tạo một thread mới để thực hiện handle connection đó, gọi là worker thread. Nhờ sử dụng một thread mới giúp cho server tránh bị block tại thời điểm đó. Tuy nhiên khi lượng connection đến quá nhiều, việc tạo liên tục các thread mới như vậy cũng sẽ có những khuyết điểm nhất định, do đó ta có thêm một cách thiết kế cho multithreaded chính là threadpooled server.

Dưới đây là một hiện thực đơn giản cho multithreaded server, xem mã nguồn ví dụ tại đây.

/*
 * License from https://github.com/wearenodev
 */
package wearenodev.java.examples.thread;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;

/**
 *
 * @author harisk
 */
public class MultiThreadedServer implements Runnable {

    /**
     * Worker Thread
     */
    public class RunnableWorker implements Runnable {

        private final Socket clientSocket;

        public RunnableWorker(Socket clientSocket) {
            this.clientSocket = clientSocket;
        }

        @Override
        public void run() {
            try {
                InputStream input = clientSocket.getInputStream();
                OutputStream output = clientSocket.getOutputStream();

                long time = System.currentTimeMillis();
                byte[] responseDocument = ("<html><body>"
                        + "SingleThreadedServer time: "
                        + time
                        + "</body></html>").getBytes("UTF-8");

                byte[] responseHeader = ("HTTP/1.1 200 OK\r\n"
                        + "Content-Type: text/html; charset=UTF-8\r\n"
                        + "Content-Length: " + responseDocument.length
                        + "\r\n\r\n").getBytes("UTF-8");

                output.write(responseHeader);
                output.write(responseDocument);

                output.close();
                input.close();

                System.out.println("Request processed: " + time);

            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private int serverPort;
    private ServerSocket serverSocket;
    private boolean isStopped = false;

    public MultiThreadedServer(int port) {
        this.serverPort = port;
    }

    private synchronized boolean isStopped() {
        return this.isStopped;
    }

    public synchronized void stop() {
        this.isStopped = true;
        try {
            this.serverSocket.close();

        } catch (IOException e) {
            throw new RuntimeException("Error on stop server", e);
        }
    }

    private void openServerSocket() {
        try {
            this.serverSocket = new ServerSocket(this.serverPort);
            System.out.println("Server is running on port " + this.serverPort);

        } catch (IOException e) {
            throw new RuntimeException("Cannot open port " + this.serverPort, e);
        }
    }

    @Override
    public void run() {

        openServerSocket();

        while (!isStopped()) {
            Socket clientSocket = null;
            try {
                clientSocket = this.serverSocket.accept();

            } catch (IOException e) {
                if (isStopped()) {
                    System.out.println("Server stopped");
                    return;
                }
                throw new RuntimeException("Error on accept client connection", e);

            }
            new Thread(new RunnableWorker(clientSocket)).start();
        }

        System.out.println("Server stopped");
    }

    public static void main(String[] args) {

        MultiThreadedServer server = new MultiThreadedServer(8080);

        /**
         * Run server in single thread
         */
        new Thread(server).start();

        /**
         * Waiting 20s before stopping server
         */
        try {
            Thread.sleep(20 * 1000);

        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }

        /**
         * Stop server and exit processR
         */
        server.stop();
        System.exit(0);
    }
}

Trong ví dụ trên, ta định nghĩa các worker thread bằng một subclass RunnableWorker hiện thực interface Runnaable như sau:

/// ...

    public class RunnableWorker implements Runnable {

        private final Socket clientSocket;

        public RunnableWorker(Socket clientSocket) {
            this.clientSocket = clientSocket;
        }

        @Override
        public void run() {
            try {
                InputStream input = clientSocket.getInputStream();
                OutputStream output = clientSocket.getOutputStream();

                long time = System.currentTimeMillis();
                byte[] responseDocument = ("<html><body>"
                        + "SingleThreadedServer time: "
                        + time
                        + "</body></html>").getBytes("UTF-8");

                byte[] responseHeader = ("HTTP/1.1 200 OK\r\n"
                        + "Content-Type: text/html; charset=UTF-8\r\n"
                        + "Content-Length: " + responseDocument.length
                        + "\r\n\r\n").getBytes("UTF-8");

                output.write(responseHeader);
                output.write(responseDocument);

                output.close();
                input.close();

                System.out.println("Request processed: " + time);

            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

/// ...

Khi thread chính nhận connection từ client, một thread mới sẽ được tạo ra và truyền clientSocket vào cho worker thread xử lý như sau:

/// ...

        while (!isStopped()) {
            Socket clientSocket = null;
            try {
                clientSocket = this.serverSocket.accept();

            } catch (IOException e) {
                if (isStopped()) {
                    System.out.println("Server stopped");
                    return;
                }
                throw new RuntimeException("Error on accept client connection", e);

            }
            new Thread(new RunnableWorker(clientSocket)).start();
        }

/// ...

Đây chỉ là một ví dụ đơn giản hiện thực một multithreaded server. Hiện tại Java đã có rất nhiều framework cho web server, các bạn cũng có thể tham khảo ở bài viết này.

Tìm hiểu thêm về: