zzh

zzh

Why won't the core threads of the Java thread pool be released?

Lifecycle of Thread Pool#

*
* The runState provides the main lifecycle control, taking on values:
*
*   RUNNING:  Accept new tasks and process queued tasks
*   SHUTDOWN: Don't accept new tasks, but process queued tasks
*   STOP:     Don't accept new tasks, don't process queued tasks,
*             and interrupt in-progress tasks
*   TIDYING:  All tasks have terminated, workerCount is zero,
*             the thread transitioning to state TIDYING
*             will run the terminated() hook method
*   TERMINATED: terminated() has completed
*

Source Code Analysis#

image

When the number of working threads is less than the core thread count, the addWorker function is executed. Next, let's take a look at the addWorker function:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
        }
    }
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

Here we have removed some irrelevant code and only kept the core part. Among them, we can see that within the for(;;) loop, it first checks whether the current number of threads is greater than the core thread count. If it is, it returns false (why do we need to check again here, because in multi-threading, multiple threads may simultaneously submit a task to compete for the last core thread); then in the try part, a new Worker object is created, and the thread in the Worker object is started using the thread in the Worker object (remember the previous execution using the thread in the Worker object), and then we will take a look at the internal implementation of the Worker object:

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable

First, we can see that the Worker class inherits from AQS, and some internal code uses the state of AQS to perform locking and unlocking operations.

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

The constructor of the Worker class uses the ThreadFactory factory to create a new thread, and passes in the Worker object itself (remember that the thread in the Worker object needs to execute the code passed in here); then after the thread starts, it will execute the run method of the Worker object:

public void run() {
    runWorker(this);
}

Then enter the runWorker method:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    try {
        while (task != null || (task = getTask()) != null) {
            try {
                beforeExecute(wt, task);
                try {
                    task.run();
                }finally {
                    afterExecute(task, thrown);
                }
            }finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

Here we have also removed most of the code and only kept the core part; we can see that the code uses a while loop to continuously execute the task method (that is, the new Runnable or new Callable submitted by submit), and then set task=null and use task = getTask() to get a new method from the blocking queue to continue execution. This is the key difference between core threads and non-core threads. Next, let's enter the getTask() method:

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

From here, we can see that core threads use take() method to get the task, while non-core threads use poll(keepAliveTime, TimeUnit.NANOSECONDS) to get the task. Both will block the current thread, but poll(keepAliveTime, TimeUnit.NANOSECONDS) will have a timeout after a certain time; after exiting, timeOut is set to true, and then return null in the if block above; at this time, you can exit the while loop in runWorker. There is no such problem for core threads.

Loading...
Ownership of this post data is guaranteed by blockchain and smart contracts to the creator alone.