第五章 (2): プロデューサ・コンシューマパターン
ProducerとConsumerが互いを意識しなくて良いように、間接的に依存するプールを用いる。BlockingQueueがこのパターンをサポートする。BlockingQueueはスレッドセーフ。
- Producerの方が遅ければ, Consumerは待つだけ
- Consumerの方が遅ければ, Queueにサイズ制限を付けることによりメモリリークを防ぐことが出来る
イメージ的にはこんな感じで使う。
public static void main(String[] args) { BlockingQueue<File> queue = new LinkedBlockingQueue<File>(MAX_QUEUE_SIZE); FileFilter filter = new FileFilter() { public boolean accept(File pathname) { return true; } }; for (File root : roots) { new Thread(new FileCrawlProducer(root, filter, queue)).start(); } for (int i = 0; i < NUM_OF_CONSUMER_THREAD; i++) { new Thread(new FileOutputConsumer(i, queue)).start(); } } public class FileCrawlProducer implements Runnable { private final File root; private final FileFilter filter; private final BlockingQueue<File> queue; public FileCrawlProducer(File root, FileFilter filter, BlockingQueue<File> queue) { this.root = root; this.filter = filter; this.queue = queue; } public void run() { try { crawl(root); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } private void crawl(File root) throws InterruptedException { File[] files = root.listFiles(filter); if (files != null && files.length > 0) { for (File entry : files) { if (entry.isDirectory()) { crawl(entry); } else { queue.put(entry); } } } } } public class FileOutputConsumer implements Runnable { private final int seqNo; private final BlockingQueue<File> queue; public FileOutputConsumer(int seqNo, BlockingQueue<File> queue) { this.seqNo = seqNo; this.queue = queue; } public void run() { while (true) { try { output(queue.take()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } private void output(File entry) throws InterruptedException { if (entry == null) { return; } System.out.println(String.format("%d: %s (%s)", seqNo, entry.getName(), entry.getAbsolutePath())); } }
Thread周りを良く知らないので難しい・・・ちょっと脱線するけど、Thread#interrputやThread#yield, Thread#join, Object#wait, Object#notify辺りは勉強しないと駄目だ。