并發(fā)庫(kù)中的BlockingQueue是一個(gè)比較好玩的類,顧名思義,就是阻塞隊(duì)列。該類主要提供了兩個(gè)方法put()和take(),前者將一個(gè)對(duì)象放到隊(duì)列中,如果隊(duì)列已經(jīng)滿了,就等待直到有空閑節(jié)點(diǎn);后者從head取一個(gè)對(duì)象,如果沒(méi)有對(duì)象,就等待直到有可取的對(duì)象。
下面的例子比較簡(jiǎn)單,一個(gè)讀線程,用于將要處理的文件對(duì)象添加到阻塞隊(duì)列中,另外四個(gè)寫(xiě)線程用于取出文件對(duì)象,為了模擬寫(xiě)操作耗時(shí)長(zhǎng)的特點(diǎn),特讓線程睡眠一段隨機(jī)長(zhǎng)度的時(shí)間。另外,該Demo也使用到了線程池和原子整型(AtomicInteger),AtomicInteger可以在并發(fā)情況下達(dá)到原子化更新,避免使用了synchronized,而且性能非常高。由于阻塞隊(duì)列的put和take操作會(huì)阻塞,為了使線程退出,特在隊(duì)列中添加了一個(gè)“標(biāo)識(shí)”,算法中也叫“哨兵”,當(dāng)發(fā)現(xiàn)這個(gè)哨兵后,寫(xiě)線程就退出。
當(dāng)然線程池也要顯式退出了。
package concurrent; import java.io.File; import java.io.FileFilter; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger;
public class TestBlockingQueue { static long randomTime() { return (long) (Math.random() * 1000); }
public static void main(String[] args) { // 能容納100個(gè)文件 final BlockingQueue<File> queue = new LinkedBlockingQueue<File>(100); // 線程池 final ExecutorService exec = Executors.newFixedThreadPool(5); final File root = new File("F:\\JavaLib"); // 完成標(biāo)志 final File exitFile = new File(""); // 讀個(gè)數(shù) final AtomicInteger rc = new AtomicInteger(); // 寫(xiě)個(gè)數(shù) final AtomicInteger wc = new AtomicInteger(); // 讀線程 Runnable read = new Runnable() { public void run() { scanFile(root); scanFile(exitFile); }
public void scanFile(File file) { if (file.isDirectory()) { File[] files = file.listFiles(new FileFilter() { public boolean accept(File pathname) { return pathname.isDirectory() || pathname.getPath().endsWith(".java"); } }); for (File one : files) scanFile(one); } else { try { int index = rc.incrementAndGet(); System.out.println("Read0: " + index + " " + file.getPath()); queue.put(file); } catch (InterruptedException e) { } } } }; exec.submit(read); // 四個(gè)寫(xiě)線程 for (int index = 0; index < 4; index++) { // write thread final int NO = index; Runnable write = new Runnable() { String threadName = "Write" + NO; public void run() { while (true) { try { Thread.sleep(randomTime()); int index = wc.incrementAndGet(); File file = queue.take(); // 隊(duì)列已經(jīng)無(wú)對(duì)象 if (file == exitFile) { // 再次添加"標(biāo)志",以讓其他線程正常退出 queue.put(exitFile); break; } System.out.println(threadName + ": " + index + " " + file.getPath()); } catch (InterruptedException e) { } } } }; exec.submit(write); } exec.shutdown(); } }
|