第十四章: カスタムシンクロナイザを構築する

条件キューのお話。

条件キュー

Javaオブジェクトにはステートの条件を待つための仕組みが備わっている。
lock, wait, notify, notifyAll がそれに該当する。notifyは任意のスレッドが選択されて通知される。

例えば、サイズ制限付きバッファを実装する際、「バッファに空きがある(から追加出来る)」と「バッファに空きが無い(から追加出来ない)」という2つの条件述語がある。バッファはputとtakeを実装し、これらはどちらもチェック・ゼン・アクトの操作なので、sychronizedを使ってガードして実装する。
では、例えばputする際にバッファが満杯だった場合はどうするか。

  • Thread.sleepを使って一定時間待つ?:これは応答性が良好ではない
  • ループして待機する?:これはCPU資源の無駄使い

このような場合に、条件キューを使って実装すれば、必要なときに通知が促される。

public class BoundedBuffer<V> {
	
	private final V[] buffer;
	private int tail;
	private int head;
	private int count;
	
	@SuppressWarnings("unchecked")
	public BoundedBuffer(int capacity) {
		this.buffer = (V[]) new Object[capacity];
	}
	
	public synchronized boolean isFull() {
		return this.count == buffer.length;
	}
	
	public synchronized boolean isEmpty() {
		return this.count == 0;
	}
	
	public synchronized void put(V v) throws InterruptedException {
		
		while (isFull()) {
			wait();
		}
		doPut(v);
		notifyAll();
		
	}
	
	public synchronized V take() throws InterruptedException {
		
		while (isEmpty()) {
			wait();
		}
		V v = doTake();
		notifyAll();
		return v;
		
		
	}
} 

waitは自分の抱えているロックを解放し、カレントスレッドを待機させ(ブロックし)、タイムアウトやインタラプション、notifyされるまで待つ。ここでは、putとtakeが共に条件述語を満たすまでwaitしており、waitを抜けたら処理を実行してnotifyするようになっている。notifyされて初めてそのスレッドは処理を行う。

lockの注意点

1つの固有の条件キューを、複数の条件述語で使うことが出来る。誰かがnotifyしても、そのwaitの条件述語が真になったとは限らない。或は、もしかしたら一旦真になった後、waitしていたスレッドが再開する前に再び偽になっているかもしれない。
このような場合があるため、waitから目覚めるときは、再び条件述語を調べることが必須である。waitは常にループ中で呼び出し、条件述語を調べてから次の処理に進むようにすべきである。実際に上記のコード例はループ中で条件述語を調査している。

Condition

synchronizedと同様のセマンティクスを持ち、拡張可能にしたものをReentrantLockとするなら、Conditionは条件キューと同じセマンティクスを持ち、拡張可能にしたものと言える。awaitやsignalなど、waitとnotifyの代替となる機能を備えている。これは、LockインターフェースのnewCondition()によって生成可能。
Javadocから引用。

class BoundedBuffer {
   final Lock lock = new ReentrantLock();
   final Condition notFull  = lock.newCondition(); 
   final Condition notEmpty = lock.newCondition(); 

   final Object[] items = new Object[100];
   int putptr, takeptr, count;

   public void put(Object x) throws InterruptedException {
     lock.lock();
     try {
       while (count == items.length) 
         notFull.await();
       items[putptr] = x; 
       if (++putptr == items.length) putptr = 0;
       ++count;
       notEmpty.signal();
     } finally {
       lock.unlock();
     }
   }

   public Object take() throws InterruptedException {
     lock.lock();
     try {
       while (count == 0) 
         notEmpty.await();
       Object x = items[takeptr]; 
       if (++takeptr == items.length) takeptr = 0;
       --count;
       notFull.signal();
       return x;
     } finally {
       lock.unlock();
     }
   } 
}

このように、2つの条件述語を別々に記述したことによって、可読性も高まる。