염몽 개발일지
article thumbnail

1. 생산자 소비자 문제

<code />
package thread.bounded; import util.MyLogger; import java.util.ArrayList; import java.util.List; import static util.MyLogger.log; import static util.ThreadUtils.sleep; public class BoundedMain { public static void main(String[] args) { // 1. BoundedQueue 선택 BoundedQueue queue = new BoundedQueueV4(2); // 2. 생산자, 소비자 실행 순서 선택, 반드시 하나만 선택! //producerFirst(queue); // 생산자 먼저 실행 consumerFirst(queue); // 소비자 먼저 실행 } private static void consumerFirst(BoundedQueue queue) { log("== [소비자 먼저 실행] 시작, " + queue.getClass().getSimpleName() + " =="); List<Thread> threads = new ArrayList<>(); startConsumer(queue, threads); printAllState(queue, threads); startProducer(queue, threads); printAllState(queue, threads); log("== [소비자 먼저 실행] 종료, " + queue.getClass().getSimpleName() + " =="); } private static void producerFirst(BoundedQueue queue) { log("== [생산자 먼저 실행] 시작, " + queue.getClass().getSimpleName() + " =="); List<Thread> threads = new ArrayList<>(); startProducer(queue, threads); printAllState(queue, threads); startConsumer(queue, threads); printAllState(queue, threads); log("== [생산자 먼저 실행] 종료, " + queue.getClass().getSimpleName() + " =="); } private static void startConsumer(BoundedQueue queue, List<Thread> threads) { System.out.println(); log("소비자 시작"); for (int i = 1; i <= 3; i++) { Thread consumer = new Thread(new ConsumerTask(queue), "consumer" + i); threads.add(consumer); consumer.start(); sleep(100); } } private static void startProducer(BoundedQueue queue, List<Thread> threads) { System.out.println(); log("생산자 시작"); for (int i = 1; i <= 3; i++) { Thread producer = new Thread(new ProducerTask(queue, "data" + i), "producer" + i); threads.add(producer); producer.start(); sleep(100); } } private static void printAllState(BoundedQueue queue, List<Thread> threads){ System.out.println(); log("현재 상태 출력, 큐 데이터: " + queue); for (Thread thread : threads) { log(thread.getName() + ": " + thread.getState()); } } }
<code />
package thread.bounded; import java.util.ArrayDeque; import java.util.Queue; import static util.MyLogger.log; import static util.ThreadUtils.sleep; public class BoundedQueueV3 implements BoundedQueue { private final Queue<String> queue = new ArrayDeque<>(); private final int max; public BoundedQueueV3(int max) { this.max = max; } @Override public synchronized void put(String data) { while (queue.size() == max) { log("[put] 큐가 가득 참, 생산자 대기)"); try { wait(); log("[put] 생산자 깨어남"); } catch (InterruptedException e) { throw new RuntimeException(e); } } queue.offer(data); log("[put] 생산자 데이터 저장, notify() 호출"); notify(); // 대기 스레드, WAIT -> BLOCKED } @Override public synchronized String take() { while (queue.isEmpty()) { log("[take] 큐에 데이터가 없음, 소비자 대기"); try { wait(); // RUNNABLE -> WAITING, 락 반납 log("[take] 소비자 깨어남"); } catch (InterruptedException e) { throw new RuntimeException(e); } } String data = queue.poll(); log("[take] 소비자 데이터 획득, notify() 호출"); notify(); // 대기 스레드, WAIT -> BLOCKED return data; } @Override public String toString() { return queue.toString(); } }

위의 코드는 생성자-소비자 문제를 해결할 수 있는 코드이다.
Object 클래스에서 제공하는 wait()notifiy() 를 사용해서 해결한 코드이지만, 단점이 존재한다.
한가지의 대기큐인 모니터 대기 큐를 사용하기 때문에, 스레드 기아 현상이 발생할 수 있다. 생성자 스레드는 소비자 스레드를 깨워야하는데, 같은 스레드를 깨우는 경우가 발생한다.

그렇다면 ReentrantLockCondition을 사용하며 생성자 큐와 소비자 큐로 각자 분리하면 된다.

참고: 모든 객체 내부에 모니터락과 모니터 대기 큐가 존재한다. 위의 코드의 스레드가 wait()WAITING 상태가 되면 모니터 대기 큐로 들어가는 것이다. 즉, Obejctwait(), notifiy(), notifiyAll()synchronized와 연관되어있으며, 모니터 락과 모니터 대기 큐를 사용한다.

출처: 김영한의 실전 자바 (인프런)

Condition condtion = lock.newCondition() : ReentrantLock을 사용하면 condition이 스레드 대기 공간이다.

condition.await(): ReentrantLock에서 획득한 락을 반납하고 대기 상태로 condition에 보관된다. synchronized에서 사용하는 객체 내부에 있는 모니터 락이 아니라, ReentrantLock락을 뜻한다.

<code />
import java.util.ArrayDeque; import java.util.Queue; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import static util.MyLogger.log; public class BoundedQueueV5 implements BoundedQueue { private final Lock lock = new ReentrantLock(); private final Condition producerCond = lock.newCondition(); //생성자 대기 공간 생성 private final Condition consumerCond = lock.newCondition(); //소비자 대기 공간 생성 private final Queue<String> queue = new ArrayDeque<>(); private final int max; public BoundedQueueV5(int max) { this.max = max; } @Override public void put(String data) { lock.lock(); try { while (queue.size() == max) { log("[put] 큐가 가득 참, 생산자 대기"); try { producerCond.await(); log("[put] 생산자 깨어남"); } catch (InterruptedException e) { throw new RuntimeException(e); } } queue.offer(data); log("[put] 생산자 데이터 저장, consumerCond.signal() 호출"); consumerCond.signal(); } finally { lock.unlock(); } } @Override public String take() { lock.lock(); try { while (queue.isEmpty()) { log("[take] 큐에 데이터가 없음, 소비자 대기"); try { consumerCond.await(); log("[take] 소비자 깨어남"); } catch (InterruptedException e) { throw new RuntimeException(e); } } String data = queue.poll(); log("[take] 소비자 데이터 획득, producerCond.signal() 호출"); producerCond.signal(); return data; } finally { lock.unlock(); } } @Override public String toString() { return queue.toString(); } }

출처: 김영한의 실전 자바 (인프런)

2. 스레드의 대기

synchronized

  1. BLOCKED 상태
    • 모니터 락이 획득하지 못하고 기다릴 때, 자바 내부에 구현되어있는 락 대기 집합에서 대기
  2. WAITING 상태
    • wait()와 같은 메소드 호출로 스레드 대기 집합에 있을 때

출처: 김영한의 실전 자바 (인프런)

Lock(ReentrantLock)

  1. 락 대기 : ReentrantLock의 대기 큐에서 WAITING 상태로 락 획득 대기
    • 다른 스레드가 lock.unlock()을 호출 했을 때 대기가 풀리며 락 획득 시도, 락 획득 하면 대기 큐 빠져나가고 RUNNABLE 상태
  2. 대기 큐 대기 : condition 대기 큐에서 WAITING 상태로 대기

3. BlockingQueue

<code />
package thread.bounded; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class BoundedQueueV6_1 implements BoundedQueue { private BlockingQueue<String> queue; public BoundedQueueV6_1(int max) { this.queue = new ArrayBlockingQueue<>(max); } @Override public void put(String data) { try { queue.put(data); } catch (InterruptedException e) { throw new RuntimeException(e); } } @Override public String take() { try { return queue.take(); } catch (InterruptedException e) { throw new RuntimeException(e); } } @Override public String toString() { return queue.toString(); } }

BlockingQueue 는 생산자 소비자 문제를 완벽하게 해결한 여러가지 기능을 제공한다.
우리가 만든 코드는 이미 BlockingQueue에 모두 있다.

출처: 김영한의 실전 자바 (인프런)
출처: 김영한의 실전 자바 (인프런)

'Study > Java' 카테고리의 다른 글

Java. 동시성 컬렉션  (0) 2024.08.31
Java. 원자적 연산, CAS 연산  (0) 2024.08.30
Java. 고급 동기화 - concurrent.Lock  (3) 2024.08.28
Java. 동기화 - synchronized  (0) 2024.08.25
Java. 메모리 가시성  (0) 2024.08.24
profile

염몽 개발일지

@염몽이

포스팅이 좋았다면 "좋아요❤️" 또는 "구독👍🏻" 해주세요!