생산자 소비자 문제
package thread.bounded;
import util.MyLogger;
import java.util.ArrayList;
import java.util.List;
import static util.MyLogger.log;
import static util.ThreadUtils.sleep;
public class BoundedMain {
public static void main(String[] args) {
// 1. BoundedQueue 선택
BoundedQueue queue = new BoundedQueueV4(2);
// 2. 생산자, 소비자 실행 순서 선택, 반드시 하나만 선택!
//producerFirst(queue); // 생산자 먼저 실행
consumerFirst(queue); // 소비자 먼저 실행
}
private static void consumerFirst(BoundedQueue queue) {
log("== [소비자 먼저 실행] 시작, " + queue.getClass().getSimpleName() + " ==");
List<Thread> threads = new ArrayList<>();
startConsumer(queue, threads);
printAllState(queue, threads);
startProducer(queue, threads);
printAllState(queue, threads);
log("== [소비자 먼저 실행] 종료, " + queue.getClass().getSimpleName() + " ==");
}
private static void producerFirst(BoundedQueue queue) {
log("== [생산자 먼저 실행] 시작, " + queue.getClass().getSimpleName() + " ==");
List<Thread> threads = new ArrayList<>();
startProducer(queue, threads);
printAllState(queue, threads);
startConsumer(queue, threads);
printAllState(queue, threads);
log("== [생산자 먼저 실행] 종료, " + queue.getClass().getSimpleName() + " ==");
}
private static void startConsumer(BoundedQueue queue, List<Thread> threads) {
System.out.println();
log("소비자 시작");
for (int i = 1; i <= 3; i++) {
Thread consumer = new Thread(new ConsumerTask(queue), "consumer" + i);
threads.add(consumer);
consumer.start();
sleep(100);
}
}
private static void startProducer(BoundedQueue queue, List<Thread> threads) {
System.out.println();
log("생산자 시작");
for (int i = 1; i <= 3; i++) {
Thread producer = new Thread(new ProducerTask(queue, "data" + i), "producer" + i);
threads.add(producer);
producer.start();
sleep(100);
}
}
private static void printAllState(BoundedQueue queue, List<Thread> threads){
System.out.println();
log("현재 상태 출력, 큐 데이터: " + queue);
for (Thread thread : threads) {
log(thread.getName() + ": " + thread.getState());
}
}
}
package thread.bounded;
import java.util.ArrayDeque;
import java.util.Queue;
import static util.MyLogger.log;
import static util.ThreadUtils.sleep;
public class BoundedQueueV3 implements BoundedQueue {
private final Queue<String> queue = new ArrayDeque<>();
private final int max;
public BoundedQueueV3(int max) {
this.max = max;
}
@Override
public synchronized void put(String data) {
while (queue.size() == max) {
log("[put] 큐가 가득 참, 생산자 대기)");
try {
wait();
log("[put] 생산자 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
queue.offer(data);
log("[put] 생산자 데이터 저장, notify() 호출");
notify(); // 대기 스레드, WAIT -> BLOCKED
}
@Override
public synchronized String take() {
while (queue.isEmpty()) {
log("[take] 큐에 데이터가 없음, 소비자 대기");
try {
wait(); // RUNNABLE -> WAITING, 락 반납
log("[take] 소비자 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
String data = queue.poll();
log("[take] 소비자 데이터 획득, notify() 호출");
notify(); // 대기 스레드, WAIT -> BLOCKED
return data;
}
@Override
public String toString() {
return queue.toString();
}
}
위의 코드는 생성자-소비자 문제를 해결할 수 있는 코드이다.Object
클래스에서 제공하는 wait()
와 notifiy()
를 사용해서 해결한 코드이지만, 단점이 존재한다.
한가지의 대기큐인 모니터 대기 큐를 사용하기 때문에, 스레드 기아 현상이 발생할 수 있다. 생성자 스레드는 소비자 스레드를 깨워야하는데, 같은 스레드를 깨우는 경우가 발생한다.
그렇다면 ReentrantLock
과 Condition
을 사용하며 생성자 큐와 소비자 큐로 각자 분리하면 된다.
참고: 모든 객체 내부에 모니터락과 모니터 대기 큐가 존재한다. 위의 코드의 스레드가 wait()
로 WAITING
상태가 되면 모니터 대기 큐로 들어가는 것이다. 즉, Obejct
의 wait()
, notifiy()
, notifiyAll()
은 synchronized
와 연관되어있으며, 모니터 락과 모니터 대기 큐를 사용한다.
Condition condtion = lock.newCondition()
: ReentrantLock
을 사용하면 condition
이 스레드 대기 공간이다.
condition.await()
: ReentrantLock
에서 획득한 락을 반납하고 대기 상태로 condition
에 보관된다. synchronized
에서 사용하는 객체 내부에 있는 모니터 락이 아니라, ReentrantLock
락을 뜻한다.
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static util.MyLogger.log;
public class BoundedQueueV5 implements BoundedQueue {
private final Lock lock = new ReentrantLock();
private final Condition producerCond = lock.newCondition(); //생성자 대기 공간 생성
private final Condition consumerCond = lock.newCondition(); //소비자 대기 공간 생성
private final Queue<String> queue = new ArrayDeque<>();
private final int max;
public BoundedQueueV5(int max) {
this.max = max;
}
@Override
public void put(String data) {
lock.lock();
try {
while (queue.size() == max) {
log("[put] 큐가 가득 참, 생산자 대기");
try {
producerCond.await();
log("[put] 생산자 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
queue.offer(data);
log("[put] 생산자 데이터 저장, consumerCond.signal() 호출");
consumerCond.signal();
} finally {
lock.unlock();
}
}
@Override
public String take() {
lock.lock();
try {
while (queue.isEmpty()) {
log("[take] 큐에 데이터가 없음, 소비자 대기");
try {
consumerCond.await();
log("[take] 소비자 깨어남");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
String data = queue.poll();
log("[take] 소비자 데이터 획득, producerCond.signal() 호출");
producerCond.signal();
return data;
} finally {
lock.unlock();
}
}
@Override
public String toString() {
return queue.toString();
}
}
스레드의 대기
synchronized
BLOCKED
상태- 모니터 락이 획득하지 못하고 기다릴 때, 자바 내부에 구현되어있는 락 대기 집합에서 대기
WAITING
상태wait()
와 같은 메소드 호출로 스레드 대기 집합에 있을 때
Lock(ReentrantLock)
- 락 대기 :
ReentrantLock
의 대기 큐에서WAITING
상태로 락 획득 대기- 다른 스레드가
lock.unlock()
을 호출 했을 때 대기가 풀리며 락 획득 시도, 락 획득 하면 대기 큐 빠져나가고RUNNABLE
상태
- 다른 스레드가
- 대기 큐 대기 :
condition
대기 큐에서WAITING
상태로 대기
BlockingQueue
package thread.bounded;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BoundedQueueV6_1 implements BoundedQueue {
private BlockingQueue<String> queue;
public BoundedQueueV6_1(int max) {
this.queue = new ArrayBlockingQueue<>(max);
}
@Override
public void put(String data) {
try {
queue.put(data);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
public String take() {
try {
return queue.take();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@Override
public String toString() {
return queue.toString();
}
}
BlockingQueue
는 생산자 소비자 문제를 완벽하게 해결한 여러가지 기능을 제공한다.
우리가 만든 코드는 이미 BlockingQueue
에 모두 있다.
'Study > Java' 카테고리의 다른 글
Java. 동시성 컬렉션 (0) | 2024.08.31 |
---|---|
Java. 원자적 연산, CAS 연산 (0) | 2024.08.30 |
Java. 고급 동기화 - concurrent.Lock (3) | 2024.08.28 |
Java. 동기화 - synchronized (0) | 2024.08.25 |
Java. 메모리 가시성 (0) | 2024.08.24 |