
[java] RejectedExecutionHandler reject policy - how tasks are handled when the work queue (workQueue) is full

by 돈코츠라멘 2019. 8. 30.

What is a Thread Pool?

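As the amount of parallel work grows, so does the number of threads. Creating and scheduling those threads keeps the CPU busy, and every thread also consumes memory, so application performance degrades. A thread pool is used to prevent this.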

A thread pool caps the number of threads used for processing at a fixed limit, and each thread picks up tasks from the work queue one at a time.

The java.util.concurrent package provides the ExecutorService interface and the Executors class, which creates ExecutorService implementations. An ExecutorService implementation can be created with newCachedThreadPool() or newFixedThreadPool(int nThreads). Alternatively, you can create a ThreadPoolExecutor directly with a constructor like the one below.

public ThreadPoolExecutor(int corePoolSize, // minimum number of threads to keep alive
                          int maximumPoolSize, // maximum number of threads
                          long keepAliveTime, // threads beyond corePoolSize terminate once they stay idle longer than keepAliveTime
                          TimeUnit unit, // time unit of keepAliveTime
                          BlockingQueue<Runnable> workQueue // work queue
                          ){
                            //...
                          }
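
As a rough sketch of how the constructor above might be called next to the Executors factory methods, consider the following. The pool sizes, the keep-alive value, the queue capacity, and the class name PoolConstructionSketch are all illustrative assumptions, not values from the original post.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolConstructionSketch {

    // Builds a pool directly through the five-argument constructor.
    // All numbers here are assumed values chosen for illustration.
    static ThreadPoolExecutor newBoundedPool() {
        return new ThreadPoolExecutor(
                2,                            // corePoolSize
                4,                            // maximumPoolSize
                30L, TimeUnit.SECONDS,        // keepAliveTime and its unit
                new ArrayBlockingQueue<>(10)  // bounded work queue
        );
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor direct = newBoundedPool();
        ExecutorService fixed = Executors.newFixedThreadPool(2);

        direct.execute(() -> System.out.println("direct: " + Thread.currentThread().getName()));
        fixed.execute(() -> System.out.println("fixed: " + Thread.currentThread().getName()));

        // Pool threads are non-daemon, so shut both pools down to let the JVM exit.
        direct.shutdown();
        fixed.shutdown();
        direct.awaitTermination(5, TimeUnit.SECONDS);
        fixed.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

Constructing the pool directly is what makes the queue type and, later, the reject policy explicit choices rather than factory defaults.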

Types of workQueue (work queue)

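  1. SynchronousQueue - Hands a task produced by the producer directly to a consumer thread. It is effectively not a queue (it has no capacity) and only passes tasks between threads. This is the work queue of pools created by newCachedThreadPool().
  2. LinkedBlockingQueue - When all corePoolSize threads are busy, new tasks wait in the work queue. With the default unbounded capacity the queue never fills up, so no threads beyond corePoolSize are ever created and maximumPoolSize has no effect.
  3. ArrayBlockingQueue - A bounded queue backed by a fixed-size array. Once it is full, the pool adds threads up to maximumPoolSize, and any task beyond that is rejected.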
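
The difference between the three queue types can be observed with a small experiment. The sketch below is only an illustration under assumed settings (corePoolSize 2, maximumPoolSize 4, a hypothetical helper named observe): every task blocks on a latch, so the pool size and the queue length can be read while nothing has finished yet.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class WorkQueueSketch {

    // Submits `tasks` blocking tasks to a pool with core=2, max=4 and returns
    // {pool size, queued task count} observed while every task is still blocked.
    static int[] observe(BlockingQueue<Runnable> queue, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 4, 1L, TimeUnit.SECONDS, queue);
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // keep the worker busy until released
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int[] snapshot = { pool.getPoolSize(), pool.getQueue().size() };
        release.countDown(); // let all tasks finish
        pool.shutdown();
        return snapshot;
    }

    public static void main(String[] args) {
        // Direct hand-off: nothing is queued, so the pool grows to maximumPoolSize.
        System.out.println(Arrays.toString(observe(new SynchronousQueue<>(), 4)));     // [4, 0]
        // Unbounded queue: the pool stays at corePoolSize and the rest wait.
        System.out.println(Arrays.toString(observe(new LinkedBlockingQueue<>(), 6)));  // [2, 4]
        // Bounded queue: fills up first, then extra threads are added.
        System.out.println(Arrays.toString(observe(new ArrayBlockingQueue<>(2), 6)));  // [4, 2]
    }
}
```

With these settings, one more task in the SynchronousQueue or ArrayBlockingQueue case would find neither a free thread nor a free queue slot, which is exactly the situation the reject policies below deal with.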

RejectedExecutionHandler

public interface RejectedExecutionHandler {

    /**
     * Method that may be invoked by a {@link ThreadPoolExecutor} when
     * {@link ThreadPoolExecutor#execute execute} cannot accept a
     * task.  This may occur when no more threads or queue slots are
     * available because their bounds would be exceeded, or upon
     * shutdown of the Executor.
     *
     * <p>In the absence of other alternatives, the method may throw
     * an unchecked {@link RejectedExecutionException}, which will be
     * propagated to the caller of {@code execute}.
     *
     * @param r the runnable task requested to be executed
     * @param executor the executor attempting to execute this task
     * @throws RejectedExecutionException if there is no remedy
     */
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

A RejectedExecutionHandler is invoked when the ThreadPoolExecutor can no longer accept a task. This happens when no more threads or queue slots are available because their bounds would be exceeded, or when the Executor has been shut down.
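
Because RejectedExecutionHandler is a plain interface, you can also supply your own implementation instead of one of the built-in policies. The following is a minimal sketch, assuming a hypothetical CountingHandler that only counts rejections; the pool of one thread with a one-slot queue is likewise an assumption chosen to make rejections easy to trigger.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CountingHandlerSketch {

    // Hypothetical handler: counts rejected tasks instead of throwing.
    static class CountingHandler implements RejectedExecutionHandler {
        final AtomicInteger rejected = new AtomicInteger();

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            rejected.incrementAndGet();
        }
    }

    // Submits `tasks` blocking tasks to a 1-thread pool with a 1-slot queue
    // and returns how many of them were handed to the handler.
    static int countRejections(int tasks) {
        CountingHandler handler = new CountingHandler();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), handler);
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // keep the single worker busy
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        release.countDown();
        pool.shutdown();
        return handler.rejected.get();
    }

    public static void main(String[] args) {
        // One task runs, one waits in the queue, the remaining three are rejected.
        System.out.println(countRejections(5)); // 3
    }
}
```

A handler like this can be passed through the six-argument constructor, as here, or installed later with setRejectedExecutionHandler.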


Reject Policy

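When the work queue is full and the pool has already reached maximumPoolSize, the executor applies whichever of the four policies below has been selected.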

  1. ThreadPoolExecutor.AbortPolicy
  2. ThreadPoolExecutor.CallerRunsPolicy
  3. ThreadPoolExecutor.DiscardPolicy
  4. ThreadPoolExecutor.DiscardOldestPolicy

ThreadPoolExecutor.AbortPolicy

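This is the default policy. For a rejected task, the handler throws a RejectedExecutionException, which propagates to the caller of execute(). Note that in the examples here the try block wraps the whole loop, so submission stops at the first rejection.

Default (no policy specified)
int CORE_POOL_SIZE = 5;
int MAX_POOL_SIZE = 5;
int BLOCKING_QUEUE_SIZE = 1; // keep the work queue small!
long KEEP_ALIVE_TIME = 1000L; // assumed value in milliseconds; the original snippet leaves it undefined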

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(BLOCKING_QUEUE_SIZE));

try {
    for (int i = 0; i < 10; i++) {
        Runnable runnable = () -> {
            ThreadPoolExecutor pool = threadPoolExecutor;
            System.out.println("Thread count: " + pool.getPoolSize() + ", current thread: " + Thread.currentThread().getName());
        };
        threadPoolExecutor.execute(runnable);
    }
} catch (Exception e) {
    e.printStackTrace();
}
  • result
Thread count: 5, current thread: pool-1-thread-1
Thread count: 5, current thread: pool-1-thread-3
Thread count: 5, current thread: pool-1-thread-1
java.util.concurrent.RejectedExecutionException: Task testClass$$Lambda$1/990368553@448139f0 rejected from java.util.concurrent.ThreadPoolExecutor@7cca494b[Running, pool size = 5, active threads = 3, queued tasks = 1, completed tasks = 2]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
    at testClass.main(testClass.java:27)
Thread count: 5, current thread: pool-1-thread-5
Thread count: 5, current thread: pool-1-thread-2
Thread count: 5, current thread: pool-1-thread-4
Specifying AbortPolicy explicitly
int CORE_POOL_SIZE = 5;
int MAX_POOL_SIZE = 5;
int BLOCKING_QUEUE_SIZE = 1; // keep the work queue small!
long KEEP_ALIVE_TIME = 1000L; // assumed value in milliseconds; the original snippet leaves it undefined

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(BLOCKING_QUEUE_SIZE));

threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); // set the policy

try {
    for (int i = 0; i < 10; i++) {
        Runnable runnable = () -> {
            ThreadPoolExecutor pool = threadPoolExecutor;
            System.out.println("Thread count: " + pool.getPoolSize() + ", current thread: " + Thread.currentThread().getName());
        };
        threadPoolExecutor.execute(runnable);
    }
} catch (Exception e) {
    e.printStackTrace();
}
  • result
Thread count: 2, current thread: pool-1-thread-1
Thread count: 5, current thread: pool-1-thread-4
Thread count: 5, current thread: pool-1-thread-2
Thread count: 5, current thread: pool-1-thread-3
java.util.concurrent.RejectedExecutionException: Task testClass$$Lambda$1/990368553@448139f0 rejected from java.util.concurrent.ThreadPoolExecutor@7cca494b[Running, pool size = 5, active threads = 1, queued tasks = 1, completed tasks = 4]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
    at testClass.main(testClass.java:27)
Thread count: 5, current thread: pool-1-thread-1
Thread count: 5, current thread: pool-1-thread-5
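
The results above vary from run to run because the tasks finish at different speeds. For a repeatable picture of AbortPolicy, one possible variation is sketched below: each task blocks on a latch, and the exception is caught per task instead of around the whole loop. The class name AbortPolicySketch and the helper methods are hypothetical, and the pool settings simply mirror the example above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class AbortPolicySketch {

    // Blocks until the latch opens, restoring the interrupt flag if interrupted.
    static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Returns {accepted, rejected} for `tasks` submissions to a pool of
    // 5 threads with a 1-slot queue, while every accepted task stays blocked.
    static int[] submit(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                5, 5, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1), new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                pool.execute(() -> await(release));
                accepted++;
            } catch (RejectedExecutionException e) {
                rejected++; // thrown by AbortPolicy in the submitting thread
            }
        }
        release.countDown();
        pool.shutdown();
        return new int[] { accepted, rejected };
    }

    public static void main(String[] args) {
        int[] result = submit(10);
        // 5 running + 1 queued are accepted, the other 4 are rejected.
        System.out.println("accepted=" + result[0] + ", rejected=" + result[1]); // accepted=6, rejected=4
    }
}
```

Catching the exception inside the loop lets the caller keep submitting after a rejection, which the original example does not do.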

ThreadPoolExecutor.CallerRunsPolicy

Executes task r in the caller's thread, unless the executor has been shut down, in which case the task is discarded.

The rejected task is run by the calling thread instead.

int CORE_POOL_SIZE = 5;
int MAX_POOL_SIZE = 5;
int BLOCKING_QUEUE_SIZE = 1; // keep the work queue small!
long KEEP_ALIVE_TIME = 1000L; // assumed value in milliseconds; the original snippet leaves it undefined

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(BLOCKING_QUEUE_SIZE));

threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // set the policy

try {
    for (int i = 0; i < 10; i++) {
        Runnable runnable = () -> {
            ThreadPoolExecutor pool = threadPoolExecutor;
            System.out.println("Thread count: " + pool.getPoolSize() + ", current thread: " + Thread.currentThread().getName());
        };
        threadPoolExecutor.execute(runnable);
    }
} catch (Exception e) {
    e.printStackTrace();
}
  • result
Thread count: 3, current thread: pool-1-thread-1
Thread count: 3, current thread: pool-1-thread-2
Thread count: 5, current thread: main
Thread count: 5, current thread: main
Thread count: 5, current thread: main
Thread count: 5, current thread: main
Thread count: 5, current thread: pool-1-thread-1
Thread count: 5, current thread: pool-1-thread-4
Thread count: 5, current thread: pool-1-thread-5
Thread count: 5, current thread: pool-1-thread-3
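
The "main" lines in the result are the rejected tasks running on the submitting thread. A deterministic way to count them might look like the sketch below. It assumes a smaller pool (2 threads, 1 queue slot) and a hypothetical helper runInCaller; tasks that land on a pool thread block on a latch, while tasks that run on the caller return immediately so the caller does not deadlock.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CallerRunsSketch {

    // Returns how many of `tasks` submissions ended up running on the caller thread.
    static int runInCaller(int tasks) {
        Thread caller = Thread.currentThread();
        AtomicInteger ranInCaller = new AtomicInteger();
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1), new ThreadPoolExecutor.CallerRunsPolicy());
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                if (Thread.currentThread() == caller) {
                    ranInCaller.incrementAndGet(); // rejected task, executed by the submitter
                } else {
                    try {
                        release.await(); // keep pool threads busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }
        release.countDown();
        pool.shutdown();
        return ranInCaller.get();
    }

    public static void main(String[] args) {
        // 2 running + 1 queued stay in the pool, the other 2 run on main.
        System.out.println(runInCaller(5)); // 2
    }
}
```

Since the caller is busy running the rejected task, it cannot submit new work in the meantime, so this policy slows the producer down instead of losing tasks.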

ThreadPoolExecutor.DiscardPolicy

A handler for rejected tasks that silently discards the rejected task.

The rejected task is simply dropped. No exception is thrown either.

int CORE_POOL_SIZE = 5;
int MAX_POOL_SIZE = 5;
int BLOCKING_QUEUE_SIZE = 1; // keep the work queue small!
long KEEP_ALIVE_TIME = 1000L; // assumed value in milliseconds; the original snippet leaves it undefined

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(BLOCKING_QUEUE_SIZE));

threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); // set the policy

try {
    for (int i = 0; i < 10; i++) {
        Runnable runnable = () -> {
            ThreadPoolExecutor pool = threadPoolExecutor;
            System.out.println("Thread count: " + pool.getPoolSize() + ", current thread: " + Thread.currentThread().getName());
        };
        threadPoolExecutor.execute(runnable);
    }
} catch (Exception e) {
    e.printStackTrace();
}
  • result
Thread count: 5, current thread: pool-1-thread-1
Thread count: 5, current thread: pool-1-thread-3
Thread count: 5, current thread: pool-1-thread-1
Thread count: 5, current thread: pool-1-thread-2
Thread count: 5, current thread: pool-1-thread-5
Thread count: 5, current thread: pool-1-thread-4

Not all 10 tasks get to run; some of them are discarded.
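
How many tasks are lost depends on timing in the run above. To make the loss visible in a repeatable way, one option is to hold every accepted task on a latch and count how many actually ran, as in this sketch. The class name DiscardPolicySketch and the helper countExecuted are hypothetical; the pool settings mirror the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class DiscardPolicySketch {

    // Submits `tasks` tasks and returns how many of them were actually executed.
    static int countExecuted(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                5, 5, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1), new ThreadPoolExecutor.DiscardPolicy());
        CountDownLatch release = new CountDownLatch(1);
        AtomicInteger ran = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // hold the task until all submissions are done
                    ran.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS); // wait for accepted tasks to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ran.get();
    }

    public static void main(String[] args) {
        // 5 running + 1 queued are executed; the other 4 vanish without any exception.
        System.out.println(countExecuted(10)); // 6
    }
}
```

Nothing in the submitting code signals the four dropped tasks, so this policy only fits work that is safe to lose.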


ThreadPoolExecutor.DiscardOldestPolicy

A handler for rejected tasks that discards the oldest unhandled request and then retries execute(), unless the executor is shut down, in which case the task is discarded.

Unless the executor has been shut down, the oldest unhandled request is removed from the queue and execute() is retried.

int CORE_POOL_SIZE = 5;
int MAX_POOL_SIZE = 5;
int BLOCKING_QUEUE_SIZE = 1; // keep the work queue small!
long KEEP_ALIVE_TIME = 1000L; // assumed value in milliseconds; the original snippet leaves it undefined

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(BLOCKING_QUEUE_SIZE));

threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy()); // set the policy

try {
    for (int i = 0; i < 10; i++) {
        Runnable runnable = () -> {
            ThreadPoolExecutor pool = threadPoolExecutor;
            System.out.println("Thread count: " + pool.getPoolSize() + ", current thread: " + Thread.currentThread().getName());
        };
        threadPoolExecutor.execute(runnable);
    }
} catch (Exception e) {
    e.printStackTrace();
}
  • result
Thread count: 3, current thread: pool-1-thread-1
Thread count: 4, current thread: pool-1-thread-3
Thread count: 5, current thread: pool-1-thread-4
Thread count: 5, current thread: pool-1-thread-5
Thread count: 5, current thread: pool-1-thread-4
Thread count: 5, current thread: pool-1-thread-2
Thread count: 5, current thread: pool-1-thread-5

As with DiscardPolicy, some requests are lost.
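
Which requests are lost differs from DiscardPolicy: it is the oldest queued task that gets dropped, not the newly submitted one. The sketch below tries to show this with numbered tasks, assuming a pool of one thread with a one-slot queue and a hypothetical helper executedIds.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DiscardOldestSketch {

    // Submits tasks numbered 0..tasks-1 and returns the ids that were actually executed.
    static List<Integer> executedIds(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.DiscardOldestPolicy());
        CountDownLatch release = new CountDownLatch(1);
        List<Integer> ids = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < tasks; i++) {
            final int id = i;
            pool.execute(() -> {
                try {
                    release.await(); // hold the worker until all submissions are done
                    ids.add(id);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(ids);
    }

    public static void main(String[] args) {
        // Task 0 occupies the worker, task 1 is queued, task 2 evicts task 1,
        // and task 3 evicts task 2, so only the first and the latest survive.
        System.out.println(executedIds(4)); // [0, 3]
    }
}
```

This makes the policy a reasonable match for cases where only the most recent request matters, and a poor one when every request must be processed.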
