zzh

zzh

なぜJavaのスレッドプールのコアスレッドは解放されないのですか?

线程池のライフサイクル#

*
* runStateは、次の値を取ります。
*
*   RUNNING: 新しいタスクを受け入れ、キューにあるタスクを処理します。
*   SHUTDOWN: 新しいタスクを受け入れませんが、キューにあるタスクを処理します。
*   STOP: 新しいタスクを受け入れず、キューにあるタスクを処理せず、進行中のタスクを中断します。
*   TIDYING: すべてのタスクが終了し、workerCountがゼロになり、スレッドが状態TIDYINGに遷移すると、terminated()フックメソッドが実行されます。
*   TERMINATED: terminated()が完了しました。
*

ソースコードの解析#

image

ワーカースレッドがコアスレッド数よりも少ない場合、addWorker 関数が実行されます。次に、addWorker 関数に入ってみましょう:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
        }
    }
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

ここでは、関係のないコードを削除し、コアの部分のみを残しました。ここで、for (;;) ループの内部で現在のスレッド数がコアスレッド数を超えているかどうかを最初に確認し、超えている場合は false を返します(なぜ再度確認する必要があるのかというと、マルチスレッド環境では複数のスレッドが同時に最後のコアスレッドを競合させる可能性があるためです)。その後、try ブロックでは新しい Worker オブジェクトを作成し、その中のスレッドを使用して実行します。それでは、次に Worker オブジェクトの内部実装を見てみましょう:

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable

まず、Worker クラスが AQS を継承していることに注意してください。いくつかのコードでは、AQS の state を使用してロックの取得と解放を行います。

Worker(Runnable firstTask) {
    setState(-1); // runWorkerまで割り込みを禁止する
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

Worker クラスのコンストラクタでは、ThreadFactory ファクトリを使用して新しいスレッドを作成し、Worker オブジェクト自体を渡します(前述のように、Worker オブジェクトの中のスレッドを実行するために必要です)。その後、スレッドが開始されると、Worker オブジェクトの run メソッドが実行されます:

public void run() {
    runWorker(this);
}

次に、runWorker メソッドに入ります:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    try {
        while (task != null || (task = getTask()) != null) {
            try {
                beforeExecute(wt, task);
                try {
                    task.run();
                }finally {
                    afterExecute(task, thrown);
                }
            }finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

ここでも、大部分のコードを削除し、核心の部分のみを残しました。コードは while ループを使用して task メソッド(つまり、submit で提出された new Runnable または new Callable)を繰り返し実行し、task = getTask () を使用してブロッキングキューから新しいメソッドを取得して続行します。これがコアスレッドと非コアスレッドの主な違いです。次に、getTask () メソッドに入りましょう:

private Runnable getTask() {
    boolean timedOut = false; // 最後のpoll()がタイムアウトしたかどうか
    for (;;) {
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

ここからわかるように、コアスレッドは take () メソッドを使用して task を取得し、非コアスレッドは poll (keepAliveTime, TimeUnit.NANOSECONDS) を使用して task を取得します。両方のメソッドは現在のスレッドをブロックしますが、poll (keepAliveTime, TimeUnit.NANOSECONDS) は一定時間後に終了する可能性があります。終了後、timeOut が true に設定され、上記の if ブロックで null を返すことができます。この時点で runWorker の while ループを終了することができます。一方、コアスレッドにはこの問題はありません。

読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。