염몽 개발일지
article thumbnail

생산자 소비자 문제

package thread.bounded;

import util.MyLogger;

import java.util.ArrayList;
import java.util.List;

import static util.MyLogger.log;
import static util.ThreadUtils.sleep;

public class BoundedMain {
    public static void main(String[] args) {
        // 1. BoundedQueue 선택
        BoundedQueue queue = new BoundedQueueV4(2);

        // 2. 생산자, 소비자 실행 순서 선택, 반드시 하나만 선택!
        //producerFirst(queue); // 생산자 먼저 실행
        consumerFirst(queue); // 소비자 먼저 실행
    }

    private static void consumerFirst(BoundedQueue queue) {
        log("== [소비자 먼저 실행] 시작, " + queue.getClass().getSimpleName() + " ==");
        List<Thread> threads = new ArrayList<>();
        startConsumer(queue, threads);
        printAllState(queue, threads);
        startProducer(queue, threads);
        printAllState(queue, threads);
        log("== [소비자 먼저 실행] 종료, " + queue.getClass().getSimpleName() + " ==");
    }

    private static void producerFirst(BoundedQueue queue) {
        log("== [생산자 먼저 실행] 시작, " + queue.getClass().getSimpleName() + " ==");
        List<Thread> threads = new ArrayList<>();
        startProducer(queue, threads);
        printAllState(queue, threads);
        startConsumer(queue, threads);
        printAllState(queue, threads);
        log("== [생산자 먼저 실행] 종료, " + queue.getClass().getSimpleName() + " ==");
    }

    private static void startConsumer(BoundedQueue queue, List<Thread> threads) {
        System.out.println();
        log("소비자 시작");
        for (int i = 1; i <= 3; i++) {
            Thread consumer = new Thread(new ConsumerTask(queue), "consumer" + i);
            threads.add(consumer);
            consumer.start();
            sleep(100);
        }
    }

    private static void startProducer(BoundedQueue queue, List<Thread> threads) {
        System.out.println();
        log("생산자 시작");
        for (int i = 1; i <= 3; i++) {
            Thread producer = new Thread(new ProducerTask(queue, "data" + i), "producer" + i);
            threads.add(producer);
            producer.start();
            sleep(100);
        }
    }

    private static void printAllState(BoundedQueue queue, List<Thread> threads){
        System.out.println();
        log("현재 상태 출력, 큐 데이터: " + queue);
        for (Thread thread : threads) {
            log(thread.getName() + ": " + thread.getState());
        }
    }
}
package thread.bounded;

import java.util.ArrayDeque;
import java.util.Queue;

import static util.MyLogger.log;
import static util.ThreadUtils.sleep;

public class BoundedQueueV3 implements BoundedQueue {

    private final Queue<String> queue = new ArrayDeque<>();
    private final int max;

    public BoundedQueueV3(int max) {
        this.max = max;
    }

    @Override
    public synchronized void put(String data) {

        while (queue.size() == max) {
            log("[put] 큐가 가득 참, 생산자 대기)");
            try {
                wait();
                log("[put] 생산자 깨어남");
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        queue.offer(data);
        log("[put] 생산자 데이터 저장, notify() 호출");
        notify(); // 대기 스레드, WAIT -> BLOCKED
    }

    @Override
    public synchronized String take() {
        while (queue.isEmpty()) {
            log("[take] 큐에 데이터가 없음, 소비자 대기");
            try {
                wait(); // RUNNABLE -> WAITING, 락 반납
                log("[take] 소비자 깨어남");
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }

        String data = queue.poll();
        log("[take] 소비자 데이터 획득, notify() 호출");
        notify(); // 대기 스레드, WAIT -> BLOCKED
        return data;
    }

    @Override
    public String toString() {
        return queue.toString();
    }
}

위의 코드는 생성자-소비자 문제를 해결할 수 있는 코드이다.
Object 클래스에서 제공하는 wait()notifiy() 를 사용해서 해결한 코드이지만, 단점이 존재한다.
한가지의 대기큐인 모니터 대기 큐를 사용하기 때문에, 스레드 기아 현상이 발생할 수 있다. 생성자 스레드는 소비자 스레드를 깨워야하는데, 같은 스레드를 깨우는 경우가 발생한다.

그렇다면 ReentrantLockCondition을 사용하며 생성자 큐와 소비자 큐로 각자 분리하면 된다.

참고: 모든 객체 내부에 모니터락과 모니터 대기 큐가 존재한다. 위의 코드의 스레드가 wait()WAITING 상태가 되면 모니터 대기 큐로 들어가는 것이다. 즉, Obejctwait(), notifiy(), notifiyAll()synchronized와 연관되어있으며, 모니터 락과 모니터 대기 큐를 사용한다.

출처: 김영한의 실전 자바 (인프런)

Condition condtion = lock.newCondition() : ReentrantLock을 사용하면 condition이 스레드 대기 공간이다.

condition.await(): ReentrantLock에서 획득한 락을 반납하고 대기 상태로 condition에 보관된다. synchronized에서 사용하는 객체 내부에 있는 모니터 락이 아니라, ReentrantLock락을 뜻한다.

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import static util.MyLogger.log;

public class BoundedQueueV5 implements BoundedQueue {

    private final Lock lock = new ReentrantLock();
    private final Condition producerCond = lock.newCondition(); //생성자 대기 공간 생성
    private final Condition consumerCond = lock.newCondition(); //소비자 대기 공간 생성

    private final Queue<String> queue = new ArrayDeque<>();
    private final int max;

    public BoundedQueueV5(int max) {
        this.max = max;
    }

    @Override
    public void put(String data) {
        lock.lock();
        try {
            while (queue.size() == max) {
                log("[put] 큐가 가득 참, 생산자 대기");
                try {
                    producerCond.await();
                    log("[put] 생산자 깨어남");
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
            queue.offer(data);
            log("[put] 생산자 데이터 저장, consumerCond.signal() 호출");
            consumerCond.signal();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public String take() {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                log("[take] 큐에 데이터가 없음, 소비자 대기");
                try {
                    consumerCond.await();
                    log("[take] 소비자 깨어남");
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }

            String data = queue.poll();
            log("[take] 소비자 데이터 획득, producerCond.signal() 호출");
            producerCond.signal();
            return data;
        } finally {
            lock.unlock();
        }
    }

    @Override
    public String toString() {
        return queue.toString();
    }
}

출처: 김영한의 실전 자바 (인프런)

스레드의 대기

synchronized

  1. BLOCKED 상태
    • 모니터 락이 획득하지 못하고 기다릴 때, 자바 내부에 구현되어있는 락 대기 집합에서 대기
  2. WAITING 상태
    • wait()와 같은 메소드 호출로 스레드 대기 집합에 있을 때

출처: 김영한의 실전 자바 (인프런)

Lock(ReentrantLock)

  1. 락 대기 : ReentrantLock의 대기 큐에서 WAITING 상태로 락 획득 대기
    • 다른 스레드가 lock.unlock()을 호출 했을 때 대기가 풀리며 락 획득 시도, 락 획득 하면 대기 큐 빠져나가고 RUNNABLE 상태
  2. 대기 큐 대기 : condition 대기 큐에서 WAITING 상태로 대기

BlockingQueue

package thread.bounded;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BoundedQueueV6_1 implements BoundedQueue {

    private BlockingQueue<String> queue;

    public BoundedQueueV6_1(int max) {
        this.queue = new ArrayBlockingQueue<>(max);
    }

    @Override
    public void put(String data) {
        try {
            queue.put(data);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public String take() {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public String toString() {
        return queue.toString();
    }
}

BlockingQueue 는 생산자 소비자 문제를 완벽하게 해결한 여러가지 기능을 제공한다.
우리가 만든 코드는 이미 BlockingQueue에 모두 있다.

출처: 김영한의 실전 자바 (인프런)
출처: 김영한의 실전 자바 (인프런)

'Study > Java' 카테고리의 다른 글

Java. 동시성 컬렉션  (0) 2024.08.31
Java. 원자적 연산, CAS 연산  (0) 2024.08.30
Java. 고급 동기화 - concurrent.Lock  (3) 2024.08.28
Java. 동기화 - synchronized  (0) 2024.08.25
Java. 메모리 가시성  (0) 2024.08.24
profile

염몽 개발일지

@염몽이

포스팅이 좋았다면 "좋아요❤️" 또는 "구독👍🏻" 해주세요!