자바 병렬 프로그래밍 summary - 5. 구성 단위

프로그래밍/병렬프로그래밍 2010. 4. 7. 11:49


개인적인 스터디를 위해 위 책의 내용을 정리한 내용입니다. 자세한 내용을 확인하고 싶으신 분은 위 책을 구매하셔서 확인하시기 바랍니다. http://www.yes24.com/24/goods/3015162?CategoryNumber=001001003


 

5. 구성 단위

 

5.1 동기화된 컬렉션 클래스

Vector, Hashtable

Collections.synchronizedXxx 메소드를 사용하여 동기화된 클래스를 만들어 사용할 수 있었다.

 

public static Object getLast(Vector list){

           synchronized(list){

                     int lastIndex = list.size() - 1;

                     return list.get(lastIndex);

           }

}

 

public static void deleteLast(Vector list){

           synchronized(list){

                     int lastIndex = list.size() - 1;

                     list.remove(lastIndex);

           }

}

 

synchronized(vector){

           for(int i=0; i<vector.size(); i++)

                     doSomething(vector.get(i));

}

 

5.1.2 Iterator ConcurrentModificationException

List<Widget> widgetList = Collections.synchronizedList(new ArrayList<Widget>());

 

// ConcurrentModificationException 이 발생할 수 있다.

for(Widget w : widgetList)

           doSomething(w);

          

for-each 구문은 compile Iterator 를 사용한 구문으로 변경된다.

for-each 구문이 동기화 되어 있지 않을 경우에는 for-each 구문 실행시 다른 스레드에 의해 추가 삭제가 일어날 수 있다.

 

반복문을 실행하는 동안 컬렉션 클래스에 들어 있는 내용에 락을 걸어둔 것과 비슷한 효과를 내려면 clone 메소드로

복사본을 만들어 복사본을 대상으로 반복문을 사용할 수 있다. 최소한 clone 메소드를 실행하는 동안에는 컬렉션의 내용을

변경할 수 없도록 동기화시켜야 한다.

 

5.1.3 숨겨진 Iterator

public class HiddenIterator{

           @GuardedBy("this")

           private final Set<Integer> set = new HashSet<Integer>();

          

           public synchronized void add(Integer i){ set.add(i); }

           public synchronized void remove(Integer i){ set.remove(i); }

          

           public void addTenThings(){

                     Random r = new Random();

                     for( int i=0; i < 10; i++)

                                add(r.nextInt());

                     System.out.println( "DEBUG: added ten elements to " + set);

           }

}

set 변수는 사용시에 동기화되어 있어야 한다.

System.out.println 에서 set 변수는 내부적으로 toString 메소드를 호출하게된다.

toString 메소드내에서는 Iterator 를 사용하여 Set 내의 Element 에 접근한다.

따라서 해당 부분은 동기화 되어 있지 않다.

toString 뿐만 아니라 hashCode, equals 메소드도 내부적으로 iterator 를 사용한다. 뿐만 아니라

containsAll, removeAll, retainAll 의 메소드도 내부적으로 iterator 를 사용한다.

 

5.2 병렬 컬렉션

자바 5.0 에서는 HashMap 을 대체할 수 있는 ConcurrentHashMap 을 제공한다.

CopyOnWriteArrayList Element 의 열람 성능을 최우선으로 구현한 List 객체이다.

ConcurrentMap 인터페이스에는 put-if-absent, replace, conditional-remove 연산이 추가됬다.

 

기종의 동기화 컬렉션을 병렬 컬렉션으로 교체하는 것만으로 별다른 위험 요소 없이 전체적인 성능을 상당히 끌어 올릴 수 있다.

 

Queue 인터페이스(자바 5.0)

ConcurrentLinkedQueue, PriorityQueue

BlockingQueue

 

자바 6.0 에는 ConcurrentSkipListMap, ConcurrentSkipListSet 을 제공한다.

SortedMap, SortedSet 클래스의 병렬성을 높이도록 발전된 형태이다.

 

5.2.1 ConcurrentHashMap

ConcurrentHashMap lock striping 동기화 방법을 사용한다.

ConcurrentHashMap Iterator ConcurrentModificationException 을 발생하지 않는다.

따라서 반복문 실행시 동기화할 필요가 없다.

Iterator 를 만들었던 시점의 상황대로 반복을 처리한다.

size, isEmpty 의 결과가 정확하지 않다.

 

5.2.2 Map 기반의 또 다른 단일 연산

 

5.2.3 CopyOnWriteArrayList

CopyOnWriteArrayList 클래스는 동기화된 List 클래스보다 병렬성을 훨씬 높이고자 만들어졌다.

CopyOnWriteArrayList 는 컬렉션의 내용이 변경될 때마다 복사본을 새로 만들어 내는 전략을 취한다.

Iterator 사용시 변경되는 항목은 복사본에 반영된다.

 

컬렉션의 데이터가 변경될 때마다 복사본을 만들어내는 방법은 컬렉션에 많은 양의 자료가 들어 있다면 성능상의 손실이 크다.

따라서 변경 작업보다는 반복문으로 읽어내는 일이 훨씬 빈번한 경우에 효과적이다.

 

public interface ConcurrentMap<K,V> extends Map<K,V>{

           V putIfAbsent(K Key, V value);

          

           boolean remove(K Key, V value);

          

           boolean replace(K Key, V oldValue, V newValue);

          

           V replace(K Key, V newValue);

}

 

5.3 블로킹 큐와 프로듀서-컨슈머 패턴

BlockingQueue put, take 라는 핵심 메소드를 가지고 있다.

put 메소드 실행시 큐가 가득차 있을 경우에는 필요한 공간이 생길때까지 대기한다.

take 메소드 실행시 큐가 비었을 경우에는 값이 들어올때까지 대기한다.

offer 메소드는 큐에 값을 넣으려고 할때 공간이 없으면 추가할 수 없다는 오류를 알려준다.

 

블로킹 큐는 애플리케이션이 안정적으로 동작하도록 만들고자 할 때 요긴하게 사용할 수 있는 도구이다.

블로킹 큐를 사용하면 처리할 수 있는 양보다 훨씬 많은 작업이 생겨 부하가 걸리는 상황에서 작업량을 조절해

애플리케이션이 안정적으로 동작하도록 유도할 수 있다.

 

BlockingQueue 의 구현체

LinkedBlockingQueue, ArrayBlockingQueue, PriorityBlockingQueue, SynchronousQueue

 

5.3.1 예제 : 데스크탑 검색

 

public class FileClawler implements Runnable{

           private final BlockingQueue<File> fileQueue;

           private final FileFilter fileFilter;

           private final File root;

          

           public void run(){

                     try{

                                crawl(root);

                     }catch(InterruptedException e){

                                Thread.currentThread().interrupt();

                     }

           }

          

           private void crawl(File root) throws InterruptedException{

                     File[] entries = root.listFiles(fileFileter);

                     if( entries != null){

                                for( File entry : entries)

                                          if(entry.isDirectory())

                                                     crawl(entry);

                                          else if( !alreadyIndexed(entry))

                                                     fileQueue.put(entry);

                     }

           }

}

 

public class Indexer implements Runnable{

           private final BlockingQueue<File> queue;

          

           public Indexer(BlockingQueue<File> queue){

                     this.queue = queue;

           }

          

           public void run(){

                     try{

                                while(true)

                                          indexFile(queue.take());

                     }catch(InterruptedException e){

                                Thread.currentThread().interrupt();

                     }

           }

}

 

public static void startIndexing(File[] roots){

           BlockingQueue<File> queue = new LinkedBlockingQueue<File>(BOUND);

           FileFilter filter = new FileFilter(){

                     public boolean accept(File file){ return true; }

           };

          

           for( File root : roots)

                     new Thread(new FileCrawler(queue, filter, root)).start();

          

           for( int i=0; i<N_CONSUMERS; i++)

                     new Thread(new Indexer(queue)).start();

}

 

5.3.3 , 작업 가로채기

자바 6.0에서는 Deque, BlockingDeque 가 추가됐다. 둘은 Queue BlockingQueue 를 상속받은 인터페이스이다.

작업 가로채기 패턴에서는 모든 컨슈머가 각자의 덱을 갖는다. 만약 특정 컨슈머가 자신의 덱에 들어 있던 작업을 모두 처리하고 나면

다른 컨슈머의 덱에 쌓여있는 작업 가운데 맨 뒤에 추가된 작업을 가로채 가져올 수 있다.

작업 가로채기 패턴은 컨슈머가 프로듀서의 역할도 갖고 있는 경우에 적용하기 좋다.

 

5.4 블로킹 메소드, 인터럽터블 메소드

스레드가 블락되면 다음 블록된 상태(BLOCKED, WAITING, TIMED_WAITING)가운데 하나를 갖게 된다.

블락된 스레드는 특정 신호를 받게 되면 RUNNABLE 상태가 되어 CPU를 사용할 수 있게 된다.

BlockingQueue put, take 메소드는 블락킹 메소드이다.

 

InterruptedException 에 대한 대처 방안

- InterruptedException 을 전달 : 받아낸 InterruptedException을 그대로 호출한 메소드에게 넘겨버리는 방법

- 인터럽트를 무시하고 복구 :

InterruptedExceptino throw 할 수 없는 경우에는(Runnable 인터페이스를 구현한 경우) InterruptedException catch 한 다음

현재 스레드의 interrupt 메소드를 호출해 인터럽트 상태를 설정해 상위 호출 메소드가 인터럽트 상황이 발생했음을 알 수 있도록 해야 한다.

 

public class TaskRunnale implemetns Runnable {

           public void run(){

                     try{

                                processTask(queue.take());

                     }catch(InterruptedException e){

                                Thread.currentThread().interrupt();

                     }

           }

}

 

5.5 동기화 클래스

상태 정보를 사용해 스레드 간의 작업 흐름을 조절할 수 있도록 만들어진 모든 클래스를 동기화 클래스라고 한다.

- BlockingQueue, 세마포어 semaphore, 배리어 barrier, 래치 latch

 

5.5.1 래치

래치는 스스로가 터미널 terminal상태 에 이를 떄까지 스레드가 동작하는 과정을 늦출 수 있도록 해주는 동기화 클래스이다.

래치는 일종의 관문과 같다. terminal 상태에 이르면 관문이 열리고 그 이전 상태로 되돌릴 수 없다.

- 특정 자원을 확보하기 전에는 작업을 시작하지 말아야 하는 경우에 사용할 수 있다.

- 의존성을 갖고 있는 다른 서비스가 시작하기 전에는 특정 서비스가 실행되지 않도록 막아야 하는 경우에 사용할 수 있다.

- 특정 작업에 필요한 모든 객체가 실행할 준비를 갖출 때까지 기다리는 경우에도 사용할 수 있다.

CountDownLatch

하나 또는 둘 이상의 스레드가 여러 개의 이벤트가 일어날 때까지 대기할 수 있도록 되어 있다.

래치의 상태를 나타내는 숫자는 대기하는 동안 발생해야 하는 이벤트의 건수를 의미한다.

public class TestHarness {

           public long timeTasks(int nThreads, final Runnable task) throws InterruptedException{

                     final CountDownLatch startGate = new CountDownLatch(1);

                     final CountDownLatch endGate = new CountDownLatch(nThreads);

                    

                     for( int i=0; i<nThreads; i++){

                                Thread t = new Thread(){

                                          public void run(){

                                                     try{

                                                                startGate.await();

                                                                try{

                                                                          task.run();

                                                                }finally{

                                                                          endGate.countDown();

                                                                }

                                                     }catch(InterruptedException ignored){}

                                          }

                                };

                                t.start();

                     }

                    

                     long start = System.nanoTime();

                     startGate.countDown();

                     endGate.await();

                     long end = System.nanoTime();

                     return end-start;

           }

}

CountDownLatch 를 사용하여 n 개의 스레드가 동시에 동작할때 전체 실행 시간을 확인할 수 있다.

 

5.5.2 FutureTask

FutureTask 는 래치와 비슷한 형태로 동작한다.

Future 인터페이스를 구현하며 FutureTask 가 나타내는 연산적업은 Callable 인터페이스를 구현한다.

FutureTask 는 실제로 연산을 실행했던 스레드에서 만들어 낸 결과 객체를 실행시킨 스레드에게 넘겨준다.

public class Preloader{

           private final FutureTask<ProductInfo> future = new FutureTask<ProductInfo>( new Callable<ProductInfo>(){

                     public ProductInfo call() throws DataLoadException {

                                return loadProductInfo();

                     }

           });

          

           private final Thread thread = new Thread(future);

          

           public void start() { thread.start(); }

          

           public ProductInfo get() throws DataLoadException, InterruptedException {

                     try{

                                return future.get();

                     }catch(ExecutionException e){

                                Throwable cause = e.getCause();

                                if( cause instanceof DataLoadException)

                                          throw (DataLoadException) cause;

                                else

                                          throw launderThrowable(cause);

                     }

           }

}

Future.get 메소드에서 ExecutionException 이 발생하는 경우

- Callable 이 던지는 예외

- RuntimeException

- Error

 

5.5.3 세마포어 semaphore

카운팅 세마포어는 특정 자원이나 특정 연산을 동시에 사용하거나 호출할 수 있는 스레드의 수를 제한하고자 할 때 사용한다.

세마포어는 가상의 퍼밋을 만들어 내부 상태를 관리하며 acquire 메소드를 사용해 파밋을 획득할 수 있으며 release 메소드로 퍼밋을 반납한다. acquire 메소드로 취득할 퍼밋이 없을 경우에는 여유 퍼밋이 생길때까지 대기한다.

이진 세마포어는 퍼밋 값이 1로 지정된 세마포어이며 락의 역할을 하는 뮤텍스로 활용할 수 있다.

 

public class BoundHashSet<T>{

           private final Set<T> set;

           private final Semaphore sem;

          

           public BoundedHashSet(int bound){

                     this.set = Collections.synchronizedSet( new HashSet<T>);

                     sem = new Semaphore(bound);

           }

          

           public boolean add(T o) throws InterruptedException{

                     sem.acquire();

                     boolean wasAdded = false;

                     try{

                                wasAdded = set.add(o);

                                return wasAdded;

                     }finally{

                                if(!wasAdded)

                                          sem.release();

                     }

           }

          

           public boolean remove(Object o){

                     boolean wasRemoved = set.remove(o);

                     if(wasRemoved)

                                sem.release();

                     return wasRemoved;

           }

}

[세마포어를 사용해서 컬렉션의 크기 제한하기]

 

5.5.4 배리어 barrier

배리어는 특정 이벤트가 발생할 때까지 여러 개의 스레드를 대기 상태로 잡아둘 수 있다는 측면에서 래치와 비슷하다. 래치는 이벤트를 기다리기 위한 동기화 클래스이고, 배리어는 다른 스레드를 기다리기 위한 동기화 클래스이다.

배리어는 실제 작업은 여러 스레드에서 병렬로 처리하고, 다음 단계로 넘어가기 전에 스레드의 처리결과를 취합할 때 많이 사용된다.

 

barrier 포인트에 도달한 스레드는 await 메소드를 실행한다.

CyclicBarrier 는 배리어 작업을 정의할 수 있다. 배리어 작업은 배리어 통과후 스레드를 놓아주기 전에 실행된다.

public class CellularAutomata{

           private final Board mainBoard;

           private final CyclicBarrier barrier;

           private final Worker[] workers;

          

           public CellularAutomata(Board board){

                     this.mainBoard = board;

                     int count = Runtime.getRuntime().availableProcessors();

                     this.barrier = new CyclicBarrier(count,

                                new Runnable(){

                                          public void run(){

                                                     mainBoard.commitNewValues();

                                          }

                                });

                     this.workers = new Worker[count];

                     for(int i=0; i<count; i++)

                                workers[i] = new Worker(mainBoard.getSubBoard(count, i));

           }

          

           private class Worker implements Runnable{

                     private final Board board;

                    

                     public Worker(Board board){ this.board = board; }

                     public void run(){

                                while(!board.hasConverged()){

                                          for( int x=0; x<board.getMaxX(); x++)

                                                     for( int y=0; y<board.getMaxY(); y++)

                                                                board.setNewValue(x, y, computeValue(x,y));

                                          try{

                                                     barrier.await();

                                          }catch(InterruptedException ex){

                                                     return;

                                          }catch(BrokenBarrierException ex){

                                                     return;

                                          }

                                }

                     }

           }

          

           public void start(){

                     for(int i=0; i<workers.length; i++)

                                new Thread(workers[i]).start();

                     mainBoard.waitForConvergence();

           }

}

5.6 효율적이고 확장성 있는 결과 캐시 구현

캐시로 HashMap 대시 ConcurrentHashMap 을 사용할 경우 ConcurrentHashMap 은 스레드 안정성을 확보하기 때문에 별다른 동기화 방법을 사용하지 않아도 되며 병렬 프로그래밍에 대한 성능또한 개선된다.

 

public class Memoizer2<A, V> implements Computable<A, V>{

           private final Map<A, V> cache = new ConcurrentHashMap<A, V>();

           private final Computable<A, V> c;

          

           public Memoizer2(Computable<A, V> c){

                     this.c = c;

           }

          

           public V compute(A arg) throws InterruptedException{

                     V result = cache.get(arg);

                     if( result == null){

                                result = c.compute(arg);

                                cache.put(arg, result);

                     }

                    

                     return result;

           }

}

Memoizer2 compute를 수행줄일 때 동일한 값에 대한 요청이 들어오면 또 compute 를 수행한다. 아래 Memoizer3 Future 를 사용해 동일한 계산이 수행될 가능성을 대폭적으로 줄였다.

 

public class Memoizer3<A, V> implements Computable<A, V>{

           private final Map<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>();

           private final Computable<A, V> c;

          

           public Memoizer3(Computable<A, V> c){

                     this.c = c;

           }

          

           public V compute(A arg) throws InterruptedException{

                     Future<V> f = cache.get(arg);

                     if( f == null){

                                Callable<V> eval = new Callable<V>(){

                                          public V call() throws InterruptedException{

                                                     return c.compute(arg);

                                          }

                                };

                                FutureTask<V> ft = new FutureTask<V>(eval);

                                f = ft;

                                cache.put(arg,ft);

                                ft.run();

                     }

                     

                     try{

                                return f.get();

                     }catch(ExecutionException e){

                                throw launderThrowable(e.getCause());

                     }

           }

}

 

최종 버전 cache.put 대신 cache.putIfAbsent 메소드 사용

public class Memoizer<A, V> implements Computable<A, V>{

           private final Map<A, Future<V>> cache = new ConcurrentHashMap<A, Future<V>>();

           private final Computable<A, V> c;

          

           public Memoizer(Computable<A, V> c){

                     this.c = c;

           }

          

           public V compute(A arg) throws InterruptedException{

                     while(true){

                                Future<V> f = cache.get(arg);

                                if( f == null){

                                          Callable<V> eval = new Callable<V>(){

                                                     public V call() throws InterruptedException{

                                                                return c.compute(arg);

                                                     }

                                          };

                                          FutureTask<V> ft = new FutureTask<V>(eval);

                                          f = cache.putIfAbsent(arg,ft);

                                          if( f == null){f = ft; ft.run(); }

                                }

                               

                                try{

                                          return f.get();

                                }catch(CancellationException e){

                                          cache.remove(arg, f);

                                }

                                }catch(ExecutionException e){

                                          throw launderThrowable(e.getCause());

                                }

                     }

           }

          

}

: