1.深入淺出Java多線程程序設(shè)計(jì)
多線程是這樣一種機(jī)制,它允許在程序中并發(fā)執(zhí)行多個指令流,每個指令流都稱為一個線程,彼此間互相獨(dú)立,
Java線程小結(jié)
。一:理解多線程
多線程是這樣一種機(jī)制,它允許在程序中并發(fā)執(zhí)行多個指令流,每個指令流都稱為一個多線程是這樣一種機(jī)制,它允許在程序中并發(fā)執(zhí)行多個指令流,每個指令流都稱為一個線程又稱為輕量級進(jìn)程,它和進(jìn)程一樣擁有獨(dú)立的執(zhí)行控制,由操作系統(tǒng)負(fù)責(zé)調(diào)度,區(qū)別在于線程沒有獨(dú)立的存儲空間,而是和所屬進(jìn)程中的其它線程共享一個存儲空間,這使得線程間的通信遠(yuǎn)較進(jìn)程簡單。
多個線程的執(zhí)行是并發(fā)的,也就是在邏輯上“同時”,而不管是否是物理上的“同時”。如果系統(tǒng)只有一個CPU,那么真正的“同時”是不可能的,但是由于CPU的速度非?,用戶感覺不到其中的區(qū)別,因此我們也不用關(guān)心它,只需要設(shè)想各個線程是同時執(zhí)行即可。
多線程和傳統(tǒng)的單線程在程序設(shè)計(jì)上最大的區(qū)別在于,由于各個線程的控制流彼此獨(dú)立,使得各個線程之間的代碼是亂序執(zhí)行的,由此帶來的線程調(diào)度,同步等問題,將在以后探討。
二:在Java中實(shí)現(xiàn)多線程
我們不妨設(shè)想,為了創(chuàng)建一個新的線程,我們需要做些什么?很顯然,我們必須指明這個線程所要執(zhí)行的代碼,而這就是在Java中實(shí)現(xiàn)多線程我們所需要做的一切!
真是神奇!Java是如何做到這一點(diǎn)的?通過類!作為一個完全面向?qū)ο蟮恼Z言,Java提供了類 java.lang.Thread 來方便多線程編程,這個類提供了大量的方法來方便我們控制自己的各個線程,我們以后的討論都將圍繞這個類進(jìn)行。
那么如何提供給 Java 我們要線程執(zhí)行的代碼呢?讓我們來看一看 Thread 類。Thread 類最重要的方法是 run() ,它為Thread 類的方法 start() 所調(diào)用,提供我們的線程所要執(zhí)行的代碼。為了指定我們自己的代碼,只需要覆蓋它!
方法一:繼承 Thread 類,覆蓋方法 run(),我們在創(chuàng)建的 Thread 類的子類中重寫 run() ,加入線程所要執(zhí)行的代碼即可。下面是一個例子:
public class MyThread extends Thread {
int count= 1, number;
public MyThread(int num) {
number = num;
System.out.println(創(chuàng)建線程 + number);
}
public void run() {
while(true) {
System.out.println(線程 + number + :計(jì)數(shù) + count);
if(++count== 6) return;
}
}
public static void main(String args[]) {
for(int i = 0; i < 5; i++) new MyThread(i+1).start();
}
}
這種方法簡單明了,符合大家的習(xí)慣,但是,它也有一個很大的缺點(diǎn),那就是如果我們的類已經(jīng)從一個類繼承(如小程序必須繼承自 Applet 類),則無法再繼承 Thread 類,這時如果我們又不想建立一個新的類,應(yīng)該怎么辦呢?
我們不妨來探索一種新的方法:我們不創(chuàng)建 Thread 類的子類,而是直接使用它,那么我們只能將我們的方法作為參數(shù)傳遞給 Thread 類的實(shí)例,有點(diǎn)類似回調(diào)函數(shù)。但是 Java 沒有指針,我們只能傳遞一個包含這個方法的類的實(shí)例。那么如何限制這個類必須包含這一方法呢?當(dāng)然是使用接口。m然抽象類也可滿足,但是需要繼承,而我們之所以要采用這種新方法,不就是為了避免繼承帶來的限制嗎?)
Java 提供了接口 java.lang.Runnable 來支持這種方法。
方法二:實(shí)現(xiàn) Runnable 接口
Runnable 接口只有一個方法 run(),我們聲明自己的類實(shí)現(xiàn) Runnable 接口并提供這一方法,將我們的線程代碼寫入其中,就完成了這一部分的任務(wù)。但是 Runnable 接口并沒有任何對線程的支持,我們還必須創(chuàng)建 Thread 類的實(shí)例,這一點(diǎn)通過 Thread 類的構(gòu)造函數(shù)public Thread(Runnable target);來實(shí)現(xiàn)。下面是一個例子:
public class MyThread implements Runnable {
int count= 1, number;
public MyThread(int num) {
number = num;
System.out.println(創(chuàng)建線程 + number);
}
public void run() {
while(true) {
System.out.println(線程 + number + :計(jì)數(shù) + count);
if(++count== 6) return;
}
}
public static void main(String args[]) {
for(int i = 0; i < 5; i++) new Thread(new MyThread(i+1)).start();
}
}
嚴(yán)格地說,創(chuàng)建 Thread 子類的實(shí)例也是可行的,但是必須注意的是,該子類必須沒有覆蓋 Thread 類的 run 方法,否則該線程執(zhí)行的將是子類的 run 方法,而不是我們用以實(shí)現(xiàn)Runnable 接口的類的 run 方法,對此大家不妨試驗(yàn)一下。
使用 Runnable 接口來實(shí)現(xiàn)多線程使得我們能夠在一個類中包容所有的代碼,有利于封裝,它的缺點(diǎn)在于,我們只能使用一套代碼,若想創(chuàng)建多個線程并使各個線程執(zhí)行不同的代碼,則仍必須額外創(chuàng)建類,如果這樣的話,在大多數(shù)情況下也許還不如直接用多個類分別繼承 Thread 來得緊湊。
綜上所述,兩種方法各有千秋,大家可以靈活運(yùn)用。
下面讓我們一起來研究一下多線程使用中的一些問題。
三:線程的四種狀態(tài)
1. 新狀態(tài):線程已被創(chuàng)建但尚未執(zhí)行(start() 尚未被調(diào)用)。
2. 可執(zhí)行狀態(tài):線程可以執(zhí)行,雖然不一定正在執(zhí)行。CPU 時間隨時可能被分配給該線程,從而使得它執(zhí)行。
3. 死亡狀態(tài):正常情況下 run() 返回使得線程死亡。調(diào)用 stop()或 destroy() 亦有同樣效果,但是不被推薦,前者會產(chǎn)生異常,后者是強(qiáng)制終止,不會釋放鎖。
4. 阻塞狀態(tài):線程不會被分配 CPU 時間,無法執(zhí)行。
四:線程的優(yōu)先級
線程的優(yōu)先級代表該線程的重要程度,當(dāng)有多個線程同時處于可執(zhí)行狀態(tài)并等待獲得 CPU 時間時,線程調(diào)度系統(tǒng)根據(jù)各個線程的優(yōu)先級來決定給誰分配 CPU 時間,優(yōu)先級高的線程有更大的機(jī)會獲得 CPU 時間,優(yōu)先級低的線程也不是沒有機(jī)會,只是機(jī)會要小一些罷了。
你可以調(diào)用 Thread 類的方法 getPriority() 和 setPriority()來存取線程的優(yōu)先級,線程的優(yōu)先級界于1(MIN_PRIORITY)和10(MAX_PRIORITY)之間,缺省是5(NORM_PRIORITY)。
五:線程的同步
由于同一進(jìn)程的多個線程共享同一片存儲空間,在帶來方便的同時,也帶來了訪問沖突這個嚴(yán)重的問題。Java語言提供了專門機(jī)制以解決這種沖突,有效避免了同一個數(shù)據(jù)對象被多個線程同時訪問。
由于我們可以通過 private 關(guān)鍵字來保證數(shù)據(jù)對象只能被方法訪問,所以我們只需針對方法提出一套機(jī)制,這套機(jī)制就是 synchronized 關(guān)鍵字,它包括兩種用法:synchronized 方法和 synchronized 塊。
1. synchronized 方法:通過在方法聲明中加入 synchronized關(guān)鍵字來聲明 synchronized 方法。如:
public synchronized void accessVal(int newVal);
synchronized 方法控制對類成員變量的訪問:每個類實(shí)例對應(yīng)一把鎖,每個 synchronized 方法都必須獲得調(diào)用該方法的類實(shí)例的鎖方能執(zhí)行,否則所屬線程阻塞,方法一旦執(zhí)行,就獨(dú)占該鎖,直到從該方法返回時才將鎖釋放,此后被阻塞的線程方能獲得該鎖,重新進(jìn)入可執(zhí)行狀態(tài)。這種機(jī)制確保了同一時刻對于每一個類實(shí)例,其所有聲明為 synchronized 的成員函數(shù)中至多只有一個處于可執(zhí)行狀態(tài)(因?yàn)橹炼嘀挥幸粋能夠獲得該類實(shí)例對應(yīng)的鎖),從而有效避免了類成員變量的訪問沖突(只要所有可能訪問類成員變量的方法均被聲明為 synchronized)。
在 Java 中,不光是類實(shí)例,每一個類也對應(yīng)一把鎖,這樣我們也可將類的靜態(tài)成員函數(shù)聲明為 synchronized ,以控制其對類的靜態(tài)成員變量的訪問。
2.線程池(java.util.concurrent.ThreadPoolExecutor)的使用
線程池類為 java.util.concurrent.ThreadPoolExecutor,常用構(gòu)造方法為:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue
RejectedExecutionHandler handler)
corePoolSize: 線程池維護(hù)線程的最少數(shù)量
maximumPoolSize:線程池維護(hù)線程的最大數(shù)量
keepAliveTime: 線程池維護(hù)線程所允許的空閑時間
unit: 線程池維護(hù)線程所允許的空閑時間的單位
workQueue: 線程池所使用的緩沖隊(duì)列
handler: 線程池對拒絕任務(wù)的處理策略
一個任務(wù)通過 execute(Runnable)方法被添加到線程池,任務(wù)就是一個 Runnable類型的對象,任務(wù)的執(zhí)行方法就是 Runnable類型對象的run()方法。
當(dāng)一個任務(wù)通過execute(Runnable)方法欲添加到線程池時:
l 如果此時線程池中的數(shù)量小于corePoolSize,即使線程池中的線程都處于空閑狀態(tài),也要創(chuàng)建新的線程來處理被添加的任務(wù)。
l 如果此時線程池中的數(shù)量等于 corePoolSize,但是緩沖隊(duì)列 workQueue未滿,那么任務(wù)被放入緩沖隊(duì)列。
l 如果此時線程池中的數(shù)量大于corePoolSize,緩沖隊(duì)列workQueue滿,并且線程池中的數(shù)量小于maximumPoolSize,建新的線程來處理被添加的任務(wù)。
l 如果此時線程池中的數(shù)量大于corePoolSize,緩沖隊(duì)列workQueue滿,并且線程池中的數(shù)量等于maximumPoolSize,那么通過 handler所指定的策略來處理此任務(wù)。也就是:處理任務(wù)的優(yōu)先級為:核心線程corePoolSize、任務(wù)隊(duì)列workQueue、最大線程 maximumPoolSize,如果三者都滿了,使用handler處理被拒絕的任務(wù)。
l 當(dāng)線程池中的線程數(shù)量大于 corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止。這樣,線程池可以動態(tài)的調(diào)整池中的線程數(shù)。
unit可選的參數(shù)為java.util.concurrent.TimeUnit中的幾個靜態(tài)屬性:
NANOSECONDS、
MICROSECONDS、
MILLISECONDS、
SECONDS。
workQueue常用的是:java.util.concurrent.ArrayBlockingQueue
handler有四個選擇:
ThreadPoolExecutor.AbortPolicy()
拋出java.util.concurrent.RejectedExecutionException異常
ThreadPoolExecutor.CallerRunsPolicy()
重試添加當(dāng)前的任務(wù),他會自動重復(fù)調(diào)用execute()方法
ThreadPoolExecutor.DiscardOldestPolicy()
拋棄舊的任務(wù)
ThreadPoolExecutor.DiscardPolicy()
拋棄當(dāng)前的任務(wù)
二、相關(guān)參考
一個 ExecutorService,它使用可能的幾個池線程之一執(zhí)行每個提交的任務(wù),通常使用 Executors 工廠方法配置,
電腦資料
《Java線程小結(jié)》(http://www.shangyepx.com)。線程池可以解決兩個不同問題:由于減少了每個任務(wù)調(diào)用的開銷,它們通?梢栽趫(zhí)行大量異步任務(wù)時提供增強(qiáng)的性能,并且還可以提供綁定和管理資源 (包括執(zhí)行集合任務(wù)時使用的線程)的方法。每個 ThreadPoolExecutor 還維護(hù)著一些基本的統(tǒng)計(jì)數(shù)據(jù),如完成的任務(wù)數(shù)。
為了便于跨大量上下文使用,此類提供了很多可調(diào)整的參數(shù)和擴(kuò)展掛鉤。但是,強(qiáng)烈建議程序員使用較為方便的 Executors 工廠方法 Executors.newCachedThreadPool()(無界線程池,可以進(jìn)行自動線程回收)、 Executors.newFixedThreadPool(int)(固定大小線程池)和 Executors.newSingleThreadExecutor()(單個后臺線程),它們均為大多數(shù)使用場景預(yù)定義了設(shè)置。否則,在手動配置和調(diào) 整此類時,使用以下指導(dǎo):
核心和最大池大小
ThreadPoolExecutor 將根據(jù) corePoolSize(參見 getCorePoolSize())和 maximumPoolSize(參見 getMaximumPoolSize())設(shè)置的邊界自動調(diào)整池大小。當(dāng)新任務(wù)在方法 execute(java.lang.Runnable) 中提交時,如果運(yùn)行的線程少于 corePoolSize,則創(chuàng)建新線程來處理請求,即使其他輔助線程是空閑的。如果運(yùn)行的線程多于 corePoolSize 而少于 maximumPoolSize,則僅當(dāng)隊(duì)列滿時才創(chuàng)建新線程。如果設(shè)置的 corePoolSize 和 maximumPoolSize 相同,則創(chuàng)建了固定大小的線程池。如果將 maximumPoolSize 設(shè)置為基本的無界值(如 Integer.MAX_VALUE),則允許池適應(yīng)任意數(shù)量的并發(fā)任務(wù)。在大多數(shù)情況下,核心和最大池大小僅基于構(gòu)造來設(shè)置,不過也可以使用 setCorePoolSize(int) 和 setMaximumPoolSize(int) 進(jìn)行動態(tài)更改。
按需構(gòu)造
默認(rèn)情況下,即使核心線程最初只是在新任務(wù)需要時才創(chuàng)建和啟動的,也可以使用方法 prestartCoreThread() 或 prestartAllCoreThreads() 對其進(jìn)行動態(tài)重寫。
創(chuàng)建新線程
使用 ThreadFactory 創(chuàng)建新線程。如果沒有另外說明,則在同一個 ThreadGroup 中一律使用 Executors.defaultThreadFactory() 創(chuàng)建線程,并且這些線程具有相同的 NORM_PRIORITY 優(yōu)先級和非守護(hù)進(jìn)程狀態(tài)。通過提供不同的 ThreadFactory,可以改變線程的名稱、線程組、優(yōu)先級、守護(hù)進(jìn)程狀態(tài),等等。如果從 newThread 返回 null 時 ThreadFactory 未能創(chuàng)建線程,則執(zhí)行程序?qū)⒗^續(xù)運(yùn)行,但不能執(zhí)行任何任務(wù)。
保持活動時間
如果池中當(dāng)前有多于 corePoolSize 的線程,則這些多出的線程在空閑時間超過 keepAliveTime 時將會終止(參見 getKeepAliveTime(java.util.concurrent.TimeUnit))。這提供了當(dāng)池處于非活動狀態(tài)時減少資源消耗的方 法。如果池后來變得更為活動,則可以創(chuàng)建新的線程。也可以使用方法 setKeepAliveTime(long, java.util.concurrent.TimeUnit) 動態(tài)地更改此參數(shù)。使用 Long.MAX_VALUE TimeUnit.NANOSECONDS 的值在關(guān)閉前有效地從以前的終止?fàn)顟B(tài)禁用空閑線程。
排隊(duì)
所有 BlockingQueue 都可用于傳輸和保持提交的任務(wù)。可以使用此隊(duì)列與池大小進(jìn)行交互:
A. 如果運(yùn)行的線程少于 corePoolSize,則 Executor 始終首選添加新的線程,而不進(jìn)行排隊(duì)。
B. 如果運(yùn)行的線程等于或多于 corePoolSize,則 Executor 始終首選將請求加入隊(duì)列,而不添加新的線程。
C. 如果無法將請求加入隊(duì)列,則創(chuàng)建新的線程,除非創(chuàng)建此線程超出 maximumPoolSize,在這種情況下,任務(wù)將被拒絕。
排隊(duì)有三種通用策略:
直接提交。工作隊(duì)列的默認(rèn)選項(xiàng)是 SynchronousQueue,它將任務(wù)直接提交給線程而不保持它們。在此,如果不存在可用于立即運(yùn)行任務(wù)的線程,則試圖把任務(wù)加入隊(duì)列將失敗,因此 會構(gòu)造一個新的線程。此策略可以避免在處理可能具有內(nèi)部依賴性的請求集合時出現(xiàn)鎖定。直接提交通常要求無界 maximumPoolSizes 以避免拒絕新提交的任務(wù)。當(dāng)命令以超過隊(duì)列所能處理的平均數(shù)連續(xù)到達(dá)時,此策略允許無界線程具有增長的可能性。
無界隊(duì)列。使用無界隊(duì)列(例如,不具有預(yù)定義容量的 LinkedBlockingQueue)將導(dǎo)致在所有 corePoolSize 線程都忙的情況下將新任務(wù)加入隊(duì)列。這樣,創(chuàng)建的線程就不會超過 corePoolSize。(因此,maximumPoolSize 的值也就無效了。)當(dāng)每個任務(wù)完全獨(dú)立于其他任務(wù),即任務(wù)執(zhí)行互不影響時,適合于使用無界隊(duì)列;例如,在 Web 頁服務(wù)器中。這種排隊(duì)可用于處理瞬態(tài)突發(fā)請求,當(dāng)命令以超過隊(duì)列所能處理的平均數(shù)連續(xù)到達(dá)時,此策略允許無界線程具有增長的可能性。
有界隊(duì)列。當(dāng)使用有限的 maximumPoolSizes 時,有界隊(duì)列(如 ArrayBlockingQueue)有助于防止資源耗盡,但是可能較難調(diào)整和控制。隊(duì)列大小和最大池大小可能需要相互折衷:使用大型隊(duì)列和小型池可以 最大限度地降低 CPU 使用率、操作系統(tǒng)資源和上下文切換開銷,但是可能導(dǎo)致人工降低吞吐量。如果任務(wù)頻繁阻塞(例如,如果它們是 I/O 邊界),則系統(tǒng)可能為超過您許可的更多線程安排時間。使用小型隊(duì)列通常要求較大的池大小,CPU 使用率較高,但是可能遇到不可接受的調(diào)度開銷,這樣也會降低吞吐量。
被拒絕的任務(wù)
當(dāng) Executor 已經(jīng)關(guān)閉,并且 Executor 將有限邊界用于最大線程和工作隊(duì)列容量,且已經(jīng)飽和時,在方法 execute(java.lang.Runnable) 中提交的新任務(wù)將被拒絕。在以上兩種情況下,execute 方法都將調(diào)用其 RejectedExecutionHandler 的 RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) 方法。下面提供了四種預(yù)定義的處理程序策略:
A. 在默認(rèn)的 ThreadPoolExecutor.AbortPolicy 中,處理程序遭到拒絕將拋出運(yùn)行時 RejectedExecutionException。
B. 在 ThreadPoolExecutor.CallerRunsPolicy 中,線程調(diào)用運(yùn)行該任務(wù)的 execute 本身。此策略提供簡單的反饋控制機(jī)制,能夠減緩新任務(wù)的提交速度。
C. 在 ThreadPoolExecutor.DiscardPolicy 中,不能執(zhí)行的任務(wù)將被刪除。
D. 在 ThreadPoolExecutor.DiscardOldestPolicy 中,如果執(zhí)行程序尚未關(guān)閉,則位于工作隊(duì)列頭部的任務(wù)將被刪除,然后重試執(zhí)行程序(如果再次失敗,則重復(fù)此過程)。
定義和使用其他種類的 RejectedExecutionHandler 類也是可能的,但這樣做需要非常小心,尤其是當(dāng)策略僅用于特定容量或排隊(duì)策略時。
掛鉤方法
此類提供 protected 可重寫的 beforeExecute(java.lang.Thread, java.lang.Runnable) 和 afterExecute(java.lang.Runnable, java.lang.Throwable) 方法,這兩種方法分別在執(zhí)行每個任務(wù)之前和之后調(diào)用。它們可用于操縱執(zhí)行環(huán)境;例如,重新初始化 ThreadLocal、搜集統(tǒng)計(jì)信息或添加日志條目。此外,還可以重寫方法 terminated() 來執(zhí)行 Executor 完全終止后需要完成的所有特殊處理。
如果掛鉤或回調(diào)方法拋出異常,則內(nèi)部輔助線程將依次失敗并突然終止。
隊(duì)列維護(hù)
方法 getQueue() 允許出于監(jiān)控和調(diào)試目的而訪問工作隊(duì)列。強(qiáng)烈反對出于其他任何目的而使用此方法。remove(java.lang.Runnable) 和 purge() 這兩種方法可用于在取消大量已排隊(duì)任務(wù)時幫助進(jìn)行存儲回收。
一、例子
創(chuàng)建 TestThreadPool 類:
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class TestThreadPool { private static int produceTaskSleepTime = 2; private static int produceTaskMaxNumber = 10; public static void main(String[] args) { // 構(gòu)造一個線程池 ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 4, 3, TimeUnit.SECONDS, new ArrayBlockingQueue
創(chuàng)建 ThreadPoolTask類:
import java.io.Serializable; public class ThreadPoolTask implements Runnable, Serializable { private Object attachData; ThreadPoolTask(Object tasks) { this.attachData = tasks; } public void run() { System.out.println(開始執(zhí)行任務(wù): + attachData); attachData = null; } public Object getTask() { return this.attachData; } }
執(zhí)行結(jié)果:
創(chuàng)建任務(wù)并提交到線程池中:task@ 1
開始執(zhí)行任務(wù):task@ 1
創(chuàng)建任務(wù)并提交到線程池中:task@ 2
開始執(zhí)行任務(wù):task@ 2
創(chuàng)建任務(wù)并提交到線程池中:task@ 3
創(chuàng)建任務(wù)并提交到線程池中:task@ 4
開始執(zhí)行任務(wù):task@ 3
創(chuàng)建任務(wù)并提交到線程池中:task@ 5
開始執(zhí)行任務(wù):task@ 4
創(chuàng)建任務(wù)并提交到線程池中:task@ 6
創(chuàng)建任務(wù)并提交到線程池中:task@ 7
創(chuàng)建任務(wù)并提交到線程池中:task@ 8
開始執(zhí)行任務(wù):task@ 5
開始執(zhí)行任務(wù):task@ 6
創(chuàng)建任務(wù)并提交到線程池中:task@ 9
開始執(zhí)行任務(wù):task@ 7
創(chuàng)建任務(wù)并提交到線程池中:task@ 10
開始執(zhí)行任務(wù):task@ 8
開始執(zhí)行任務(wù):task@ 9
開始執(zhí)行任務(wù):task@ 10