자바 병렬 프로그래밍 summary - 14. 동기화 클래스 구현

프로그래밍/병렬프로그래밍 2010. 4. 29. 23:34

 개인적인 스터디를 위해 위 책의 내용을 정리한 내용입니다. 자세한 내용을 확인하고 싶으신 분은 위 책을 구매하셔서 확인하시기 바랍니다. http://www.yes24.com/24/goods/3015162?CategoryNumber=001001003


14. 동기화 클래스 구현

암묵적인 조건 큐 condition queue, 명시적인 Condition 객체, AbstractQueuedSynchronizer 프레임 웍 등 저수준 기능을 사용하여 원하는 동기화 클래스를 구현할 수 있다.

 

 

14.1 상태 종속성 관리

병렬 프로그램에서 상태 기반의 조건은 다른 스레드를 통해서 언제든지 마음대로 변경될 수 있다.

 

[상태 종속적인 작업의 동기화 구조]

void blockingAction() throws InterruptedException{

             상태 변수에 대한 확보

             while(선행조건이 만족하지 않음){

                    확보했던 락을 풀어줌

                    선행 조건이 맍고할만한 시간만큼 대기

                    인터럽트에 걸리거나 타임아웃이 걸리면 멈춤

                    락을 다시 확보

             }

             작업 실행

             해제

       }

 

위의 예는 락을 활용하는 형태가 그다지 일반적인지 않다. 이를테면 작업하고자 확보했던 락을 그 내부에서 다시 풀어주고 또 다시 확보하는 우스꽝스런 모습이다.

 

 

14.1.1 예제 : 선행 조건 오류를 호출자에게 그대로 전달

 

[선행 조건이 맞지 않으면 그냥 멈춰버리는 버퍼 클래스]

public synchronized void put(V v) throws BufferFullExceptipon{

             if(isFull())

                    throw new BufferFullException();

             doPut(v);

       }

      

       public synchronized V take() throws BufferEmptyException{

             if(isEmpty())

                    throw new BufferEmptyException();

             return doTake();

       }

위의 메소드를 사용하는 클래스는 put, take 메소드를 호출할 때마다 발생할 가능성이 있는 예외 상황을 매번 처리해줘야 한다. 이처럼 상태 종속성을 호출자에게 넘기는 방법을 쓰면 FIFO 큐에서 값의 순서를 정확하게 유지하는 일이 불가능하다.

 

 

14.1.2 예제 : 폴링과 대기를 반복하는 세련되지 못한 대기 상태

public void put(V v) throws InterruptedException{

             while(true){

                    synchronized(this){

                           if(!isFull()){

                                 doPut(v);

                                 return;

                           }

                    }

                    Thread.sleep(SLEEP_GRANULARITY);

             }

       }

 

public V take() throws InterruptedException{

             while(true){

                    synchronized(this){

                           if(!isEmpty()){

                                 return doTake();

                           }

                    }

                    Thread.sleep(SLEEP_GRANULARITY);

             }

       }

 

sleep 시간에 따라 응답속도와 CPU 사용량 간의 트레이드 오프 trade off 가 발생한다.

 

 

14.1.3 조건 큐 문제 해결사

 

[조건 큐를 사용해 구현한 BoundedBuffer]

public synchronized void put(V v) throws BufferFullExceptipon{

             while(isFull())

                    wait();

             doPut(v);

             notifyAll();

       }

      

       public synchronized V take() throws BufferEmptyException{

             while(isEmpty())

                    wait();

             V v = doTake();

             notifyAll();

             return v;

       }

 

자바 언어에서는 모든 객체를 락으로 활용할 수 있는 것처럼 모든 객체는 스스로를 조건 큐로 사용할 수 있으며, 모든 객체가 갖고 있는 wait, notity, notifyAll 메소드는 조건 큐의 암묵적인 API라고 봐도 좋다.

Object.wait 메소드는 현재 확보하고 있는 락을 자동으로 해제하면서 운영체제에게 현재 스레드를 멈춰달라고 요청하고, 따라서 다른 스레드가 락을 확보해 객체 내부의 상태를 변경할 수 있도록 해준다.

조건 큐를 사용했을 때 이전 버전보다 CPU 사용의 효율성, 컨텍스트 스위치 관련 부화, 응답 속도 측면에서 최적화 되어 있다.

 

 

14.2 조건 큐 활용

조건 큐를 활용하려면 꼭 지켜야만 하는 몇 가지 규칙이 있다.

 

 

14.2.1 조건 서술어

조건 서술어는 클래스 내부의 상태 변수에서 유추할 수 있는 표현식이다.

take 메소드의 입장에서는 작업을 진행하기 전에 확인해야만 하는 버퍼에 값이 있어야 한다는 것이 조건 서술어이다. put 메소드의 입장에서는 버퍼에 빈 공간이 있다.” 는 것이 조건 서술어이다.

 

조건 큐와 연결된 조건 서술어를 항상 문서로 남겨야 하며, 그 조건 서술어에 영향을 받는 메소드가 어느 것인지도 명시해야 한다.

 

조건부 대기와 관련된 락과 wait 메소드와 조건 서술어는 중요한 삼각 관계르 유지하고 있다. 조건 서술어는 상태 변수를 기반으로 하고 있고, 상태 변수는 락으로 동기화되어 있어 조건 서술어를 만족하는지 확인하려면 반드시 락을 확보해야만 한다. 또 락 객체와 조건 큐 객체는 반드시 동일한 객체여야만 한다.

 

 

14.2.2 너무 일찍 깨어나기

하나의 암묵적인 조건 큐를 두 개 이상의 조건 서술어를 대상으로 사용할 수도 있다. 따라서 어디에선가 notifyAll 이 호출돼서 스레드가 깨어났다하더라도 wait 하기 직전에 확인했던 조건 서술어를 만족하게 된다는 것은 아니다. notifyAll 을 호출하는 시점에는 조건 서술어를 만족하는 상태였다고 해도 락을 확보하고 나서는 만족하지 않는 상태가 됐을 가능성도 있다. 다른 스레드가 미리 락을 확보하고 조건 서술어와 관련된 상태 변수의 값을 변경시킬 가능성도 있다.

 

[상태 종속적인 메소드의 표준적인 형태]

void stateDependentMethod() throws InterruptedException{

             // 조건 서술어는 반드시 락으로 동기화된 이후에 확인해야 한다.

             synchronized(lock){

                    while(!conditionPredicate())

                           lock.wait();

                    // 객체가 원하는 상태에 맞쳐졌다.

             }

       }

 

조건부 wait 메소드(Object.wait 또는 Condition.wait) 사용할 유의점

- 항상 조건 서술어(작업을 계속 진행하기 전에 반드시 확인해야 하는 확인 절차) 명시해야 한다.

- wait 메소드를 호출하기 전에 조건 서술어를 확인하고, wait 에서 리턴된 이후에도 조건 서술어를 확인해야 한다.

- 조건 서술어를 확인하는 관련된 모든 상태 변수는 해당 조건 큐의 락에 의해 동기화 있어야 한다.

- wait, notify, notifyAll 메소드를 호출할 때는 조건 큐에 해당하는 락을 확보하고 있어야 한다.

- 조건 서술어를 확인한 이후 실제로 작업을 실행해 작업이 끝날 때까지 락을 해제해서는 된다.

 

 

14.2.3 놓친 신호

wait 메소드를 호출하기 전에 조건 서술어를 확인하지 못하는 경우가 생길 있다면 놓친 신호 문제가 발생할 가능성도 있다. wait 메소드 작성시 위의 조건과 같은 형태로 작성되었다면 놓친 신호 문제에 대해 걱정하지 않아도 된다.

 

 

14.2.4 알림

notify 메소드를 호출하면 JVM 해당하는 조건 큐에서 대기 상태에 들어가 있는 다른 스레드 하나를 골라 대기 상태를 풀어준다. notifyAll 호출하면 해당하는 조건 큐에서 대기 상태에 들어가 있는 모든 스레드를 풀어준다. notify, notifyAll 호출한 이후에는 최대한 빨리 락을 풀어줘야 대기 상태에서 깨어난 스레드가 빠르게 동작을 취할 있다.

 

여러 개의 스레드가 하나의 조건 큐를 놓고 대기 상태에 들어갈 있는데, 대기 상태에 들어간 조건이 서로 다를 있기 때문에 notifyAll 대신 notify 메소드를 사용해 대기 상태를 풀어주는 방법은 위험성이 높다.

 

notifyAll 대신 notify 메소드를 사용하려면 다음과 같은 조건에 해당하는 경우에만 사용하는 것이 좋다.

- 단일 조건에 따른 대기 상태에서 깨우는 경우 : 해당하는 조건 큐에 하나의 조건만 사용하고 있는 경우이고, 따라서 스레드는 wait 메소드에서 리턴될 동일한 방법으로 실행된다.

- 번에 하나씩 처리하는 경우 : 조건 변수에 대한 알림 메소드를 호출하면 하나의 스레드만 실행시킬 있는 경우

 

notifyAll 호출하게 되면 많은 스레드가 모두 깨어나서 다시 락을 잡으려고 하기 때문에 컨텍스트 스위칭이 빈번하게 일어나고, 상당량의 확보 경쟁이 벌어진다.

 

조건부 알림 conditional notification

take put 메소드가 대기 상태에서 빠져나올 있는 상태를 만들어주는 경우에만 알림 메소드를 호출하도록 하면 이런 보수적인 측면을 최적화할 있다.

 

 

[조건부 알림을 적용한 경우]

public synchronized void put(V v) throws InterruptedException{

             while(isFull())

                    wait();

             boolean wasEmpty = isEmpty();

             doPut(v);

             if(wasEmpty)

                    notifyAll();

       }

 

wasEmpty 변수 값을 통해서 큐가 비워져 있었는지를 판단한다. 비워져 있었을 경우에는 take 메소드에서 대기중인 스레드가 있을 가능성이 있다고 판단하여 notifyAll 호출해 준다.

 

 

14.2.5 예제 : 게이트 클래스

게이트 클래스는 문이 열릴떄까지 모든 스레드를 대기하도록 하고 특정 조건이 맞을 모든 스레드를 통과하게 한다.

 

@ThreadSafe

public class ThreadGate{

       // 조건 서술어 : opened-since(n) (isOpen || generation)

       @GuardedBy("this") private boolean isOpen;

       @GuardedBy("this") private int generation;

      

       public synchronized void close(){

             isOpen = false;

       }

      

       public synchronized void open(){

             ++generation;

             isOpen = true;

             notifyAll();

       }

      

       // 만족할 때까지 대기 : opened-since (generation on entry)

       public synchronized void await() throws InterruptedException {

             int arrivalGeneration = generation;

             while (!isOopen && arrivalGeneration == generation)

                    wait();

       }

}

 

ThreadGate 클래스는 여러 열고 닫는 것을 가능하게 설계되었다.

스레드는 await 메소드를 호출하여 대기하게 되고 어디선가 open, close 연속으로 호출하게 되면 게이트를 열고 닫는다.

open 메소드 호출시 게이트가 열리게 되며 대기중이던 모든 스레드가 해당 게이트를 통과하게 된다. open, close 메소드는 찰나에 실행되므로 사이에 모든 스레드가 락을 획득하여 통과하지 못할 수도 있다. 따라서 ThreadGate 클래스에서는 arrivalGeneration 변수를 두어 generation 변수와 비교하고 있다. open 메소드 호출시 generation 값을 증가하여 대기하던 스레드의 arrivalGeneration 값과 비교하며 변수의 값이 다르다면 통과하도록 되어 있다.

 

 

14.2.6 하위 클래스 안전성 문제

 

 

14.2.7 조건 큐 캡슐화

일반적으로 조건 큐를 클래스 내부에 캡슐화해서 클래스 상속 구조의 외부에서는 해당 조건 큐를 사용할 수 없도록 막는게 좋다.

 

 

14.2.8 진입 규칙과 완료 규칙

wait notify를 적용하는 규칙을 진입규칙과 완료규칙으로 표현한다. 즉 상태를 기반으로 하는 모든 연산과 상태에 의존성을 갖고 있는 또 다른 상태를 변경하는 연산을 수행하는 경우에는 항상 진입규칙과 완료규칙을 정의하고 문서화해야 한다.

 

 

14.3 명시적인 조건 객체

암묵적인 락을 일반화한 형태가 Lock 클래스인 것처럼 암묵적인 조건 큐를 일반화한 형태는 바로 Condition 객체이다.

 

public interface Condition{

       void await() throws InterruptedException;

       boolean await(long time, TimeUnit unit) throws InterruptedException;

       long awaitNanos(long nanosTimeout) throws InterruptedException;

       void awaitUninterrupterbly();

       boolean awaitUntil(Date deadline) throws InterruptedException;

       void signal();

       void signalAll();

}

 

Condition 클래스는 내부적으로 하나의 Lock 클래스를 사용해 동기화를 맞춘다. Condition 인스턴스를 생성하려면 Lock.newCondition 메소드를 호출한다.

 

위험성 경고 : Condition 객체 사용시 await, signal, signalAll 을 사용해야 하며 Object 에서 상속받는 wait, notify, notifyAll 사용시 동기화 기능에 큰 문제가 생길 수 있다.

 

protected final Lock lock = new ReentrantLock();

// 조건 서술어 : notFull (count < items.length)

private final Condition notFull = lock.newCondition();

// 조건 서술어 : notEmpty (count > 0)

private final Condition notEmpty = lock.newCondition();

 

@GuardedBy("lock") private final T[] items = (T[]) new Object(BUFFER_SIZE);

@GuardedBy("lock") private int tail, head, count;

 

// 만족할 때까지 대기 : notFull

public void put(T x) throws InterruptedException{

       lock.lock();

       try{

             while(count==items.length)

                    notFull.await();

             items[tail] = x;

             if(++tail == items.length)

                    tail = 0;

             ++count

             notEmpty.signal();

       }finally{

             lock.unlock();

       }

}

 

// 만족할 때까지 대기 : notEmpty

public T take() throws InterruptedException{

       lock.lock();

       try{

             while(count == 0)

                    notEmpty.await();

             T x = items[head];

             items[head] = null;

             if(++head==items.length)

                    head=0;

             --count;

             notFull.signal();

             return x;

       }finally{

             lock.unlock();

       }

}

 

위의 예는 조건별로 Condition 객체를 생성하여 처리한 코드이다.

 

조건별로 각각의 Conditino 객체를 생성해 사용하면 클래스 구조를 분석하기가 쉽다. notifyAll 과 같은 signalAll 대신 그보다 더 효율적인 signal 메소드를 사용해 동일한 기능을 처리할 수 있으므로, 컨텍스트 스위치 횟수도 줄일 수 있고 버퍼의 기능이 동작하는 동안 각 스레드가 락을 확보하는 횟수 역시 줄일 수 있다.

 

조건에 관련되 모든 변수는 Lock의 보호 아래 동기화돼 있어야 하고, 조건을 확인하거나 await 또는 signal 메소드를 호출하는 시점에는 반드시 Lock 을 확보한 상태여야 한다.

 

 

14.4 동기화 클래스의 내부 구조

ReentrantLock, Semaphore, CountDownLatch, ReentrantReadLock, SynchronousQueue, FutureTask 는 모두 AbstractQueuedSynchronizer(AQS) 를 상속받아 구현돼 있다.

 

 

14.5 AbstractQueuedSynchronizer

AQS 기반의 동기화 클래스가 담당하는 작업 가운데 가장 기본이 되는 연산은 확보 acquire 와 해제 release 이다. 확보 연산은 상태기반으로 동작하여 항상 대기상태에 들어갈 가능성이 있다.

AQS getState, setState, compareAndSetState 메소드를 사용해 동기화 클래스의 상태 변수를 관리할 수 있다.

배타적인 확보기능을 제공하는 동기화 클래스는 tryAcquire, tryReleas, isHeldExclusively 등의 메소드를 구현해야 하며, 배타적이지 않은 확보 기능을 지원하는 동기화 클래스는 tryAcquireShared, tryReleaseShared 메소드를 제공해야 한다.

위의 메소드는 acquire, acquireShared, release, releaseShared 메소드 내부에서 호출된다.

 

 

14.5.1 간단한 래치

 

@ThreadSafe

public class OneShotLatch {

       private final Sync sync = new Sync();

      

       public void signal() { sync.releaseShared(0);}

      

       public void await() throws InterruptedException {

              sync.acquireSharedInterruptibly(0);

       }

      

       private class Sync extends AbstractQueuedSynchronizer {

             protected int tryAcquireShared(int ignored){

                    // 래치가 열려 있는 상태(state==1)라면 성공, 아니면 실패

                    return (getState()==1) ? 1 : -1;

             }

            

             protected boolean tryReleaseShared(int ignored){

                    setState(1); // 래치가 열렸다.

                    return true; // 다른 스레드에서 확보 연산에 성공할 가능성이 있다.

             }

       }

}

 

OneShotLatch await 메소드로 스레드를 대기하도록 한다음 signal 메소드로 대기하던 모든 스레드를 통과시키는 클래스이다.

AQS releaseShared(int) 메소드는 내부적으로 tryAcquiredShared(int) 메소드를 호출하게 된다. acquireSharedInterruptibly(int) 메소드는 내부적으로 tryAcquireShared(int) 메소드를 호출한다. 따라서 AQS 기본으로 제공해주는 메소드를 사용하면 쉽게 동기화 클래스를 구현할 있다.

 

OneShotLatch AQS 의 핵심 기능을 위임 delegate 하는 형식으로 구현되었다. AQS 를 상속하는 방법은 권장하지 않는다.

 

 

14.6 java.util.concurent 패키지의 동기화 클래스에서 AQS 활용 모습

java.util.concurrent 패키지에 있는 ReentrantLock, Semaphore, ReentrantReadWriteLock, CountDownLatch, SynchronousQueue, FutureTask 의 클래스는 AQS를 기반으로 구현돼어 있다.

 

 

14.6.1 ReentrantLock

 

protected boolean tryAcquire(int ignored){

       final Thread current = Thread.currentThread();

       int c = getState();

       if(c == 0){

             if(compareAndSetState(0,1)){

                    owner = current;

                    return ture;

             }

       }else if(current == owner){

             setState(c+1);

             return true;

       }

       return false;

}

 

ReentrantLock 은 배타적인 확보 연산만 제공하기 때문에 tryAcquire, tryRelease, isHeldExclusively 와 같은 메소드만 구현하고 있다. 또 동기화 상태 값을 확보된 락의 개수를 확인하는 데 사용한다.(재진입에 따른 락의 개수) owner 변수를 통해 락을 확보한 스레드를 관리한다. 락의 재진입 여부도 owner 변수를 통해 확인한다.

compareAndState(int, int) 기본의 설정 값과 비교, 상태 값 설정의 2개의 연산을 단일 연산으로 처리한다.

 

 

14.6.2 Semaphore CountDownLatch

 

protected int tryAcquireShared(int acquires){

       while(true){

             int available = getState();

             int remaining = available - acquire;

             if(remaining < 0

                           || compareAndSetState( available, remaining))

             return remaining;

       }

}

 

protected boolean tryReleaseShared(int release){

       while(true){

             int p = getState();

             if(compareAndSetState(p, p + release))

                    return true;

       }

}

Semaphore AQS의 동기화 상태를 사용해 현재 남아 있는 퍼밋의 개수를 관리한다.

메소드 내부의 while 반복문은 충분한 개수의 퍼밋이 없거나 tryAcquireShared 메소드가 확보 연산의 결과로 퍼밋 개수를 단일 연산으로 변경할 수 있을 때까지 반복한다.

 

CoundDownLatch 클래스도 동기화 상태 값을 현재 개수로 사용하는, Semaphore와 비슷한 형태로 AQS를 활용한다.

 

 

14.6.3 FutureTask

FutureTask 는 내부의 작업이 완료되거나 취소되는 이벤트가 발생하면 해당 스레드가 계속 진행할 수 있고, 아니면 원하는 이벤트가 발생할 때까지 스레드가 대기 상태에 들어간다.

 

 

14.6.4 ReentrantReadWriteLock

ReentrantReadWriteLock 클래스는 AQS 클래스 하나로 읽기 작업과 쓰기 작업을 모두 담당한다.

상태변수의 32개 비트 가운데 16비트로는 쓰기 락에 대한 개수를 관리하고, 나머지 16비트로는 읽기 락의 개수를 관리한다. 읽기 락에 대한 기능은 독점적이지 않은 확보와 해제 연산으로 구현돼 있고, 쓰기 락에 대한 기능은 독점적인 확보와 해제 연산을 사용한다.

 

 

 

 

: