1405. Problem - Custom Thread Pool
Thread Pool and BlockingQueue


Implement a thread pool backed by a blocking queue.

1. Requirement

Implement a thread pool backed by a blocking queue. Use a BlockingQueue to store the tasks submitted by the client. Each Worker repeatedly takes one task from the BlockingQueue and runs it.

2. Solution

Customized ThreadPool.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * A minimal fixed-size thread pool backed by a bounded {@link BlockingQueue}.
 *
 * <p>Clients hand tasks to {@link #execute(Runnable)}; the tasks wait in the
 * queue until one of the {@code Worker} threads created by the constructor
 * picks them up. {@link #shutdown()} rejects further submissions, grants the
 * pool a grace period and then asks every worker to stop.
 */
public class ThreadPool {
    /** Queue capacity used by {@link #ThreadPool(int)}. */
    private static final int DEFAULT_CAPACITY = 100;
    /** Grace period, in milliseconds, used by {@link #shutdown()}. */
    private static final long DEFAULT_GRACE_MILLIS = 20000L;

    private final BlockingQueue<Runnable> bq;
    private final List<Worker> threads;
    private final int capacity;
    /** Guarded by {@code this}. */
    private boolean isStopped;

    /**
     * Creates and starts a pool with the default queue capacity (100 tasks).
     *
     * @param numOfThreads number of worker threads to start; must be at least 1
     * @throws IllegalArgumentException if {@code numOfThreads} is less than 1
     */
    public ThreadPool(int numOfThreads) {
        this(numOfThreads, DEFAULT_CAPACITY);
    }

    /**
     * Creates and starts a pool with an explicit queue capacity.
     *
     * @param numOfThreads number of worker threads to start; must be at least 1
     * @param capacity     maximum number of tasks that may wait in the queue;
     *                     must be at least 1
     * @throws IllegalArgumentException if either argument is less than 1
     */
    public ThreadPool(int numOfThreads, int capacity) {
        // A pool without workers would accept tasks and never run them.
        if (numOfThreads < 1) {
            throw new IllegalArgumentException("numOfThreads must be >= 1 but was " + numOfThreads);
        }
        if (capacity < 1) {
            throw new IllegalArgumentException("capacity must be >= 1 but was " + capacity);
        }
        this.isStopped = false;
        this.capacity = capacity;
        this.bq = new ArrayBlockingQueue<>(capacity);
        this.threads = new ArrayList<>(numOfThreads);
        for (int i = 0; i < numOfThreads; i++) {
            threads.add(new Worker(i + 1, bq));
        }
        // Start only after every worker exists, as the original did.
        for (Worker worker : threads) {
            worker.start();
        }
        System.out.println("Thread pool is ready...");
    }

    /**
     * Queues a task for execution by one of the workers. Blocks while the queue
     * is full.
     *
     * @param task the task to run; must not be {@code null}
     * @throws IllegalStateException if the pool has been shut down
     * @throws NullPointerException  if {@code task} is {@code null} (raised by the queue)
     * @throws InterruptedException  if the calling thread is interrupted while
     *                               waiting for free space in the queue
     * @throws Exception             declared for backward compatibility with
     *                               existing callers
     */
    public synchronized void execute(Runnable task) throws Exception {
        if (this.isStopped) {
            throw new IllegalStateException("ThreadPool is stopped");
        }

        this.bq.put(task);
    }

    /**
     * Shuts the pool down using the default grace period of 20 seconds.
     *
     * @see #shutdown(long)
     */
    public void shutdown() {
        shutdown(DEFAULT_GRACE_MILLIS);
    }

    /**
     * Shuts the pool down: new submissions are rejected immediately, the
     * calling thread then waits for the grace period, and finally every worker
     * is asked to stop. Calling this method on a pool that is already shut down
     * (or shutting down) returns immediately.
     *
     * @param graceMillis how long to wait, in milliseconds, before stopping the
     *                    workers; must not be negative
     * @throws IllegalArgumentException if {@code graceMillis} is negative
     */
    public void shutdown(long graceMillis) {
        if (graceMillis < 0) {
            throw new IllegalArgumentException("graceMillis must be >= 0 but was " + graceMillis);
        }
        // Flip the flag under the lock, but do NOT hold the lock while waiting:
        // otherwise every concurrent execute() call would stall for the whole
        // grace period before being rejected.
        synchronized (this) {
            if (this.isStopped) {
                return;
            }
            this.isStopped = true;
        }
        try {
            Thread.sleep(graceMillis);
        } catch (InterruptedException e) {
            // Cut the grace period short, but keep the interrupt visible to the caller.
            Thread.currentThread().interrupt();
        }
        for (Worker thread : threads) {
            thread.doStop();
        }
    }
}

Worker extends the Thread class. It calls the BlockingQueue.take() method to fetch the next task and then executes it.

/**
 * A pool thread that repeatedly takes a task from a shared
 * {@link BlockingQueue} and runs it until {@link #doStop()} is called.
 */
class Worker extends Thread {
    private final int id;
    // Typed as Runnable, not Task: casting every queued element to Task made
    // plain Runnables fail with a ClassCastException that was then swallowed.
    private final BlockingQueue<? extends Runnable> bq;
    // volatile so the stop request is visible to the running worker without
    // locking on the Thread object itself.
    private volatile boolean isStopped;

    /**
     * @param id number used to identify this worker in console output
     * @param bq queue this worker takes its tasks from
     */
    public Worker(int id, BlockingQueue<? extends Runnable> bq) {
        this.id = id;
        this.bq = bq;
        this.isStopped = false;
    }

    /**
     * Takes and runs tasks until a stop has been requested. A failing task is
     * reported on {@code System.err} and does not terminate the worker.
     */
    @Override
    public void run() {
        while (!isStopped()) {
            try {
                Runnable task = bq.take();
                System.out.println("Worker #" + id + " is working on the task: " + describe(task));
                task.run();
            } catch (InterruptedException e) {
                // doStop() interrupts a worker blocked in take(). The loop
                // condition decides whether to exit; an interrupt without a
                // stop request is ignored so the worker keeps serving tasks.
                if (isStopped()) {
                    Thread.currentThread().interrupt();
                }
            } catch (Exception e) {
                // Report the failure but keep the worker thread alive.
                System.err.println("Worker #" + id + " caught an exception from a task: " + e);
            }
        }
    }

    /**
     * Requests this worker to stop and interrupts it so that a blocked
     * {@code take()} returns promptly.
     */
    public void doStop() {
        // Publish the flag BEFORE interrupting, so the worker is guaranteed to
        // observe it when it wakes up and re-checks the loop condition.
        isStopped = true;
        try {
            this.interrupt();
        } catch (SecurityException ignored) {
            // Not permitted to interrupt; the worker still exits after its
            // current task because the flag is already set.
        }
    }

    /**
     * @return {@code true} once {@link #doStop()} has been called
     */
    public boolean isStopped() {
        return isStopped;
    }

    /** Returns the task's name when it is a {@code Task}, otherwise its string form. */
    private static String describe(Runnable task) {
        if (task instanceof Task) {
            return ((Task) task).getName();
        }
        return String.valueOf(task);
    }
}

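The Task class represents the actual work to be executed. (The listing omits its imports: java.time.LocalDateTime, java.util.Random and java.util.concurrent.TimeUnit.)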

/**
 * A named demo task that prints a start message and then sleeps for a random
 * number of seconds between 0 and 9.
 */
public class Task implements Runnable {
    private final String name;
    private final Random random;

    /**
     * @param name label returned by {@link #getName()} and printed when the task runs
     */
    public Task(String name) {
        this.name = name;
        this.random = new Random();
    }

    /**
     * @return the name given at construction time
     */
    public String getName() {
        return name;
    }

    /**
     * Prints the start message and sleeps for the random duration. If the
     * sleep is interrupted, the task ends early and the thread's interrupt
     * status is restored so the code running this task can react to it.
     */
    @Override
    public void run() {
        // Primitive long: no need to box the duration.
        long duration = random.nextInt(10);
        System.out.println("Executing : " + name + " at " + LocalDateTime.now().toString());
        try {
            TimeUnit.SECONDS.sleep(duration);
        } catch (InterruptedException e) {
            System.err.println("Task " + name + " was interrupted before it finished");
            // sleep() cleared the flag when it threw; set it again instead of
            // silently consuming the interrupt.
            Thread.currentThread().interrupt();
        }
    }
}

Create a thread pool with 2 workers and create 5 tasks to be executed by the thread pool.

/**
 * Demo driver: starts a pool with two workers, creates five tasks, submits
 * them all and then shuts the pool down.
 */
public class ThreadPoolExample {
    /**
     * Runs the demo.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        ThreadPool pool = new ThreadPool(2);

        // Create every task up front so all "Created" lines are printed
        // before the first submission.
        List<Task> pending = new ArrayList<>();
        int number = 1;
        while (number <= 5) {
            Task created = new Task("Task" + number);
            pending.add(created);
            System.out.println("Created : " + created.getName());
            number++;
        }

        // Submit in creation order; a failed submission is reported and the
        // remaining tasks are still submitted.
        for (int index = 0; index < pending.size(); index++) {
            Task next = pending.get(index);
            try {
                pool.execute(next);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        pool.shutdown();
    }
}

Output.

Thread pool is ready...
Created : Task1
Created : Task2
Created : Task3
Created : Task4
Created : Task5
Worker #2 is working on the task: Task2
Worker #1 is working on the task: Task1
Executing : Task2 at 2020-04-17T21:24:56.042
Executing : Task1 at 2020-04-17T21:24:56.042
Worker #1 is working on the task: Task3
Executing : Task3 at 2020-04-17T21:24:56.099
Worker #2 is working on the task: Task4
Executing : Task4 at 2020-04-17T21:25:04.100
Worker #1 is working on the task: Task5
Executing : Task5 at 2020-04-17T21:25:05.102

3. Source Files

4. References