鄭州java培訓
鄭州文化路中心

400-8765-661

鄭州java培訓:Java 線程池框架核心代碼

  • 時間:2017-03-31 15:50
  • 發布:鄭州Java培訓機構
  • 來源:達內課程

鄭州java培訓機構的老師知道在多線程編程中,為每個任務分配一個線程是不現實的,線程創建的開銷和資源消耗都是很高的。線程池應運而生,成為我們管理線程的利器。Java 通過Executor接口,提供了一種標準的方法將任務的提交過程和執行過程解耦開來,并用Runnable表示任務。

下面,鄭州java培訓機構的老師來給大家分析一下 Java 線程池框架的實現ThreadPoolExecutor。

下面的分析基于JDK1.7

生命周期

ThreadPoolExecutor中,使用CAPACITY的高3位來表示運行狀態,分別是:

RUNNING:接收新任務,并且處理任務隊列中的任務

SHUTDOWN:不接收新任務,但是處理任務隊列的任務

STOP:不接收新任務,不出來任務隊列,同時中斷所有進行中的任務

TIDYING:所有任務已經被終止,工作線程數量為 0,到達該狀態會執行terminated()

TERMINATED:terminated()執行完畢

狀態轉換圖

ThreadPoolExecutor中用原子類來表示狀態位

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

線程池模型

核心參數

corePoolSize:最小存活的工作線程數量(如果設置allowCoreThreadTimeOut,那么該值為 0)

maximumPoolSize:最大的線程數量,受限于CAPACITY

keepAliveTime:對應線程的存活時間,時間單位由TimeUnit指定

workQueue:工作隊列,存儲待執行的任務

RejectExecutionHandler:拒絕策略,線程池滿后會觸發

線程池的最大容量:CAPACITY中的前三位用作標志位,也就是說工作線程的最大容量為(2^29)-1

四種模型

CachedThreadPool:一個可緩存的線程池,如果線程池的當前規模超過了處理需求時,那么將回收空閑的線程,當需求增加時,則可以添加新的線程,線程池的規模不存在任何的限制。

FixedThreadPool:一個固定大小的線程池,提交一個任務時就創建一個線程,直到達到線程池的最大數量,這時線程池的大小將不再變化。

SingleThreadPool:一個單線程的線程池,它只有一個工作線程來執行任務,可以確保按照任務在隊列中的順序來串行執行,如果這個線程異常結束將創建一個新的線程來執行任務。

ScheduledThreadPool:一個固定大小的線程池,并且以延遲或者定時的方式來執行任務,類似于Timer。

執行任務 execute

核心邏輯:

當前線程數量 corePoolSize,直接開啟新的核心線程執行任務addWorker(command, true)

當前線程數量 >= corePoolSize,且任務加入工作隊列成功

檢查線程池當前狀態是否處于RUNNING

如果否,則拒絕該任務

如果是,判斷當前線程數量是否為 0,如果為 0,就增加一個工作線程。

開啟普通線程執行任務addWorker(command, false),開啟失敗就拒絕該任務

從上面的分析可以總結出線程池運行的四個階段:

poolSize 且隊列為空,此時會新建線程來處理提交的任務

poolSize == corePoolSize,此時提交的任務進入工作隊列,工作線程從隊列中獲取任務執行,此時隊列不為空且未滿。

poolSize == corePoolSize,并且隊列已滿,此時也會新建線程來處理提交的任務,但是poolSize

poolSize == maxPoolSize,并且隊列已滿,此時會觸發拒絕策略

拒絕策略

前面我們提到任務無法執行會被拒絕,RejectedExecutionHandler是處理被拒絕任務的接口。下面是四種拒絕策略。

AbortPolicy:默認策略,終止任務,拋出RejectedException

CallerRunsPolicy:在調用者線程執行當前任務,不拋異常

DiscardPolicy: 拋棄策略,直接丟棄任務,不拋異常

DiscardOldersPolicy:拋棄最老的任務,執行當前任務,不拋異常

線程池中的 Worker

Worker繼承了AbstractQueuedSynchronizer和Runnable,前者給Worker提供鎖的功能,后者執行工作線程的主要方法runWorker(Worker w)(從任務隊列撈任務執行)。Worker 引用存在workers集合里面,用mainLock守護。

private final ReentrantLock mainLock = new ReentrantLock();

private final HashSet workers = new HashSet();

核心函數 runWorker

下面是簡化的邏輯,注意:每個工作線程的run都執行下面的函數

final void runWorker(Worker w) {

Thread wt = Thread.currentThread();

Runnable task = w.firstTask;

w.firstTask = null;

while (task != null || (task = getTask()) != null) {

w.lock();

beforeExecute(wt, task);

task.run();

afterExecute(task, thrown);

w.unlock();

}

processWorkerExit(w, completedAbruptly);

}

從getTask()中獲取任務

鎖住 worker

執行beforeExecute(wt, task),這是ThreadPoolExecutor提供給子類的擴展方法

運行任務,如果該worker有配置了首次任務,則先執行首次任務且只執行一次。

執行afterExecute(task, thrown);

解鎖 worker

如果獲取到的任務為 null,關閉 worker

獲取任務 getTask

線程池內部的任務隊列是一個阻塞隊列,具體實現在構造時傳入。

private final BlockingQueue workQueue;

getTask()從任務隊列中獲取任務,支持阻塞和超時等待任務,四種情況會導致返回null,讓worker關閉。

現有的線程數量超過最大線程數量

線程池處于STOP狀態

線程池處于SHUTDOWN狀態且工作隊列為空

線程等待任務超時,且線程數量超過保留線程數量

核心邏輯:根據timed在阻塞隊列上超時等待或者阻塞等待任務,等待任務超時會導致工作線程被關閉。

timed = allowCoreThreadTimeOut || wc > corePoolSize;

Runnable r = timed ?

workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

workQueue.take();

在以下兩種情況下等待任務會超時:

允許核心線程等待超時,即allowCoreThreadTimeOut(true)

當前線程是普通線程,此時wc > corePoolSize

工作隊列使用的是BlockingQueue,這里就不展開了,后面再寫一篇詳細的分析。

總結

ThreadPoolExecutor基于生產者-消費者模式,提交任務的操作相當于生產者,執行任務的線程相當于消費者。

Executors提供了四種基于ThreadPoolExecutor構造線程池模型的方法,除此之外,我們還可以直接繼承ThreadPoolExecutor,重寫beforeExecute和afterExecute方法來定制線程池任務執行過程。

使用有界隊列還是無界隊列需要根據具體情況考慮,工作隊列的大小和線程的數量也是需要好好考慮的。

拒絕策略推薦使用CallerRunsPolicy,該策略不會拋棄任務,也不會拋出異常,而是將任務回退到調用者線程中執行。

講到這里,Java 線程池框架核心代碼昆明java培訓機構的老師已經分析完啦,現在大家都懂了吧,記得關注我們鄭州達內java培訓機構。

上一篇:鄭州java培訓:基于Java開發-關于類設計技巧的四點建議
下一篇: 鄭州java培訓:使用JedisPool操作Redis

【java學習系列】java靜態內部類

Java學習路線圖,其五個必經階段

Java基礎與面向對象

鄭州java培訓專家:java學習——語句

選擇城市和中心
江西省

貴州省

廣西省

海南省

色先锋玖玖AV资源部