深入淺出 java Semaphore
發(fā)布時(shí)間:2023-08-14 10:38:28
Semaphore 是什么 Semaphore 也叫信號(hào)量,在 JDK1.5 被引入,可以用來(lái)控制同時(shí)訪問(wèn)特定資源的線程數(shù)量,通過(guò)協(xié)調(diào)各個(gè)線程,以保證合理的使用資源。
Semaphore 內(nèi)部維護(hù)了一組虛擬的許可,許可的數(shù)量可以通過(guò)構(gòu)造函數(shù)的參數(shù)指定。
訪問(wèn)特定資源前,必須使用 acquire 方法獲得許可,如果許可數(shù)量為 0,該線程則一直阻塞,直到有可用許可。訪問(wèn)資源后,使用 release 釋放許可。Semaphore 和 ReentrantLock 類似,獲取許可有公平策略和非公平許可策略,默認(rèn)情況下使用非公平策略。
應(yīng)用場(chǎng)景 Semaphore 可以用來(lái)做流量分流,特別是對(duì)公共資源有限的場(chǎng)景,比如數(shù)據(jù)庫(kù)連接。假設(shè)有這個(gè)的需求,讀取幾萬(wàn)個(gè)文件的數(shù)據(jù)到數(shù)據(jù)庫(kù)中,由于文件讀取是 IO 密集型任務(wù),可以啟動(dòng)幾十個(gè)線程并發(fā)讀取,但是數(shù)據(jù)庫(kù)連接數(shù)只有 10 個(gè),這時(shí)就必須控制最多只有 10 個(gè)線程能夠拿到數(shù)據(jù)庫(kù)連接進(jìn)行操作。這個(gè)時(shí)候,就可以使用 Semaphore 做流量控制。
public class SemaphoreTest {private static final int COUNT = 40;private static Executor executor = Executors.newFixedThreadPool(COUNT);private static Semaphore semaphore = new Semaphore(10);public static void main(String[] args) {for (int i=0; i< COUNT; i++) {executor.execute(new ThreadTest.Task());}}
通過(guò) unsafe.compareAndSwapInt 修改 state 的值,確保同一時(shí)刻只有一個(gè)線程可以釋放成功。許可釋放成功,當(dāng)前線程進(jìn)入到 AQS 的 doReleaseShared 方法,喚醒隊(duì)列中等待許可的線程。也許有人會(huì)有疑問(wèn),非公平性體現(xiàn)在哪里?當(dāng)一個(gè)線程 A 執(zhí)行 acquire 方法時(shí),會(huì)直接嘗試獲取許可,而不管同一時(shí)刻阻塞隊(duì)列中是否有線程也在等待許可,如果恰好有線程 C 執(zhí)行 release 釋放許可,并喚醒阻塞隊(duì)列中第一個(gè)等待的線程 B,這個(gè)時(shí)候,線程 A 和線程 B 是共同競(jìng)爭(zhēng)可用許可,不公平性就是這么體現(xiàn)出來(lái)的,線程 A 一點(diǎn)時(shí)間都沒(méi)等待就和線程 B 同等對(duì)待。
公平策略
acquire 實(shí)現(xiàn),核心代碼如下:
protected int tryAcquireShared(int acquires) {for (;;) {if (hasQueuedPredecessors())return -1;int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}acquires
值默認(rèn)為 1,表示嘗試獲取 1 個(gè)許可,remaining 代表剩余的許可數(shù)??梢钥吹胶头枪讲呗韵啾?,就多了一個(gè)對(duì)阻塞隊(duì)列的檢查。
如果阻塞隊(duì)列沒(méi)有等待的線程,則參與許可的競(jìng)爭(zhēng)。否則直接插入到阻塞隊(duì)列尾節(jié)點(diǎn)并掛起,等待被喚醒。release 實(shí)現(xiàn),和非公平策略一樣。
以上為本次全部分享內(nèi)容