Skip to main content
  1. Guides 리스트/
  2. Go Concurrency 마스터하기/

Go Concurrency 마스터하기: 파이프라인(Pipeline) 패턴과 에러 처리

·1254 words·6 mins·
Go Concurrency 완벽 가이드 - This article is part of a series.
Part 3: This Article

지금까지 우리는 고루틴과 채널이라는 강력한 재료들을 손질하는 법을 배웠습니다.

이제 이 재료들을 조립해서 동시성 파이프라인(Concurrent Pipeline)이라는 근사한 요리를 만들어볼 차례입니다.

하지만 그전에, 동시성 요리를 망치는 가장 흔한 실수, ‘고루틴 누수’부터 잡고 가시죠.

고루틴 누수 (Leaked goroutine)
#

여기 숫자를 특정 범위만큼 채널로 보내는 함수가 있습니다.

func rangeGen(start, stop int) <-chan int {
    out := make(chan int)
    go func() {
        for i := start; i < stop; i++ {
            out <- i
        }
        close(out)
    }()
    return out
}

언뜻 보면 문제없어 보입니다.

func main() {
    generated := rangeGen(41, 46)
    for val := range generated {
        fmt.Println(val)
    }
}

하지만 만약 루프를 중간에 빠져나온다면 어떻게 될까요?

func main() {
    generated := rangeGen(41, 46)
    for val := range generated {
        fmt.Println(val)
        if val == 42 {
            break // 42까지만 출력하고 탈출!
        }
    }
}

main 함수는 루프를 빠져나오고 채널 읽기를 중단합니다.

하지만 rangeGen 안의 고루틴은 아직 43을 보내려고 안간힘을 쓰고 있습니다.

// rangeGen 내부
for i := start; i < stop; i++ {    // (1)
    out <- i                       // (2) 블로킹!
}

out 채널을 읽어주는 사람이 없으니 이 고루틴은 ➋ 지점에서 영원히 멈춰버립니다(Blocked).

고루틴이 종료되지 않고 메모리에 계속 남아있는 현상, 이것이 바로 고루틴 누수입니다.

고루틴이 가볍다고는 하지만, 이런 ‘좀비 고루틴’이 수천 개씩 쌓이면 결국 메모리 부족 사태가 벌어집니다.

이 고루틴을 강제로 종료시킬 방법이 필요합니다.

취소 채널 (Cancel channel)
#

main 함수에서 rangeGen에게 “이제 그만하고 퇴근해"라고 신호를 보내면 어떨까요?

별도의 cancel 채널을 만들어서 말이죠.

func main() {
    cancel := make(chan struct{})    // (1)
    defer close(cancel)              // (2)

    generated := rangeGen(cancel, 41, 46)    // (3)
    // ... (생략) ...
}

cancel 채널을 만들고 ➊, 함수 종료 시 닫히도록 defer를 걸어둡니다 ➋.

그리고 이 채널을 고루틴에게 전달합니다 ➌.

이제 rangeGen은 두 가지 일을 해야 합니다.

  1. out 채널로 숫자를 보낸다.

  2. cancel 채널이 닫히면 즉시 종료한다.

이걸 동시에 처리하려면 select 문이 필요합니다.

func rangeGen(cancel <-chan struct{}, start, stop int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for i := start; i < stop; i++ {
            select {
            case out <- i:    // (1)
            case <-cancel:    // (2)
                return
            }
        }
    }()
    return out
}

select는 준비된 케이스 중 하나를 실행합니다.

main이 데이터를 읽고 있다면 ➊이 실행되어 값을 보냅니다.

만약 main이 루프를 탈출해서 defer close(cancel)이 실행되면, cancel 채널이 닫히면서 ➋ 케이스가 선택되어 고루틴이 종료됩니다.

이제 main이 언제 멈추든 상관없이 고루틴 누수는 발생하지 않습니다.

취소(Cancel) vs 완료(Done)
#

잠깐 용어 정리를 하자면, cancel 채널은 지난 장에서 배운 done 채널과 매우 비슷합니다.

보통 실무에서는 둘 다 done이나 stop 같은 이름으로 혼용해서 쓰기도 합니다.

이 책에서는 헷갈림을 방지하기 위해 다음과 같이 구분하겠습니다.

  • Cancel: “작업을 중단해!” (외부 -> 고루틴)
  • Done: “나 다 끝났어!” (고루틴 -> 외부)

채널 병합 (Merging)
#

여러 채널에서 오는 데이터를 하나의 채널로 합치고 싶을 때가 있습니다.

이걸 Fan-in 패턴이라고도 하는데요.

가장 단순한 방법은 순차적으로 읽는 겁니다.

func merge(in1, in2 <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for val := range in1 { out <- val } // in1 다 읽고
        for val := range in2 { out <- val } // 그다음 in2 읽기
    }()
    return out
}

하지만 이건 동시성이 없죠. in1을 읽는 동안 in2는 기다려야 합니다.

병합: 동시성 활용 (Concurrently)
#

각 채널을 읽는 고루틴을 따로 띄우면 해결됩니다.

func merge(in1, in2 <-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // in1 읽는 고루틴
    wg.Go(func() {
        for val := range in1 { out <- val }
    })

    // in2 읽는 고루틴
    wg.Go(func() {
        for val := range in2 { out <- val }
    })

    // 둘 다 끝나면 out 채널 닫기
    go func() {
        wg.Wait()
        close(out)
    }()

    return out
}

이제 두 채널의 데이터가 동시에 out으로 쏟아집니다. 아주 빠르죠.

병합: Select 활용
#

고루틴을 여러 개 띄우지 않고 select로 해결할 수도 있습니다.

단, 채널이 닫힌 경우를 잘 처리해야 합니다.

닫힌 채널을 nil로 만들어버리면 select에서 해당 케이스가 무시된다는 점(지난 시간에 배운 꿀팁!)을 활용해 봅시다.

func merge(in1, in2 <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for in1 != nil || in2 != nil { // 둘 중 하나라도 살아있으면 반복
            select {
            case val1, ok := <-in1:
                if ok {
                    out <- val1
                } else {
                    in1 = nil // 닫혔으면 nil로 변경 -> select에서 제외됨
                }
            case val2, ok := <-in2:
                // ... (위와 동일 로직) ...
            }
        }
    }()
    return out
}

파이프라인 (Pipeline)
#

드디어 파이프라인입니다!

파이프라인은 데이터를 입력받아 가공하고 출력하는 단계(Stage)들의 연결입니다.

각 단계는 고루틴으로 돌아가고, 단계 사이는 채널로 연결됩니다.

Reader(생성) -> Processor(가공) -> Writer(출력)

예를 들어 [숫자 생성] -> [행운의 숫자 필터링] -> [채널 병합] -> [합계 계산] -> [결과 출력] 이라는 5단계 파이프라인을 구축한다고 해봅시다.

┌─────────────┐
│   rangeGen  │
└─────────────┘
  readerChan─┬────────┬──────────────┬──────────────┐
┌─────────────┐┌─────────────┐┌─────────────┐┌─────────────┐
│  takeLucky  ││  takeLucky  ││  takeLucky  ││  takeLucky  │
└─────────────┘└─────────────┘└─────────────┘└─────────────┘
       │               │              │              │
 luckyChans[0]   luckyChans[1]  luckyChans[2]  luckyChans[3]
       │               │              │              │
┌──────────────────────────────────────────────────────────┐
│                           merge                          │
└──────────────────────────────────────────────────────────┘
   mergedChan
┌─────────────┐
│     sum     │
└─────────────┘
   totalChan
┌─────────────┐
│ printTotal  │
└─────────────┘

코드로 보면 각 단계가 하나의 함수가 되고, 채널을 주고받습니다.

func main() {
    // 1. Reader: 1~1000 숫자 생성
    readerChan := rangeGen(1, 1000)

    // 2. Processor: 행운의 숫자 필터링 (4개 고루틴으로 병렬 처리)
    luckyChans := make([]<-chan int, 4)
    for i := range 4 {
        luckyChans[i] = takeLucky(readerChan)
    }

    // 3. Processor: 병렬 처리된 채널 하나로 병합
    mergedChan := merge(luckyChans)

    // 4. Processor: 합계 계산
    totalChan := sum(mergedChan)

    // 5. Writer: 결과 출력
    printTotal(totalChan)
}

이렇게 파이프라인을 구축하면:

  • 각 단계의 로직이 분리되어 코드가 깔끔해집니다.
  • 병목이 생기는 단계(takeLucky)만 고루틴 개수를 늘려 성능을 튜닝하기 쉽습니다.

고루틴 누수 방지 (심화편)
#

파이프라인에서 고루틴이 누수되는 또 다른 흔한 원인은 바로 select 내부에서의 블로킹입니다.

func modify(cancel <-chan struct{}, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for {
            select {
            case num := <-in:
                time.Sleep(10 * time.Millisecond) // (1) 작업 중...
                out <- num * 2                    // (2) 여기서 막히면?
            case <-cancel:
                return
            }
        }
    }()
    return out
}

만약 ➊에서 작업하다가 ➋로 넘어가려는 순간에 외부에서 cancel 채널이 닫히고 데이터를 받는 쪽도 사라졌다면?

고루틴은 out <- num * 2를 하려고 ➋에서 영원히 기다리게 됩니다.

이미 selectcase 안으로 들어왔기 때문에 <-cancel은 체크되지 않죠.

이걸 막으려면 값을 보낼 때도 select를 써야 합니다 (중첩 select).

            case num := <-in:
                // ... 작업 수행 ...
                select {
                case out <- result: // 보낼 수 있으면 보내고
                case <-cancel:      // 취소되면 바로 종료!
                    return
                }

원칙: 채널을 읽거나 쓸 때는 언제나 취소(cancel) 가능성을 열어둬야 합니다.

에러 처리 (Error handling)
#

파이프라인 중간에 에러가 발생하면 어떻게 해야 할까요?

예를 들어 calculate 단계에서 외부 API를 호출하다가 실패했다면요?

1. 첫 에러 발생 시 즉시 중단
#

별도의 에러 채널(errc)을 만들어서 에러가 발생하면 즉시 보내고 종료하는 방식입니다.

func calculate(in <-chan int) (<-chan Answer, <-chan error) {
    // ...
    if err != nil {
        errc <- err // 에러 발생!
        return      // 즉시 종료
    }
    // ...
}

하지만 에러 하나 때문에 전체 파이프라인을 멈추기 싫다면요?

2. 결과와 에러를 함께 전달 (Composite result)
#

결과 구조체에 에러 필드를 포함시키는 방법입니다. 가장 널리 쓰이는 방식이죠.

type Result[T any] struct {
    val T
    err error
}

func calculate(in <-chan int) <-chan Result[Answer] {
    // ...
    ans, err := fetchAnswer(n)
    out <- Result{ans, err} // 에러도 값처럼 전달
    // ...
}

다음 단계에서 res.Err()를 확인해서 성공/실패를 분기 처리하면 됩니다.

3. 에러만 따로 수집
#

에러 채널을 따로 두고, 에러가 발생하면 그쪽으로 “로그"처럼 던져버리고 계속 진행하는 방식입니다.

단, 이 에러 채널이 꽉 차서 메인 로직이 블로킹되지 않도록 버퍼를 넉넉히 주거나 select-default 패턴을 써서 에러를 버리기도(drop) 해야 합니다.


이제 여러분은 파이프라인을 조립하고, 누수를 막고, 에러를 우아하게 처리하는 법까지 익혔습니다.

이 정도면 실전에서 꽤 복잡한 동시성 요구사항도 거뜬히 처리할 수 있습니다.

다음 장에서는 동시성 프로그래밍에서 빠질 수 없는 주제, 시간(Time)을 다루는 법에 대해 알아보겠습니다.

Go Concurrency 완벽 가이드 - This article is part of a series.
Part 3: This Article