자바 병렬 프로그래밍 summary - 7. 중단 및 종료

프로그래밍/병렬프로그래밍 2010. 4. 8. 06:50

 개인적인 스터디를 위해 위 책의 내용을 정리한 내용입니다. 자세한 내용을 확인하고 싶으신 분은 위 책을 구매하셔서 확인하시기 바랍니다. http://www.yes24.com/24/goods/3015162?CategoryNumber=001001003

7. 중단 및 종료

작업이나 스레드를 안전하고 빠르고 안정적으로 멈추게 하는 것은 어려운 일이다.

자바에는 스레드가 작업을 실행하고 있을 때 강제로 멈추도록 하는 방법이 없다.

interrupt 를 사용하여 특정 스레드에게 작업을 멈춰 달라고 요청할 수 있다.

 

7.1 작업 중단

- 사용자가 취소하기를 요청한 경우

- 시간이 제한된 작업

- 애플리케이션 이벤트

- 오류

- 종료

 

특정 작업을 임의로 종료시킬 수 있는 방법은 없으며 작업 취소 스레드를 사용하여 작업을 멈추는 협력적인 방법을 사용해야 한다.

@ThreadSafe

public class PrimeGenerator implemetns Runnable {

           @GuardedBy("this")

           private final List<BigInteger> primes = new ArrayList<BigInteger>();

           private volitile boolean cancelled;

          

           public void run(){

                     BigInteger p = BigInteger.ONE;

                     while(!cancelled){

                                p = p.nextProbablePrime();

                                synchronized(this){

                                          primes.add(p);

                                }

                     }

           }

          

           public void cancel() { cancelled = true; }

          

           public synchronized List<BigInteger> get(){

                     return new ArrayList<BigInteger>(primes);

           }

}

 

List<BigInteger> aSecondOfPrime() throws InterruptedException {

           PrimeGenerator generator = new PrimeGenerator();

           new Thread(generator).start();

           try{

                     SECONDS.sleep(1);

           }finally{

                     generator.cancel();

           }

           return generator.get();

}


7.1.1
인터럽트

- stop, cancel 플래그를 사용하여 해당 스레드의 중단 여부를 판별하는 경우에는 작업중단 요청시 작업에 걸리는 시간만큼의 대기시간이 필요할 수도 있다.

- 모든 스레드는 불린 값으로 인터럽트 상태를 갖고 있다.

- 위 플래그를 사용하기 보다는 Thread.currentThread.isInterrupted() 메소드를 통해 현재 상태를 확인하는 것이 좋다.

- Thread.sleep 이나 Object.wait 로 대기시 interrupt가 걸리면 InterruptedException 을 발생한다.

 

interrupt : 해당 스레드에 interrupt 를 건다.

isInterrupted : 해당 스레드가 interrupt 상태인지를 확인한다.

Thread.interrupted : 현재 스레드의 인터럽트 상태를 해제하고, 해제하기 이전의 값이 무엇이었는지를 알려준다.

 

class PrimeProducer extends Thread{

           priate final BlockingQueue<BigInteger> queue;

          

           PrimeProducer(BlockingQueue<BigInteger> queue){

                     this.queue = queue;

           }

          

           public void run(){

                     try{

                                BigInteger p = BigInteger.ONE;

                                while(Thread.currentThread.isInterrupted())

                                          queue.put(p = p.nextProbablePrime());

                     }catch(InterruptedException consumed){

                               

                     }

           }

          

           public void cancel(){ interrupt(); }

}



7.1.2
인터럽트 정책

- 인터럽트 처리 정책은 인터럽트 요청이 들어 왔을 때, 해당 스레드가 인터럽트를 어떻게 처리해야 하는지에 대한 지침이다.

- 가장 범용적인 인터럽트 정책은 스레드 수준이나 서비스 수준에서 작업 중단 기능을 제공하는 것이다.
- 작업 task 과 스레드 thread 가 인터럽트 상황에서 서로 어떻게 동작해야 하는지를 명확히 구분할 필요가 있다.
 

7.1.3 인터럽트에 대한 대응

- 발생한 예외를 호출 스택의 상위 메소드로 전달한다.
- 호출 스택의 상단에 위치한 메소드가 직접 처리할 수 있도록 인터럽트 상태를 유지한다. 

7.1.4 예제 : 시간 지정 실행

private static final ScheduledExecutorService cancelExec = ...;

 

public static void timedRun(Runnable r, long timeout, TimeUnit unit){

           final Thread taskThread = Thread.currentThread();

           cancelExec.schedule( new Runnable(){

                     public void run(){ taskThread.interrupt(); }

           }, timeout, unit);

          

           r.run();

}

 

위 코드는 내부에서 scheduledThreadPool 을 사용해서 interrupt 를 거는 스레드를 별도로 띄운다.

지정된 시간이 경과하게 되면 interrupt 를 걸도록 설계되었다.

하지만 지정된 시간 이내에 작업이 끝나고 다른 작업을 실행중일때는 커다란 오류를 범하게 된다.

 

public static void timedRun(final Runnable r, long timeout, TimeUnit unit) throws InterruptedException {

           class RethrowableTask implements Runnable {

                     private volatile Throwable t;

                     public void run(){

                                try { r.run(); }

                                catch( Throwable t){ this.t = t; }

                     }

                     void rethrow(){

                                if( t!= null)

                                          throw launderThrowable(t);

                     }

           }

          

           RethrowableTask task = new RethrowableTask();

           final Thread taskThread = new Thread(task);

           taskThread.start();

           cancelExec.schedule( new Runnable(){

                     public void run(){ taskThread.interrupted(); }

           }, timeout, unit);

           taskThread.join(unit.toMillies(timeout));

           task.rethrow();

}

 

첫번째 예제와는 다르게 timedRun 을 호출하는 Thread interrupt 를 거는 방식이 아닌 메소드 내부에서 Thread 를 생성하여 그 생성된 Thread

interrupt를 거는 방식

 

7.1.5 Future 를 사용해 작업 중단

Executor 에서는 기본적으로 작업을 실행하기 위해 생성하는 스레드는 인터럽트가 걸렸을 때 작업을 중단할 수 있도록 하는 인터럽트 정책을 사용한다.

 

public static void timedRun(Runnable r, long timeout, TimeUnit unit) throws InterruptedException {

           Future<?> task = taskExec.submit(r);

           try{

                     task.get(timeout, unit);

           }catch( TimeoutException e){

          

           }catch( ExecutionException e){

          

           }finally{

                     task.cancel(true);

           }

          

}

 

7.1.6 인터럽트에 응답하지 않는 블로킹 작업 다루기

- java.io 패키지의 동기적 소켓 I/O

           Socket 에서 가져온 InputStream read, OutputStream write interrupt 에 반응하지 않으며 socket close 하게되면 read, write 메소드가 중단되면서 SocketException 이 발생한다.

- java.nio 패키지의 동기적 I/O

- Selector 를 사용한 비동기적 I/O

- 락 확보

 

public class RenderThread extends Thread{

           ...

           public void interrupt(){

                     try{

                                socket.close();

                     }catch(IOException e){}

                     finally{

                                super.interrupt();

                     }

           }

           ...

}


7.1.7 newTaskFor 메소드로 비표준적인 중단 방법 처리

ThreadPoolExecutor 에는 newTaskFor 라는 메소드가 추가됬다.

newTaskFor 로 해당 작업을 나타내는 RunnableFuture 객체를 리턴한다.

 

7.2 스레드 기반 서비스 중단

스레드에 대한 소유권은 스레드풀(스레드 기반 서비스)이 가지고 있다. 따라서 개개의 스레드에 대한 인터럽트 요청은 스레드 풀에서 처리되어야 한다.

 

7.2.1 예제 : 로그 서비스

PrintWriter 와 같은 스트림 기반 클래스는 스레드에 안전하기 때문에 println으로 필요한 내용을 출력하는 기능을 사용할 때 별다른 동기화 기법이 필요하지 않다.

 

7.2.2 ExecutorService 종료

종료하는 두개의 메소드

shutdown : 안전하게 종료하는 방법

shutdownNow : 강제로 종료하는 방법

 

7.2.3 독약

프로듀서-컨슈머 패턴으로 구성된 서비스를 종료시키는 또 다른 방법으로 독약이라고 불리는 방법이 있다.

컨슈머가 독약 객체를 가져갔을때 컨슈머 스레드를 종료하는 방법이다.

이 방법은 컨슈머의 갯수를 정확히 알고 있을 때에만 사용할 수 있다.

 

public class IndexingService {

           private static final File POISON = new File("");

           private final IndexerThread consumer = new IndexerThread();

           private final CrawlerThread producer = new CrawlerThread();

           private final BlockingQueue<File> queue;

           private final FileFilter fileFilter;

           private final File root;

          

           class CrawlerThread extends Thread{

                     public void run(){

                                try{

                                          crawl(root);

                                }

                                catch(InterruptedException e){}

                                finally{

                                          while(true){

                                                     try{

                                                                queue.put(POISON);

                                                                break;

                                                     }

                                                     catch(InterruptedException el){ /* 재시도 */}

                                          }

                                }

                     }

                    

                     private void crawl(File root) throws InterruptedException{}

           }

          

           public class IndexerThread extends Thread{

                     public void run(){

                                try{

                                          while(true){

                                                     File file = queue.take();

                                                     if(file == POISON)

                                                                break;

                                                     else

                                                                indexFile(file);

                                          }

                                }

                                catch( InterruptedException consumed){}

                     }

           }

          

           public void start(){

                     producer.start();

                     consumer.start();

           }

          

           public void stop(){

                     producer.interrupt();

           }

          

           public void awaitTermination() throws InterruptedException{

                     consumer.join();

           }

          

          

}

 

7.2.4 예제 : 단번에 실행하는 서비스

 

boolean checkMail(Set<String> hosts, long timeout, TimeUnit unit) throws InterruptedException{

           ExecutorService exec = Executors.newCachedThreadPool();

           final AtomicBoolean hasNewMail = new AtomicBoolean(false);

           try{

                     for( final String host : hosts){

                                exec.execute( new Runnable(){

                                          public void run(){

                                                     if(checkMail(host))

                                                                hasNewMail.set(true);

                                          }

                                });

                     }

           }finally{

                     exec.shutdown();

                     exec.awaitTermination(timeout, unit);

           }

          

           return hasNewMail.get();

}

위의 예제를 보면 finally 구문에서 exec.shutdown 을 호출하여 스레드들의 작업이 끝나기를 기다리고 있다.

 

7.2.5 shutdownNow 메소드의 약점

shutdownNow 메소드 실행시 현재 실행 중인 모든 스레드의 작업을 중단시키도록 하고 등록됐지만 실행은 되지 않았던 모든 작업의 목록을 리턴해준다.

 

private final Set<URL> urlsToCrawl = new HashSet<URL>();

 

public synchronized void stop() throws InterruptedException {

           try{

                     saveUncrawled(exec.shutdownNow());

                     if( exec.awaitTermination(TIMEOUT, UNIT))

                                saveUncrawled(exec.getCancelledTasks());

           }finally{

                     exec = null;

           }

}

 

private void saveUncrawled(List<Runnable> uncrawled){

           for( Runnable task : uncrawled)

                     urlsToCrawl.add(((CrawlTask)task).getPage());

}

 

7.3 비정상적인 스레드 종료 상황 처리

스레드를 예상치 못하게 종료시키는 가장 큰 원인은 바로 RuntimeException이다.

따라서 RuntimeException으로 스레드가 종료되는 경우에는 외부에 종료된다는 사실을 알려 대응이 가능하도록 하여야 한다.

 

public void run(){

           Throwable thrown = null;

           try{

                     while(!isInterrupted())

                                runTask(getTaskFromWorkQueue());

           }catch(Throwable e){

                     thrown = e;

           }finally{

                     threadExited(this, thrown);

           }

}

 

7.3.1 정의되지 않은 예외 처리

스레드 API UncaughtExceptionHandler 라는 기능을 사용하면 처리하지 못한 예외 상황으로 인해 특정 스레드가 종료되는 시점을 정확히 알 수 있다.

 

public interface UncaughtExceptionHandler{

           void uncaughtException(Thread t, Throwable e);

}

 

public class UEHLogger implements Thread.UncaughtExceptionHandler{

           public void uncaughtException(Thread t, Throwable e){

                     Logger logger = Logger.getAnonymousLogger();

                     logger.log(Level.SEVERE, "Thread terminated with exception: " + t.getName(), e);

           }

}

 

UncaughtExceptionHandler execute 로 실행시에 적용된다. submit 를 통해 실행하면 리턴되는 Future 객체의 get 메소드 호출시 예외가 발생했을 때 해당 예외가

ExecutionException 에 감싸진 상태로 넘어온다.

 

7.4 JVM 종료

JVM 이 종료되는 두가지 경우

- 예정된 절차대로 종료되는 경우

           .일반 스레드가 모두 종료되는 시점

           .System.exit 메소드를 호출하거나 ctrl+c 키를 입력한 경우

- 예기치 못하게 임의로 종료되는 경우

           .Runtime.halt 메소드 호출하는 경우

           .운영체제 수준에서 JVM 프로세스를 강제로 종료하는 경우

          

7.4.1 종료 훅

예정된 절차대로 종료되는 경우에 JVM Runtime.addShutdownHook 메소드를 사용해 등록된 종료 훅을 실행한다.

종료 훅이 모두 실행되고 나면 runFinalizersOnExit 값을 확인해 true 일 경우 모든 클래스의 finalize 메소드를 호출하고 종료한다.

종료 훅은 어떤 서비스나 애플리케이션 자체의 여러 부분을 정리하는 목적으로 사용하기 좋다.

예를 들어 임시로 만들어 사용했던 파일을 삭제하거나, 운영체제에서 알아서 정리해주지 않은 모든 자원을 종료훅에서 정리하는게 좋다.

종료 훅의 실행순서는 보장되지 않는다. 따라서 종속 관계를 가질 경우에는 하나의 종료 훅에서 순차적으로 처리해 주어야 한다.

 

public void start(){

           Runtime.getRuntime().addShutdownHook(new Thread(){

                     public void run(){

                                try{

                                          LogService.this.stop();

                                }catch(InterruptedException ignored){}

                     }

           });

}

 

7.4.2 데몬 스레드

스레드는 두 가지 종류로 나뉜다. 일반스레드, 데몬스레드이다.

데몬스레드는 JVM 내부적으로 사용하기 위해 실행하는 스레드이다.(가비지 컬렉터나 기타 여러가지 스레드)

main 스레드에서 생성한 모든 스레드는 일반 스레드이다.

JVM 은 남아 있는 모든 일반 스레드가 종료되고 난 후 JVM 종료 절차를 진행한다. 모든 데몬스레드는 버려지는 셈이다.

데몬 스레드는 예고 없이 종료될 수 있기 때문에 애플리케이션 내부에서 시작시키고 종료시키며 사용하기에는 그다지 좋은 방법이 아니다.

 

7.4.3 finalize 메소드

finalize 메소드는 사용하지 않는게 좋다.

: