第五章 (2): プロデューサ・コンシューマパターン

ProducerとConsumerが互いを意識しなくて良いように、間接的に依存するプールを用いる。BlockingQueueがこのパターンをサポートする。BlockingQueueはスレッドセーフ。

  • Producerの方が遅ければ, Consumerは待つだけ
  • Consumerの方が遅ければ, Queueにサイズ制限を付けることによりメモリリークを防ぐことが出来る

イメージ的にはこんな感じで使う。

public static void main(String[] args) {

	BlockingQueue<File> queue = new LinkedBlockingQueue<File>(MAX_QUEUE_SIZE);
	FileFilter filter = new FileFilter() {
		public boolean accept(File pathname) {
			return true;
		}
	};

	for (File root : roots) {
		new Thread(new FileCrawlProducer(root, filter, queue)).start();
	}
	for (int i = 0; i < NUM_OF_CONSUMER_THREAD; i++) {
		new Thread(new FileOutputConsumer(i, queue)).start();
	}

}

public class FileCrawlProducer implements Runnable {

	private final File root;
	private final FileFilter filter;
	private final BlockingQueue<File> queue;

	public FileCrawlProducer(File root, FileFilter filter, BlockingQueue<File> queue) {
		this.root = root;
		this.filter = filter;
		this.queue = queue;
	}

	public void run() {
		try {
			crawl(root);
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		}
	}

	private void crawl(File root) throws InterruptedException {
		File[] files = root.listFiles(filter);
		if (files != null && files.length > 0) {
			for (File entry : files) {
				if (entry.isDirectory()) {
					crawl(entry);
				} else {
					queue.put(entry);
				}
			}
		}
	}

}

public class FileOutputConsumer implements Runnable {

	private final int seqNo;
	private final BlockingQueue<File> queue;

	public FileOutputConsumer(int seqNo, BlockingQueue<File> queue) {
		this.seqNo = seqNo;
		this.queue = queue;
	}

	public void run() {
		while (true) {
			try {
				output(queue.take());
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
			}
		}
	}

	private void output(File entry) throws InterruptedException {
		if (entry == null) {
			return;
		}
		System.out.println(String.format("%d: %s (%s)", seqNo, entry.getName(), entry.getAbsolutePath()));
	}
}

Thread周りを良く知らないので難しい・・・ちょっと脱線するけど、Thread#interrputやThread#yield, Thread#join, Object#wait, Object#notify辺りは勉強しないと駄目だ。