자바 병렬 프로그래밍 summary - 8. 스레드 풀 활용
프로그래밍/병렬프로그래밍 2010. 4. 8. 06:52※ 개인적인 스터디를 위해 위 책의 내용을 정리한 내용입니다. 자세한 내용을 확인하고 싶으신 분은 위 책을 구매하셔서 확인하시기 바랍니다. http://www.yes24.com/24/goods/3015162?CategoryNumber=001001003
8. 스레드 풀 활용
8.1 작업과 실행 정책 간의 보이지 않는 연결 관계
일정한 조건을 갖춘 실행 정책이 필요한 작업에는 다음과 같은 것들이 있다.
- 의존성이 있는 작업
- 스레드 한정 기법을 사용하는 작업
- 응답 시간이 민감한 작업
- ThreadLocal 을 사용하는 작업
8.1.1 스레드 부족 데드락
8.1.2 오래 실행되는 작업
오래 실행되는 스레드의 갯수가 많을 수록 스레드풀의 응답속도는 느려지게 된다.
따라서 계속해서 대기하는 기능 대신 일정시간 동안만 대기하는 메소드를 사용할 수 있다면 응답 속도를 향상시킬 수 있다.
8.2 스레드 풀 크기 조절
스레드 풀의 크기는 설정 파일이나 Runtime.availableProcessors() 값에 따라 동적으로 지정되도록 해야 한다.
8.3 ThreadPoolExecutor 설정
8.3.1 스레드 생성과 제거
public ThreadPoolExecutor(int corePoolSize,
int maxizmumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler){ ... }
corePoolSize 는 스레드 풀을 사용할때 원하는 스레드의 갯수라고 볼 수 있다.
queue 가 가득차지 않는 이상 corePoolSize 를 초과하지 않는다.
최초에 prestartAllCoreThreads 메소드를 실행하면 코어 크기만큼이 스레드가 미리 만들어진다.
newFixedThreadPool 의 경우 생성시 지정한 크기로 corePoolSize, maximumPoolSize 가 설정된다. 시간 제한은 무제한이다.
newCachedThreadPool 의 경우 corePoolSize = 0, maximumPoolSize 는 Integer.MAX_VALUE 로 설정된다. 시간 제한은 1분이다.
8.3.2 큐에 쌓인 작업 관리
newFixedThreadPool 과 newSingleThreadExecutor 는 기본설정으로 크기가 제한되지 않은 LinkedBlockingQueue를 사용한다.
크기가 제한된 큐를 사용하면 자원 사용량을 한정시킬 수 있지만 큐가 가득 찼을 때 새로운 작업을 등록하려는 상황을 처리해야 한다.
큐의 크기를 제한한 상태에서는 큐의 크기와 스레드의 갯수를 동시에 튜닝해야 한다.
newCachedThreadPool 은 SynchronousQueue 를 사용한다. SynchronousQueue 는 내부적으로 스레드 간에 작업을 넘겨주는 기능을 담당한다.
큐의 크기가 0 이므로 스레드의 크기가 최대값에 다다른 상태라면 작업을 거부하게 된다.
PriorityBlockingQueue 를 사용하면 우선 순위에 따라 작업이 실행되도록 할 수 있다.
(자바 6에서는 LinkedBlockingQueue 대신 SynchronousQueue 를 사용할때 자바 5 버전에 비해 세배 이상의 성능을 발휘한다.)
8.3.3 집중 대응 정책
크기가 제한된 큐에 작업이 가득 차면 집중 대응 정책 saturation policy 가 동작한다.
집중 대응 정책은 ThreadPoolExecutor 에 setRejectedExecutionHandler 메소드를 사용해 설정할 수 있다.
- 중단 정책(abort)
기본적으로 사용하는 집중 대응 정책이며 execute 메소드에서 RejectedExecutionException을 던진다.
- 제거 정책(discard)
큐에 추가하려던 작업을 제거해 버린다.
- 호출자 실행 정책(caller runs)
작업을 프로듀서에게 거꾸로 넘겨 작업 추가 속도를 늦출 수 있도록 일종의 속도 조절 방법으로 사용한다.
execute 메소드 내부에서 main 스레드가 해당 작업을 직접 처리함으로써 또 다른 새로운 작업이 추가되지 못하도록 한다.
따라서 그 시간동안 스레드는 큐의 작업을 처리할 시간을 가지게 된다.
ThreadPoolExecutor executor = new ThreadPoolExecutor(N_THREADS, N_THREADS, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(CAPACITY));
executor.setRejectedExecutionHandler( new ThreadPoolExecutor.CallerRunsPolicy());
8.3.4 스레드 팩토리
스레드풀에서 새로운 스레드를 생성할 때는 스레드 팩토리를 사용해 새로운 스레드를 생성한다.
public interface ThreadFactory{
Thread newThread(Runnable r);
}
스레드 팩토리에서 MyAppThread 를 생성하여 스레드의 이름, 현재 실행중인 스레드의 갯수 등을 확인할 수 있다.
public class MyAppThread extends Thread{
public static final String DEFAULT_NAME = "MyAppThread";
private static volatile boolean debugLifecycle = false;
private static final AtomicInteger created = new AtomicInteger();
private static final AtomicInteger alive = new AtomicInteger();
private static final Logger log = Logger.getAnonymousLogger();
public MyAppThread(Runnable r){ this(r, DEFAULT_NAME);}
public MyAppThread(Runnable runnable, String name){
super(runnable, name + "=" + created.incrementAndGet());
setUncaughtExceptionHandler(
new Thread.UncaughtExceptionHandler(){
public void uncaughtException(Thread t, Throwable e){
log.log(Level.SEVERE, "UNCAUGHT in thread " + t.getName(), e);
}
}
);
}
public void run(){
boolean debug = debugLifecycle;
if( debug) log.log(Level.FINE, "Created " + getName());
try{
alive.incrementAndGet();
super.run();
}finally{
alive.decrementAndGet();
if(debug) log.log(Level.FINE, "Exiting " + getName());
}
}
public static int getThreadsCreated(){ return created.get();}
public static int getThreadsAlive(){ return alive.get();}
public static boolean getDebug(){ return debugLifecycle;}
public static void setDebug(boolean b){debugLifecycle = b;}
}
8.3.5 ThreadPoolExecutor 생성 이후 설정 변경
Executors 로 생성한 ThreadPool 은 관련된 set 메소드를 사용해서 내부의 설정된 값
(코어 스레드 갯수, 최대 스레드 갯수, 스레드 유지 시간, 스레드 팩토리, 작업 거부 처리 정책)을 변경할 수 있다.
Executors 에는 unconfigurableExecutorSevice 를 사용해서 설정을 변경할 수 없는 ThreadPool을 생성할 수도 있다.
8.4 ThreadPoolExecutor 상속
ThreadPoolExecutor 는 beforeExecute, afterExecute, terminated 와 같은 훅 메소드를 제공한다.
만약 beforeExecute 메소드에서 RuntimeException 이 발생하면 해당 작업뿐 아니라 afterExecute 메소드도 실행되지 않는다.
처리 순서
beforeExecute
작업
afterExecute
terminatd
8.4.1 예제 : 스레드 풀에 통계 확인 기능 추가
public class TimingThreadPool extends ThreadPoolExecute{
private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
private final Logger log = Logger.getLogger("TimingThreadPool");
private final AtomicLong numTasks = new AtomicLong();
private final AtomicLong totalTime = new AtomicLong();
protected void beforeExecute(Thread t, Runnable r){
super.beforeExecute(t, r);
log.fine(String.format("Thread %s: start %s", t, r);
startTime.set(System.nanoTime());
}
protected void afterExecute(Runnable r, Throwable t){
try{
long endTime = System.nanoTime();
long taskTime = endTime - startTime.get();
numTask.incrementAndGet();
totalTime.addAndGet(taskTime);
log.fine(String.format("Thread %s: end %s, time=%dns", t, r, taskTime));
}finally{
super.afterExecute(r, t);
}
}
protected void terminated(){
try{
log.info(String.format("Terminated: avg time=%dns", totalTime.get() / numTasks.get()));
}finally{
super.terminated();
}
}
}
8.5 재귀 함수 병렬화
순차적 프로그램을 병렬 프로그램으로 변경
void processSequentially(List<Element> elements){
for( Element e : elements)
process(e);
}
void processInParallel(Executor exec, List<Element> elements){
for( final Element e : elements)
exec.execute(new Runnable(){
public void run(){ process(e);}
});
}
ExecutorService.invokeAll 메소드를 사용하면 한 묶음의 작업을 한꺼번에 등록하고 그 작업들이 모두 종료될 때까지 대기하도록 할 수 있다.
public<T> void sequentialRecursive(List<Node<T>> nodes, Collection<T> results){
for(Node<T> n : nodes){
results.add(n.compute());
sequentialRecursive( n.getChildren(), results);
}
}
public<T> void paralleRecursive(final Executor exec, List<Node<T>> nodes, final Collection<T> results){
for(final Node<T> n : nodes){
exec.execute(new Runnable(){
public void run(){
results.add(n.compute());
}
});
parallelRecursive(exec, n.getChildren(), results);
}
}
public<T> Collection<T> getParallelResults(List<Node<T>> nodes) throws InterruptedException{
ExecutorService exec = Executors.newCachedThreadPool();
Queue<T> resultQueue = new ConcurrentLinkedQueue<T>();
parallelRecursive(exec, nodes, resultQueue);
exec.shutdown();
exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
return resultQueue;
}
8.5.1 예제 : 퍼즐 프레임웍
'프로그래밍 > 병렬프로그래밍' 카테고리의 다른 글
자바 병렬 프로그래밍 summary - 11. 성능, 확장성 (2) | 2010.04.16 |
---|---|
자바 병렬 프로그래밍 summary - 10. 활동성을 최대로 높이기 (0) | 2010.04.08 |
자바 병렬 프로그래밍 summary - 7. 중단 및 종료 (0) | 2010.04.08 |
자바 병렬 프로그래밍 summary - 6. 작업 실행 (0) | 2010.04.08 |
자바 병렬 프로그래밍 summary - 5. 구성 단위 (2) | 2010.04.07 |