자바 병렬 프로그래밍 summary - 8. 스레드 풀 활용

프로그래밍/병렬프로그래밍 2010. 4. 8. 06:52

 개인적인 스터디를 위해 위 책의 내용을 정리한 내용입니다. 자세한 내용을 확인하고 싶으신 분은 위 책을 구매하셔서 확인하시기 바랍니다. http://www.yes24.com/24/goods/3015162?CategoryNumber=001001003


8. 스레드 풀 활용

 

8.1 작업과 실행 정책 간의 보이지 않는 연결 관계

일정한 조건을 갖춘 실행 정책이 필요한 작업에는 다음과 같은 것들이 있다.

- 의존성이 있는 작업

- 스레드 한정 기법을 사용하는 작업

- 응답 시간이 민감한 작업

- ThreadLocal 을 사용하는 작업

 

8.1.1 스레드 부족 데드락

 

8.1.2 오래 실행되는 작업

오래 실행되는 스레드의 갯수가 많을 수록 스레드풀의 응답속도는 느려지게 된다.

따라서 계속해서 대기하는 기능 대신 일정시간 동안만 대기하는 메소드를 사용할 수 있다면 응답 속도를 향상시킬 수 있다.

 

8.2 스레드 풀 크기 조절

스레드 풀의 크기는 설정 파일이나 Runtime.availableProcessors() 값에 따라 동적으로 지정되도록 해야 한다.

8.3 ThreadPoolExecutor
설정

 

8.3.1 스레드 생성과 제거

public ThreadPoolExecutor(int corePoolSize,

int maxizmumPoolSize,

           long keepAliveTime,

           TimeUnit unit,

           BlockingQueue<Runnable> workQueue,

           ThreadFactory threadFactory,

           RejectedExecutionHandler handler){ ... }

corePoolSize 는 스레드 풀을 사용할때 원하는 스레드의 갯수라고 볼 수 있다.

queue 가 가득차지 않는 이상 corePoolSize 를 초과하지 않는다.

최초에 prestartAllCoreThreads 메소드를 실행하면 코어 크기만큼이 스레드가 미리 만들어진다.

 

newFixedThreadPool 의 경우 생성시 지정한 크기로 corePoolSize, maximumPoolSize 가 설정된다. 시간 제한은 무제한이다.

newCachedThreadPool 의 경우 corePoolSize = 0, maximumPoolSize Integer.MAX_VALUE 로 설정된다. 시간 제한은 1분이다.

 

8.3.2 큐에 쌓인 작업 관리

newFixedThreadPool newSingleThreadExecutor 는 기본설정으로 크기가 제한되지 않은 LinkedBlockingQueue를 사용한다.

크기가 제한된 큐를 사용하면 자원 사용량을 한정시킬 수 있지만 큐가 가득 찼을 때 새로운 작업을 등록하려는 상황을 처리해야 한다.

큐의 크기를 제한한 상태에서는 큐의 크기와 스레드의 갯수를 동시에 튜닝해야 한다.

 

newCachedThreadPool SynchronousQueue 를 사용한다. SynchronousQueue 는 내부적으로 스레드 간에 작업을 넘겨주는 기능을 담당한다.

큐의 크기가 0 이므로 스레드의 크기가 최대값에 다다른 상태라면 작업을 거부하게 된다.

 

PriorityBlockingQueue 를 사용하면 우선 순위에 따라 작업이 실행되도록 할 수 있다.

 

(자바 6에서는 LinkedBlockingQueue 대신 SynchronousQueue 를 사용할때 자바 5 버전에 비해 세배 이상의 성능을 발휘한다.)

 

8.3.3 집중 대응 정책

크기가 제한된 큐에 작업이 가득 차면 집중 대응 정책 saturation policy 가 동작한다.

집중 대응 정책은 ThreadPoolExecutor setRejectedExecutionHandler 메소드를 사용해 설정할 수 있다.

 

- 중단 정책(abort)

           기본적으로 사용하는 집중 대응 정책이며 execute 메소드에서 RejectedExecutionException을 던진다.

- 제거 정책(discard)

           큐에 추가하려던 작업을 제거해 버린다.

- 호출자 실행 정책(caller runs)

           작업을 프로듀서에게 거꾸로 넘겨 작업 추가 속도를 늦출 수 있도록 일종의 속도 조절 방법으로 사용한다.

           execute 메소드 내부에서 main 스레드가 해당 작업을 직접 처리함으로써 또 다른 새로운 작업이 추가되지 못하도록 한다.

           따라서 그 시간동안 스레드는 큐의 작업을 처리할 시간을 가지게 된다.

 

ThreadPoolExecutor executor = new ThreadPoolExecutor(N_THREADS, N_THREADS, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(CAPACITY));

executor.setRejectedExecutionHandler( new ThreadPoolExecutor.CallerRunsPolicy());

 

8.3.4 스레드 팩토리

스레드풀에서 새로운 스레드를 생성할 때는 스레드 팩토리를 사용해 새로운 스레드를 생성한다.

 

public interface ThreadFactory{

           Thread newThread(Runnable r);

}

 

스레드 팩토리에서 MyAppThread 를 생성하여 스레드의 이름, 현재 실행중인 스레드의 갯수 등을 확인할 수 있다.

 

 

public class MyAppThread extends Thread{

           public static final String DEFAULT_NAME = "MyAppThread";

           private static volatile boolean debugLifecycle = false;

           private static final AtomicInteger created = new AtomicInteger();

           private static final AtomicInteger alive = new AtomicInteger();

           private static final Logger log = Logger.getAnonymousLogger();

          

           public MyAppThread(Runnable r){ this(r, DEFAULT_NAME);}

          

           public MyAppThread(Runnable runnable, String name){

                     super(runnable, name + "=" + created.incrementAndGet());

                     setUncaughtExceptionHandler(

                                new Thread.UncaughtExceptionHandler(){

                                          public void uncaughtException(Thread t, Throwable e){

                                                     log.log(Level.SEVERE, "UNCAUGHT in thread " + t.getName(), e);

                                          }

                                }

                     );

           }

          

           public void run(){

                     boolean debug = debugLifecycle;

                     if( debug) log.log(Level.FINE, "Created " + getName());

                     try{

                                alive.incrementAndGet();

                                super.run();

                     }finally{

                                alive.decrementAndGet();

                                if(debug) log.log(Level.FINE, "Exiting " + getName());

                     }

           }

          

           public static int getThreadsCreated(){ return created.get();}

           public static int getThreadsAlive(){ return alive.get();}

           public static boolean getDebug(){ return debugLifecycle;}

           public static void setDebug(boolean b){debugLifecycle = b;}

}

 

8.3.5 ThreadPoolExecutor 생성 이후 설정 변경

Executors 로 생성한 ThreadPool 은 관련된 set 메소드를 사용해서 내부의 설정된 값

(코어 스레드 갯수, 최대 스레드 갯수, 스레드 유지 시간, 스레드 팩토리, 작업 거부 처리 정책)을 변경할 수 있다.

 

Executors 에는 unconfigurableExecutorSevice 를 사용해서 설정을 변경할 수 없는 ThreadPool을 생성할 수도 있다.

 

8.4 ThreadPoolExecutor 상속

ThreadPoolExecutor beforeExecute, afterExecute, terminated 와 같은 훅 메소드를 제공한다.

만약 beforeExecute 메소드에서 RuntimeException 이 발생하면 해당 작업뿐 아니라 afterExecute 메소드도 실행되지 않는다.

 

처리 순서

beforeExecute

작업

afterExecute

terminatd

 

8.4.1 예제 : 스레드 풀에 통계 확인 기능 추가

public class TimingThreadPool extends ThreadPoolExecute{

           private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();

           private final Logger log = Logger.getLogger("TimingThreadPool");

           private final AtomicLong numTasks = new AtomicLong();

           private final AtomicLong totalTime = new AtomicLong();

          

           protected void beforeExecute(Thread t, Runnable r){

                     super.beforeExecute(t, r);

                     log.fine(String.format("Thread %s: start %s", t, r);

                     startTime.set(System.nanoTime());

           }

          

           protected void afterExecute(Runnable r, Throwable t){

                     try{

                                long endTime = System.nanoTime();

                                long taskTime = endTime - startTime.get();

                                numTask.incrementAndGet();

                                totalTime.addAndGet(taskTime);

                                log.fine(String.format("Thread %s: end %s, time=%dns", t, r, taskTime));

                     }finally{

                                super.afterExecute(r, t);

                     }

           }

          

           protected void terminated(){

                     try{

                                log.info(String.format("Terminated: avg time=%dns", totalTime.get() / numTasks.get()));

                     }finally{

                                super.terminated();

                     }

           }

}

 

 

8.5 재귀 함수 병렬화

순차적 프로그램을 병렬 프로그램으로 변경

void processSequentially(List<Element> elements){

           for( Element e : elements)

                     process(e);

}

 

void processInParallel(Executor exec, List<Element> elements){

           for( final Element e : elements)

                     exec.execute(new Runnable(){

                                public void run(){ process(e);}

                     });

}

 

ExecutorService.invokeAll 메소드를 사용하면 한 묶음의 작업을 한꺼번에 등록하고 그 작업들이 모두 종료될 때까지 대기하도록 할 수 있다.

 

public<T> void sequentialRecursive(List<Node<T>> nodes, Collection<T> results){

           for(Node<T> n : nodes){

                     results.add(n.compute());

                     sequentialRecursive( n.getChildren(), results);

           }

}

 

public<T> void paralleRecursive(final Executor exec, List<Node<T>> nodes, final Collection<T> results){

           for(final Node<T> n : nodes){

                     exec.execute(new Runnable(){

                                public void run(){

                                          results.add(n.compute());

                                }

                     });

                     parallelRecursive(exec, n.getChildren(), results);

           }

}

 

public<T> Collection<T> getParallelResults(List<Node<T>> nodes) throws InterruptedException{

           ExecutorService exec = Executors.newCachedThreadPool();

           Queue<T> resultQueue = new ConcurrentLinkedQueue<T>();

           parallelRecursive(exec, nodes, resultQueue);

           exec.shutdown();

           exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);

           return resultQueue;

}

 

8.5.1 예제 : 퍼즐 프레임웍

: