|
以下代碼的實(shí)現(xiàn)邏輯出自于公眾號(hào) 碼農(nóng)翻身 《你管這破玩意叫線程池?》 - PS:劉欣老師在我心中是軟件技術(shù)行業(yè)的大劉。 線程池接口
public interface Executor { public void execute(Runnable r); } 接口中只有一個(gè)抽象方法,execute(Runnable r);它接收一個(gè)Runnable,無返回值實(shí)現(xiàn)它的子類只需要將傳入的Runnable執(zhí)行即可。 NewsThreadExecutor
package com.datang.bingxiang.run; import com.datang.bingxiang.run.intr.Executor; public class NewsThreadExecutor implements Executor { //每次調(diào)用都創(chuàng)建一個(gè)新的線程 @Override public void execute(Runnable r) { new Thread(r).start(); } } 這個(gè)實(shí)現(xiàn)類最簡(jiǎn)單也最明白,真的每次調(diào)用我們都創(chuàng)建一個(gè)Thread將參數(shù)Runnable執(zhí)行。這么做的弊端就是每個(gè)調(diào)用者發(fā)布一個(gè)任務(wù)都需要?jiǎng)?chuàng)建一個(gè)新的線程,線程使用后就被銷毀了,對(duì)內(nèi)存造成了很大的浪費(fèi)。 SingThreadExecutor
package com.datang.bingxiang.run; import java.util.concurrent.ArrayBlockingQueue; import com.datang.bingxiang.run.intr.Executor; //只有一個(gè)線程,在實(shí)例化后就啟動(dòng)線程。用戶調(diào)用execute()傳遞的Runnable會(huì)添加到隊(duì)列中。 //隊(duì)列有一個(gè)固定的容量3,如果隊(duì)列滿則拋棄任務(wù)。 //線程的run方法不停的循環(huán),從隊(duì)列里取Runnable然后執(zhí)行其run()方法。 public class SingThreadExecutor implements Executor { // ArrayBlockingQueue 數(shù)組類型的有界隊(duì)列 // LinkedBlockingDeque 鏈表類型的有界雙端隊(duì)列 // LinkedBlockingQueue 鏈表類型的有界單向隊(duì)列 private ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1); //線程不停的從隊(duì)列獲取任務(wù) private Thread worker = new Thread(() -> { while (true) { try { //take會(huì)在獲取不到任務(wù)時(shí)阻塞。并且也有Lock鎖 Runnable r = queue.take(); r.run(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); // 構(gòu)造函數(shù)啟動(dòng)線程 public SingThreadExecutor() { worker.start(); } @Override public void execute(Runnable r) { // 這個(gè)offer和add不同的是offer有Lock鎖,如果隊(duì)列滿則返回false。 // add則是隊(duì)列滿拋出異常,并且沒有Lock鎖。 if (!queue.offer(r)) { System.out.println("線程等待隊(duì)列已滿,不可加入。本次任務(wù)丟棄!"); } } } 改變下思路,這次線程池實(shí)現(xiàn)類只創(chuàng)建一個(gè)線程,調(diào)用者發(fā)布的任務(wù)都存放到一個(gè)隊(duì)列中(隊(duì)列符合先進(jìn)先出的需求)但是注意我們?cè)O(shè)計(jì)線程池一定要選擇有界隊(duì)列,因?yàn)槲覀儾荒軣o限制的往隊(duì)列中添加任務(wù)。在隊(duì)列滿后,在進(jìn)來的任務(wù)就要被拒絕掉。ArrayBlockingQueue 是一個(gè)底層有數(shù)組實(shí)現(xiàn)的有界阻塞隊(duì)列,實(shí)例化一個(gè)ArrayBlockingQueue傳遞參數(shù)為1,表示隊(duì)列長(zhǎng)度最大為1.唯一的一個(gè)工作線程也是成員變量,線程執(zhí)行后不斷的自旋從隊(duì)列中獲取任務(wù),take()方法將隊(duì)列頭的元素出隊(duì),若隊(duì)列為空則阻塞,這個(gè)方法是線程安全的。 execute(r)方法接收到任務(wù)后,將任務(wù)添加到隊(duì)列中,offer()方法將元素添加到隊(duì)列若隊(duì)列已滿則返回false。execute(r)則直接拒絕掉本次任務(wù)。
CorePollThreadExecutorSingThreadExecutor線程池的缺點(diǎn)是只有一個(gè)工作線程,這樣顯然是不夠靈活,CorePollThreadExecutor中增加了corePollSize核心線程數(shù)參數(shù),由用戶規(guī)定有需要幾個(gè)工作線程。這次我們選用的隊(duì)列為L(zhǎng)inkedBlockingQueue這是一個(gè)數(shù)據(jù)結(jié)構(gòu)為鏈表的有界阻塞單向隊(duì)列。 initThread()方法根據(jù)corePollSize循環(huán)創(chuàng)建N個(gè)線程,線程創(chuàng)建后同樣調(diào)用take()方法從阻塞隊(duì)列中獲取元素,若獲取成功則執(zhí)行Runnable的run()方法,若獲取隊(duì)列中沒有元素則阻塞。execute(r)則還是負(fù)責(zé)將任務(wù)添加到隊(duì)列中。 CountCorePollThreadExecutorCorePollThreadExecutor中有三個(gè)問題 1 當(dāng)隊(duì)列滿時(shí)線程池直接拒絕了任務(wù),這應(yīng)該讓用戶決定被拒絕的任務(wù)如何處理。 2 線程的創(chuàng)建策略也應(yīng)該交給用戶做處理。 3 初始化后就創(chuàng)建了N個(gè)核心線程數(shù),但是這些線程可能會(huì)用不到而造成浪費(fèi)。 RejectedExecutionHandler接口的實(shí)現(xiàn)應(yīng)該讓用戶決定如何處理隊(duì)列滿的異常情況。
package com.datang.bingxiang.run.intr; public interface RejectedExecutionHandler { public void rejectedExecution(); }
package com.datang.bingxiang.run; import com.datang.bingxiang.run.intr.RejectedExecutionHandler; public class CustomRejectedExecutionHandler implements RejectedExecutionHandler { @Override public void rejectedExecution() { System.out.println("隊(duì)列已經(jīng)滿了?。。。‘?dāng)前task被拒絕"); } } ThreadFactory接口的實(shí)現(xiàn)應(yīng)該讓用戶決定創(chuàng)建線程的方法。
package com.datang.bingxiang.run.intr; public interface ThreadFactory { public Thread newThread(Runnable r); }
package com.datang.bingxiang.run; import com.datang.bingxiang.run.intr.ThreadFactory; public class CustomThreadFactory implements ThreadFactory { @Override public Thread newThread(Runnable r) { System.out.println("創(chuàng)建了新的核心線程"); return new Thread(r); } } CountCorePollThreadExecutor的構(gòu)造函數(shù)接收三個(gè)參數(shù)corePollSize,rejectedExecutionHandler,threadFactory。因?yàn)楝F(xiàn)在我們需要按需創(chuàng)建核心線程,所以需要一個(gè)變量workCount記錄當(dāng)前已經(jīng)創(chuàng)建的工作線程,為了保證線程之間拿到的workCount是最新的(可見性),我們需要給變量workCount加上volatile修飾,保證改變了的修改能被所有線程看到。execute(r)首先要調(diào)用initThread(r)判斷是否有線程被創(chuàng)建,如果沒有線程創(chuàng)建則表示工作線程數(shù)已經(jīng)和核心線程數(shù)相同了,此時(shí)需要將新的任務(wù)添加到隊(duì)列中,如果隊(duì)列滿,則執(zhí)行傳入的拒絕策略。重要的方法在于initThread(r)。initThread(r)方法返回true表示有工作線程被創(chuàng)建任務(wù)將被工作線程直接執(zhí)行,無需入隊(duì)列。返回false則將任務(wù)入隊(duì),隊(duì)列滿則執(zhí)行拒絕策略。 fill變量表示核心線程數(shù)是否全部創(chuàng)建,為了保證多線程的環(huán)境下不會(huì)創(chuàng)建多于corePoolSize個(gè)數(shù)的線程,所以需要使用同步鎖,initThread(r)都要使用鎖則會(huì)降低效率,尤其是當(dāng)工作線程數(shù)已經(jīng)到達(dá)核心線程數(shù)后,所以這一塊代碼使用到了雙重判斷,當(dāng)加鎖后在此判斷工作線程是否已滿。如果已滿返回false。接下來使用threadFactory工廠創(chuàng)建線程,在線程中使用代碼塊,保證當(dāng)前任務(wù)可以被新創(chuàng)建的工作線程執(zhí)行。新的工作線程依然是從隊(duì)列中獲取任務(wù)并執(zhí)行。線程開啟后工作線程++,如果工作線程數(shù)等于核心線程數(shù)則改變fill標(biāo)記。返回true,成功創(chuàng)建線程,不要忘記在finally中釋放鎖。
package com.datang.bingxiang.run; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import com.datang.bingxiang.run.intr.Executor; import com.datang.bingxiang.run.intr.RejectedExecutionHandler; import com.datang.bingxiang.run.intr.ThreadFactory; public class CountCorePollThreadExecutor implements Executor { // 核心線程數(shù) private Integer corePollSize; // 工作線程數(shù),也就是線程實(shí)例的數(shù)量 private volatile Integer workCount = 0; // 線程是否已滿 private volatile boolean fill = false; // 拒絕策略,由調(diào)用者傳入,當(dāng)隊(duì)列滿時(shí),執(zhí)行自定義策略 private RejectedExecutionHandler rejectedExecutionHandler; // 線程工廠,由調(diào)用者傳入 private ThreadFactory threadFactory; public CountCorePollThreadExecutor(Integer corePollSize, RejectedExecutionHandler rejectedExecutionHandler, ThreadFactory threadFactory) { this.corePollSize = corePollSize; this.rejectedExecutionHandler = rejectedExecutionHandler; this.threadFactory = threadFactory; } // 這次使用鏈表類型的單向隊(duì)列 LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(1); @Override public void execute(Runnable r) { // 如果沒有創(chuàng)建線程 if (!initThread(r)) { // offer和ArrayBlockingQueue的offer相同的作用 if (!queue.offer(r)) { rejectedExecutionHandler.rejectedExecution(); } } } // 同步鎖,因?yàn)榕袛嗪诵木€程數(shù)和工作線程數(shù)的操作需要線程安全 Lock lock = new ReentrantLock(); public boolean initThread(Runnable r) { // 如果工作線程沒有創(chuàng)建滿則需要?jiǎng)?chuàng)建。 if (!fill) { try { lock.lock();// 把鎖 加在判斷里邊是為了不讓每次initThread方法執(zhí)行時(shí)都加鎖 // 此處進(jìn)行雙重判斷,因?yàn)榭赡芤驗(yàn)槎嗑€程原因多個(gè)線程都判斷工作線程沒有創(chuàng)建滿,但是不要緊 // 只有一個(gè)線程可以進(jìn)來,如果后續(xù)線程二次判斷已經(jīng)滿了就直接返回。 if (fill) { return false; } Thread newThread = threadFactory.newThread(() -> { // 因?yàn)榫€程是由任務(wù)觸發(fā)創(chuàng)建的,所以先把觸發(fā)線程創(chuàng)建的任務(wù)執(zhí)行掉。 { r.run(); } while (true) { // 然后該線程則不停的從隊(duì)列中獲取任務(wù) try { Runnable task = queue.take(); task.run(); } catch (InterruptedException e) { e.printStackTrace(); } } }); newThread.start(); // 工作線程數(shù)+1 workCount++; // 如果工作線程數(shù)已經(jīng)與核心線程數(shù)相等,則不可創(chuàng)建 if (workCount == corePollSize) { fill = true; } return true; } finally { lock.unlock();// 釋放鎖 } } else { // 工作線程已滿則不創(chuàng)建 return false; } } }
ThreadPoolExecutor最后考慮下,當(dāng)工作線程數(shù)到達(dá)核心線程數(shù)后,隊(duì)列也滿了以后,任務(wù)就被拒絕了。能不能想個(gè)辦法,當(dāng)工作線程滿后,多增加幾個(gè)線程工作,當(dāng)任務(wù)不多時(shí)在將擴(kuò)展的線程銷毀。ThreadPoolExecutor的構(gòu)造函數(shù)中新增三個(gè)參數(shù)maximumPoolSize最大線程數(shù)keepAliveTime空閑時(shí)間,unit空閑時(shí)間的單位。 和CountCorePollThreadExecutor相比較在流程上講我們只需要在隊(duì)列滿時(shí)判斷工作線程是否和最大線程數(shù)相等,如果不相等則創(chuàng)建備用線程,并且在備用線程長(zhǎng)時(shí)間不工作時(shí)需要銷毀掉工作線程。create()方法雙重判斷workCount==maximumPoolSize如果已經(jīng)相等表示已經(jīng)不能創(chuàng)建線程了,此時(shí)只能執(zhí)行拒絕策略。否則創(chuàng)建備用線程,備用線程創(chuàng)建后自旋的執(zhí)行poll(l,u)方法,該方法也是取出隊(duì)列頭元素,和take()不同的是,poll如果一段時(shí)間后仍然從隊(duì)列中拿不到元素(隊(duì)列為空)則返回null,此時(shí)我們需要將該備用線程銷毀。在創(chuàng)建線程后將workCount++。此外需要注意,因?yàn)楫?dāng)前隊(duì)列滿了,所以才會(huì)創(chuàng)建備用線程所以不要將當(dāng)前的任務(wù)給忘了,LinkedBlockingQueue的put(r)方法會(huì)阻塞的添加元素,直到添加成功。最后 stop()判讀如果workCount>corePollSize則在線程安全的環(huán)境下將線程停止,并且將workCount--。
package com.datang.bingxiang.run; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import com.datang.bingxiang.run.intr.Executor; import com.datang.bingxiang.run.intr.RejectedExecutionHandler; import com.datang.bingxiang.run.intr.ThreadFactory; public class ThreadPoolExecutor implements Executor { // 核心線程數(shù) private Integer corePollSize; // 工作線程數(shù),也就是線程實(shí)例的數(shù)量 private Integer workCount = 0; // 當(dāng)隊(duì)列滿時(shí),需要?jiǎng)?chuàng)建新的Thread,maximumPoolSize為最大線程數(shù) private Integer maximumPoolSize; // 當(dāng)任務(wù)不多時(shí),需要?jiǎng)h除多余的線程,keepAliveTime為空閑時(shí)間 private long keepAliveTime; // unit為空閑時(shí)間的單位 private TimeUnit unit; // 線程是否已滿 private boolean fill = false; // 拒絕策略,由調(diào)用者傳入,當(dāng)隊(duì)列滿時(shí),執(zhí)行自定義策略 private RejectedExecutionHandler rejectedExecutionHandler; // 線程工廠,由調(diào)用者傳入 private ThreadFactory threadFactory; // 這次使用鏈表類型的單向隊(duì)列 BlockingQueue<Runnable> workQueue; public ThreadPoolExecutor(Integer corePollSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) { this.corePollSize = corePollSize; this.rejectedExecutionHandler = rejectedExecutionHandler; this.threadFactory = threadFactory; this.workQueue = workQueue; this.maximumPoolSize = maximumPoolSize; this.keepAliveTime = keepAliveTime; this.unit = unit; } @Override public void execute(Runnable r) { // 如果沒有創(chuàng)建線程 if (!initThread(r)) { // offer和ArrayBlockingQueue的offer相同的作用 if (!workQueue.offer(r)) { // 隊(duì)列滿了以后先不走拒絕策略而是查詢線程數(shù)是否到達(dá)最大線程數(shù) if (create()) { Thread newThread = threadFactory.newThread(() -> { while (true) { // 然后該線程則不停的從隊(duì)列中獲取任務(wù) try { Runnable task = workQueue.poll(keepAliveTime, unit); if (task == null) { stop(); } else { task.run(); } } catch (InterruptedException e) { e.printStackTrace(); } } }); newThread.start(); // 工作線程數(shù)+1 workCount++; // 增加線程后,還需要將本應(yīng)該被拒絕的任務(wù)添加到隊(duì)列 try { // 這個(gè)put()方法會(huì)在隊(duì)列滿時(shí)阻塞添加,直到添加成功 workQueue.put(r); } catch (InterruptedException e) { e.printStackTrace(); } } else { rejectedExecutionHandler.rejectedExecution(); } } } } Lock clock = new ReentrantLock(); private boolean create() { //雙重檢查 if (workCount == maximumPoolSize) { return false; } try { clock.lock(); if (workCount < maximumPoolSize) { return true; } else { return false; } } finally { clock.unlock(); } } Lock slock = new ReentrantLock(); // 銷毀線程 private void stop() { slock.lock(); try { if (workCount > corePollSize) { System.out.println(Thread.currentThread().getName() + "線程被銷毀"); workCount--; Thread.currentThread().stop(); } } finally { slock.unlock(); } } // 獲取當(dāng)前的工作線程數(shù) public Integer getworkCount() { return workCount; } // 同步鎖,因?yàn)榕袛嗪诵木€程數(shù)和工作線程數(shù)的操作需要線程安全 Lock lock = new ReentrantLock(); public boolean initThread(Runnable r) { // 如果工作線程沒有創(chuàng)建滿則需要?jiǎng)?chuàng)建。 if (!fill) { try { lock.lock();// 把鎖 加在判斷里邊是為了不讓每次initThread方法執(zhí)行時(shí)都加鎖 // 此處進(jìn)行雙重判斷,因?yàn)榭赡芤驗(yàn)槎嗑€程原因多個(gè)線程都判斷工作線程沒有創(chuàng)建滿,但是不要緊 // 只有一個(gè)線程可以進(jìn)來,如果后續(xù)線程二次判斷已經(jīng)滿了就直接返回。 if (fill) { return false; } Thread newThread = threadFactory.newThread(() -> { // 因?yàn)榫€程是由任務(wù)觸發(fā)創(chuàng)建的,所以先把觸發(fā)線程創(chuàng)建的任務(wù)執(zhí)行掉。 { r.run(); } while (true) { // 然后該線程則不停的從隊(duì)列中獲取任務(wù) try { Runnable task = workQueue.take(); task.run(); } catch (InterruptedException e) { e.printStackTrace(); } } }); newThread.start(); // 工作線程數(shù)+1 workCount++; // 如果工作線程數(shù)已經(jīng)與核心線程數(shù)相等,則不可創(chuàng)建 if (workCount == corePollSize) { fill = true; } return true; } finally { lock.unlock();// 釋放鎖 } } else { // 工作線程已滿則不創(chuàng)建 return false; } } } |
|
|