[Go 동시성 프로그래밍] 읽기 - 4장 Go의 동시성 패턴

smpl published on
13 min, 2594 words

Go 동시성 프로그래밍 책의 4장 Go의 동시성 패턴 챕터를 읽고, 읽은 내용을 정리해봅니다.

제한

동시성 코드의 안전한 작동을 위한 몇가지 옵션들

  • 메모리 공유를 위한 동기화 기본 요소(뮤텍스 등)
  • 통신을 위한 동기화(채널 등)
  • 변경 불가능한 데이터
  • 제한(confinement)에 의해 보호되는 데이터

제한은 애드 혹(Ad hoc)과 어휘적(lexical) 두가지 방법으로 가능하다.

애드혹 제한은 언어의 커뮤니티나 근무하는 그룹 혹은 작업하는 코드베이스에서 설정된 관례에 따르는 것이다. 이는 정적 분석 도구 등의 도움이 없다면 실수로 문제 발생 가능.

어휘적 제한은 올바른 데이터만 노출하기 위한 어휘 범위 및 이를 사용하는 여러 동시 프로세스를 위한 동시성 기본 요소와 관련이 있다. 잘못된 작업이 일어날 수 없다.

// channel의 쓰기를 closure로 제한.
chanOwner := func() <-chan int {
    results := make(chan int, 10)
    go func() {
        defer close(results)
        for i := 0; i <= 5; i++ {
            results <- i
        }
    }()
    return results
}

consumer := func(results <-chan int) {
    for result := range results {
        // do with result
    }
}

results := chanOwner()
consumer(results)
// 슬라이스의 부분들로 제한.
printData := func(wg *sync.WaitGroup, data []byte) {
    defer wg.Done()

    var buf bytes.Buffer
    for _, b := range data {
        // ...do with data
    }
}

var wg sync.WaitGroup
wg.Add(2)
data := []byte("golang")
go printData(&wg, data[:3])
go printData(&wg, data[3:])
wg.Wait()

제한을 추구하는 목적은 성능을 향상시키고 개발자의 인지 부하를 줄이기 위한 것. 어휘적 제한으로 동기화를 피할 수 있으면 임계영역이 없기 때문에 동기화 비용을 지불할 필요가 없다.

for-select 루프

다음과 같은 패턴이다.

for {
    select {
        // do something with channel
    }
}

다음과 같은 시나리오들에서 이용될 수 있다.

// 채널에서 반복 변수 보내기
for _, s := range []string {"a", "b", "c"} {
    select {
        case <-done: return
        case stringStream <-s:
    }
}
// 멈추기를 기다리면서 무한히 대기
for {
    select {
        case <-done: return
        default: // 여기서나
    }

    // 여기서 선점 불가능한 작업 수행
}

고루틴 누수 방지

고루틴을 잘 정리하여 고루틴이 무한히 실행되어 누수되는 것을 방지하는 방법에 대한 내용이다. (여기서의 누수는 메모리 누수라기보다는, 고루틴의 종료가 누락되어 가비지 컬렉션 되지 않는 데 대한 쪽에 더 가깝다.)

고루틴의 종료 경로에는 몇가지가 있다.

  • 작업이 완료되었을 때.
  • 복구할 수 없는 에러로 인해 더이상 작업을 계속할 수 없을 때.
  • 작업을 중단하라는 요청을 받았을 때.

여기서는 부모 고루틴이 자식 고루틴에게 취소 신호(done channel)를 보낼 수 있도록 설정하는 것을 설명하고 있다.

// 채널에서 값을 수신하는 고루틴
receiver := func(done <-chan interface{}, strings <-chan string) <-chan interface{} {
    terminated := make(chan interface{})
    go func() {
        defer close(terminated)
        for {
            select {
                case s := <-strings: //do work
                case <-done: return // 종료
            }
        }
    }()
    return terminated
}

// 채널에 값을 쓰는 고루틴
sender := func(done <-chan interface{}) <-chan int {
    stream := make(chan int)
    go func() {
        defer close(stream)
        for {
            select {
                case stream <-rand.Int(): //generate values
                case <-done: return // 종료
            }
        }
    }()
    return stream
}

done := make(chan interface{})
terminated := doWork(done, nil)
stream := sender(done)

go func() {
    time.Sleep(1 * time.Second)
    close(done) // 1초 뒤에 종료 신호
}()

// go루틴 종료되면 루프도 종료됨
for i := range stream {
    // do with stream
}

<-terminated // go루틴 종료 확인 & 대기

여기서 규약은, 다른 고루틴을 생성한 고루틴은 해당 고루틴을 중지시킬 책임도 갖는다. (혹은 적어도 중지시킬 방법을 마련할 책임을 진다.)

or 채널

하나 이상의 done 채널을 done 채널로 결합해, 그 구성요소 중 어느 하나의 채널이 닫히면 결합된 모든 채널이 닫히도록 할 수 있다.

고려할 점은, 런타임 중에는 작업 중인 done 채널의 수를 알 수 없을 수도 있다. 여기서 생성될 고루틴의 수에 대해 걱정하는 것은 성급한 최적화이고, 컴파일 시점에 작업에 필요한 채널의 수를 모를 경우는 done 채널을 결합할 수 있는 방법이 없다. (비싼 비용을 들이면 할 수는 있겠지만)

func or(channels ...<-chan interface{}) <-chan interface{} {
    switch len(channels) {
        case 0: return nil
        case 1: return channels[0]
    }

    orDone := make(chan interface{})
    go func() {
        defer close(orDone)
        
        switch len(channels) {
            case 2:
                select {
                    case <-channels[0]:
                    case <-channels[1]:
                }
            default:
                select {
                    case <-channels[0]:
                    case <-channels[1]:
                    case <-channels[2]:
                    case <-or(append(channels[3:], orDone)...):
                }
        }
    }()
    return orDone
}

// 사용 예시
sign := func(after time.Duration) <-chan interface{} {
    c := make(chan interface{})
    go func() {
        defer close(c)
        time.Sleep(after)
    }()
    return c
}
<-or(
    sign(2 * time.Hour),
    sign(5 * time.Minute),
    sign(1 * time.Second),
    sign(1 * time.Hour),
    sign(1 * time.Minute),
) // 1초 뒤에 종료됨

에러 처리

가장 근본적인 질문은 "에러 처리의 책임자는 누구인가?"라는 질문이다. 어느 시점에는 스택을 따라 에러를 전달하는 것을 멈추고 실제로 뭔가를 수행해야 한다.

이 문제는, 동시성 프로세스라면 더 복잡해진다. 고루틴에서 에러를 어떻게 돌려줄 것인가?

답은, 관심사항을 분리하는 것이다. 동시에 실행되는 프로세스는, 프로그램의 상태에 대해 완전한 정보를 가지고 있는 프로그램의 다른 부분으로, 에러를 보내야 한다. (정상 응답이든, 에러이든)

// 잠재적인 출력을, 잠재적인 에러와 결합
type Result struct {
    Error error
    Response *http.Response
}

// 에러 처리의 문제를 고루틴과 성공적으로 분리.
getter := func(done <-chan interface{}, urls ...string) <-chan Result {
    results := make(chan Result)
    go func() {
        defer close(results)

        for _, url := range urls {
            resp, err := http.Get(url)
            select {
                case <-done: return
                case results <- Result{Error: err, Response: resp}:
            }
        }
    }()
    return results
}

이러한 접근법은, 에러에 따른 실행 횟수 제한 로직 등을 깔끔하게 작성하는데도 도움이 된다.

done := make(chan interface{})
defer close(done)

errCount := 0
for result := range getter(done, []string{"a", "https://www.google.com/", "b", "c", "d"}) {
    if result.Error != nil {
        errCount += 1
        if errCount >= 3 { // 3번까지는 retry
            // 더이상 처리 종료
            break
        }
        // 에러이지만 넘어감
        continue
    }

    // 정상 성공 처리
}

다시 정리하면 고루틴들에서 리턴될 값을 구성할 때 에러가 일급 객체로 간주되어야 한다. 고루틴이 에러를 발생시킬 수 있는 경우, 이러한 에러는 결과 타입과 밀접하게 결합되어야 하며, 일반적인 동기 함수처럼 동일한 통신 회선을 통해 전달되어야 한다.

파이프라인

파이프라인은 데이터를 가져와서 작업을 수행하고 결과 데이터를 다시 전달하는 일련의 작업이다. 각 작업 단계를 스테이지(stage)라고 부른다.

각 단계의 관심사를 분리할 수 있다. 상호 독립적으로 단계를 수정할 수 있으며, 각 단계의 수정과 무관하게 단계들의 결합방식을 짜맞출 수도 있다. 이전 단계 또는 다음 단계의 작업을 동시에 처리할 수도 있고, 일부분을 팬아웃 하거나 속도를 제한할 수도 있다.

파이프라인 단계들은 다음과 같은 특성이 있다.

  • 각 단계는 동일한 타입을 소비하고 리턴한다.
  • 각 단계는 전달될 수 있도록 언어에 의해 구체화(일급함수. 함수 시그니처를 같는 변수 지원.)되어야 한다.

파이프라인의 단계는 함수형 프로그래밍에서 모나드의 부분 집합으로 간주될 수 있다.

각 단계에서 데이터 슬라이스를 가져와서 데이터 슬라이스를 리턴하는 방법은 두가지가 있다. (둘다 원본 데이터를 변경하지 않음)

  • 일괄 처리 : 한번에 하나씩 이산 값을 처리하는 대신에 모든 데이터 덩어리를 한번에 처리.
  • 스트림 처리 : 각 단계가 한번에 하나의 요소를 수신하고 방출.
    • 스트림 처리는 일괄 처리에 비해 장단점이 있는데,
    • 장점 : 프로그램의 메모리 사용량은 파이프라인의 입력 크기만큼 줄어든다.
    • 단점 : 데이터를 공급하는 방식의 재사용을 제한하고, 확장성을 제한하며, 루프가 반복될 때마다 파이프라인을 인스턴스화 하게 된다.
// 일괄 처리
multiplyB := func(values []int, multiplier int) []int {
    multipliedValues := make([]int, len(values))
    for i, v := range values {
        multipliedValues[i] = v * multiplier
    }
    return multipliedValues
}

addB := func(values []int, additive int) []int {
    addedValues := make([]int, len(values))
    for i, v := range values {
        addedValues[i] = v + additive
    }
    return addedValues
}

ints := []int{ 1, 2, 3, 4 }
for _, v := range addB(multiplyB(ints, 2), 1) {
    // do..
}

// 스트림 처리
multiplyS := func(value, multiplier int) int {
    return value * multiplier
}

addS := func(value, additive int) int {
    return value * additive
}

for _, v := range ints {
    result := multiplyS(addS(v, 1), 2)
    // do..
}

파이프라인 구축의 모범 사례

Go의 채널은 파이프라인을 구성하기 위한 최상의 방법을 제공한다. 채널은 값을 받고, 방출할 수 있고, 동시에 실행해도 안전하며, 여러가지를 아우르고, 언어에 의해 구체화된다. 채널을 사용하는 아래 예는 다음과 같은 장점이 있다.

  • 파이프라인의 끝에서 range 구문을 사용해 값을 추출하며, 입력과 출력이 동시 실행되는 컨텍스트에서 안전하기 때문에 각 단계에서 안전하게 동시에 실행할 수 있다.
  • 파이프라인의 각 단계가 상호 독립적으로 동시에 실행될 수 있다.

이 방법을 통해 구성된 파이프라인은, done 채널을 닫음으로서 선점하여 종료가 가능하다.

generator := func(done <-chan interface{}, integers ...int) <- chan int {
    intStream := make(chan int, len(integers))
    go func() {
        defer close(intStream)
        for _, i := range integers {
            select {
                case <-done: return
                case intStream <- i:
            }
        }
    }()
    return intStream
}

multiply := func(done <-chan interface{}, intStream <-chan int, multiplier int) <-chan int {
    multipliedStream := make(chan int)
    go func() {
        defer close(multipliedStream)
        for i := range intStream {
            select {
                case <-done: return
                case multipliedStream <- i * multiplier:
            }
        }
    }()
    return multipliedStream
}

add := func(done <-chan interface{}, intStream <-chan int, additive int) <-chan int {
    addedStream := make(chan int)
    go func() {
        defer close(addedStream)
        for i := range intStream {
            select {
                case <-done: return
                case addedStream <- i + additive:
            }
        }
    }()
    return addedStream 
}

done := make(chan interface{})
defer close(done)

intStream := generator(done, 1, 2, 3, 4)
pipeline := multiply(done, add(done, multiply(done, intStream, 2), 1), 2)

for v := range pipeline {
    // do..
}

유용한 생성기들

유용한 생성기와 파이프라인 단계들을 소개한다.

값의 타입을 interface{}로 하고 있는 것은 각 단계는 작업 중인 타입에 대한 정보가 필요치 않기 때문이고, 범용적으로 재사용 가능한 이득을 주기 때문이다. 특정 타입을 처리해야 할 경우는 타입단정문(type assertion)을 이용하는 단계를 추가하면 되는데, 이의 성능상 부담은 무시할 만 하다. (특정 타입 버전의 2배 성능 저하)

// 일정한 값 집합을 계속 반복하여 전달한다.
repeat := func(done <-chan interface{}, values ...interface{}) <-chan interface{} {
    valueStream := make(chan interface{})
    go func() {
        defer close(valueStream)
        for {
            for _, v := range values {
                select {
                    case <-done: return
                    case valueStream <- v:
                }
            }
        }
    }()
    return valueStream
}

// 반복적으로 함수를 호출하여 값을 생성한다.
repeatFn := func(done <-chan interface{}, fn func() interface{}) <-chan interface{} {
    valueStream := make(chan interface{})
    go func() {
        defer close(valueStream)
        for {
            select {
                case <-done: return
                case valueStream <-fn():
            }
        }
    }()
    return valueStream
}

// 처음 몇개의 항목을 취한 후 종료한다.
take := func(done <-chan interface{}, valueStream <-chan interface{}, num int) <-chan interface{} {
    takeStream := make(chan interface{})
    go func() {
        defer close(takeStream)
        for i := 0; i < num; i++ {
            select {
                case <-done: return
                case takeStream <- <-valueStream:
            }
        }
    }()
    return takeStream
}

// 타입을 string으로 변경
toString := func(done <-chan interface{}, valueStream <-chan interface{}) <-chan string {
    stringStream := make(chan string)
    go func() {
        defer close(stringStream)
        for v := range valueStream {
            select {
                case <-done: return
                case stringStream <- v.(string):
            }
        }
    }()
    return stringStream
}

팬 아웃, 팬 인

  • 팬 아웃 : 파이프라인의 입력을 처리하기 위해 여러 개의 고루틴들을 시작하는 프로세스.
  • 팬 인 : 여러 결과를 하나의 채널로 결합하는 프로세스.

다음 두가지 사항이 적용되는 경우, 팬아웃을 고려해볼 수 있다.

  • 순서 독립성 : 단계가 이전에 계산한 값에 의존하지 않는다.
  • 단계를 실행하는데 시간이 오래 걸린다.

예제를 정리해서, fanOut/fanIn이 자동으로 일어나게 하는 스테이지 코드를 작성해 보았다.

fanIn := func(done <-chan interface{}, channels ...<-chan interface{}) <-chan interface{} {
    var wg sync.WaitGroup
    multiplexedStream := make(chan interface{})

    multiplex := func(c <-chan interface{}) {
        defer wg.Done()
        for i := range c {
            select {
            case <-done:
                return
            case multiplexedStream <- i:
            }
        }
    }

    wg.Add(len(channels))
    for _, c := range channels {
        go multiplex(c)
    }

    go func() {
        wg.Wait()
        close(multiplexedStream)
    }()

    return multiplexedStream
}

fanOut := func(
    done <-chan interface{},
    fn func(done <-chan interface{}, values <-chan interface{}) <-chan interface{},
    values <-chan interface{}
) []<-chan interface{} {
    numFinders := runtime.NumCPU()
    finders := make([]<-chan interface{}, numFinders)
    for i := 0; i < numFinders; i++ {
        finders[i] = fn(done, values)
    }
    return finders
}

fanOutIn := func(
    done <-chan interface{},
    fn func(done <-chan interface{}, values <- chan interface{}) <-chan interface{},
    values <-chan interface{}
) <-chan interface{} {
    channels := fanOut(done, fn, values)
    return fanIn(done, channels...)
}

// 사용 예시
for prime := range take(done, fanOutIn(done, multiplier, randIntStream), 10) {
    fmt.Println("%d", prime)
}

or-done 채널

done과 채널의 취소를 모두 다를 수 있게 하는 코드를 캡슐화 하는 패턴이다.

orDone := func(done, c <-chan interface{}) <-chan interface{} {
    valStream := make(chan interface{})
    go func() {
        defer close(valStream)
        for {
            select {
                case <-done: return
                case v, ok := <- c:
                    if ok == false {
                        return
                    }

                    select {
                        case valStream <- v:
                        case <-done:
                    }
            }
        }
    }()
    return valStream
}

tee 채널

입력 스트림을 가져와서 동일한 값을 별개의 두 채널로 리턴할 수 있다. (한쪽 채널은 처리하고, 한쪽 채널의 값을 로그나 감사를 위해 보내는 식으로 쓸 수 있다.)

out1과 out2에 대한 쓰기가 완료된 후에야 in에서 값을 가져간다.

tee := func(done <-chan interface{}, in <-chan interface{}) (_, _ <-chan interface{}) {
    out1 := make(chan interface{})
    out2 := make(chan interface{})

    go func() {
        defer close(out1)
        defer close(out2)

        for val := range orDone(done, in) {
            var out1, out2 = out1, out2
            for i := 0; i < 2; i++ {
                select {
                    case <-done:
                    case out1 <- val: out1 = nil
                    case out2 <- val: out2 = nil
                }
            }
        }
    }()
    return out1, out2
}

bridge 채널

채널들의 채널을 하나의 채널로 변환한다. 하나의 채널을 모두 반복하고, 채널이 닫히면 다음 채널을 읽기 시작한다.

채널의 채널을 파괴하는 것은 소유자/책임자가 해야 한다.

bridge := func(done <-chan interface{}, chanStream <-chan <-chan interface{}) <-chan interface{} {
    valStream := make(chan interface{})
    go func() {
        defer close(valStream)
        for {
            val stream <-chan interface{}
            select {
                case maybeStream, ok := <-chanStream:
                    if ok == false {
                        return
                    }
                    
                    stream = maybeStream
                case <-done: return
            }

            for val := range orDone(done, stream) {
                select {
                    case valStream <- val:
                    case <-done:
                }
            }
        }
    }()
    return valStream
}

대기열 사용

파이프라인이 준비되지 않았더라도 파이프라인에 대한 작업을 받아들이는 것.

대기열 사용은 프로그램의 실행 속도를 높여주는 것을 의도하는 것이 아니다. 단계 중 하나의 실행 시간이 줄어드는 것이 아니라, 그 단계가 차단(blocked, 멈춤) 상태에 있는 시간이 줄어드는 것이다. 한 단계의 실행 시간이 다른 단계의 실행 시간에 영향을 미치지 않게 한다는 점에서 유용하다. (blocking 대비 non-blocking 작업의 이점에 가깝다고 이해하면 좋을 듯.)

대기열이 시스템의 전반적인 성능을 향상시킬 수 있는 경우가 있는데,

  • 특정 단계에서 일괄 처리 요청이 시간을 절약하는 경우 :
    • 데이터 덩어리를 버퍼에 모았다가 쓰기(chunking).
    • bufio 패키지.
    • 하나의 연산을 실행하는데 발생하는 오버헤드보다, 청킹을 통해 얻을 수 있는 성능 향상이 큰 경우.
    • 청킹 뿐만 아니라 후방참조(look-behind)나 큐잉 순서 & 우선순위를 정렬함으로써 알고리즘을 최적화할 수 있는 경우에도 유용하다.
  • 특정 단계의 지연으로 인해 시스템에 피드백 루프가 생성되는 경우 :
    • 부정적인 피드백 루프 = 하향 나선형 = 죽음의 나선형 : 상류 단계 또는 시스템이 파이프라인에 새로운 요청을 하는 효율성에 비해 파이프라인의 효율성이 저하되면, 부정적인 피드백 루프가 시작됨.
    • 실패에 대한 안전장치(fail-safe)가 없으면 회복 불가능.
    • 대기열에서 꺼낼 때는 파이프라인이 처리할 준비가 되었는지 반드시 확인해야 한다.

대기열은 다음 두 경우에 추가해야 이득을 볼 수 있다.

  • 파이프라인의 입구
  • 일괄 작업으로 효율이 높아지는 단계

파이프라인 자체를 효율화하여 시스템 체류 시간을 줄이는 방법만이 효율성을 증가시키는 방법이다. 효율성 개선 없는 대기열 도입은 오히려 시스템 체류 시간을 늘릴 수 있다.

리틀의 법칙. 파이프라인의 처리량을 예측하는 법칙. panic 등의 실패 상황에 대해서는 이 법칙으로 측정할 수 없다.

L = 시스템의 평균 요청 수
λ = 요청의 평균 도착률
W = 요청이 시스템에 체류하는 평균 시간
n = 대기열의 크기

L = λΣW

여기에 대기열을 추가하면, 요청의 도착률이 증가하거나, 시스템 체류 시간의 증가로 이어진다. 성능은 나아지지 않는다.

nL = nλ * ΣW
    or
nL = λ * nΣW

이를 이용해, 파이프라인이 필요로 하는 대기열의 크기를 구할 수 있다.

하나의 요청을 처리하는 데 1ms가 걸리고, 초당 10만건의 요청을 처리할 수 있어야 하는데, 파이프라인의 단계가 3단계라면,

L requests - 3 requests = 100000 requests/sec * 0.001 sec
L - 3 = 10
L = 7

7 요청의 용량이 대기열에 필요하게 된다.

개발 중에는 대기열의 크기를 0으로 유지하거나, 필요에 따라 나중에 읽을 수 있도록 어딘가에 보존되는 영구 대기열을 사용하는 것이 나은데, 이 쪽이 문제를 확인하는데는 더 낫다.

context 패키지

취소되었다는 단순할 알림(채널을 close하는) 대신 취소 이유가 무엇이고, 또는 함수에 완료되어야만 하는 마감 시한이 있는지 등의 추가 정보를 전달할 수 있다.

context 패키지 링크

context 패키지는 두가지 목적으로 이용된다.

  • 호출 그래프 상의 분기를 취소하기 위한 API 제공
  • 호출 그래프를 따라 요청범위(request-scope) 데이터를 전송하기 위한 데이터 저장소(data-bag)의 제공

context는 불변이다. 부모에게 영향을 주지 않으면서 자신의 요구사항에 부합하는 컨텍스트를 생성할 수 있다.

context의 인스턴스는 객체의 멤버로 저장하지 않는 것이 좋다. 항상 함수에서 함수로 전달하는 것이 좋다.

WithValue()를 쓸 때는, 키와 값이 interface{} 타입이어서 타입안전성을 잃어버리므로, 패키지 내부에 맞춤형 키 타입을 정의하도록 하면, context 내의 맵에서 서로 다른 타입으로 인해 키가 구분되도록 하고, 키의 타입이 모듈 밖으로 알려지지 않도록 하여 안전하게 키/값을 저장할 수 있다. 대신 다른 패키지에서 이 타입을 참조하여 조회하고자 시도하게 되면, 자칫 순환 의존성이 생기는 등 이상한 상황이 발생할 수 있긴 하다. (참고 자료 Go context WithValue 안전하게 사용하기)

context에 값을 저장하는 기능은, API나 프로세스 경계를 통과하는 요청범위의 데이터에 대해서만 사용하고, 함수에 선택적 매개변수를 전달하는 용도로는 사용하지 않는다.

더불어 context에 저장할 데이터를 구분하는 기준은, 명확하진 않으나, 경험적으로 다음과 같은 규칙을 가지고 판단해볼 수 있다.

  • API나 프로세스 경계를 통과하는 데이터여야 한다.
  • 변경불가능(immutable)한 데이터여야 한다.
  • 단순한 타입의 데이터로 변환할 수 있어야 한다.
  • 메서드가 있는 타입이 아닌 데이터여야 한다.
  • 연산을 주도하는 것이 아니라 꾸미는 데 도움이 되는 데이터여야 한다. (포함 여부에 따라 알고리즘이 크게 바뀌지 않아야 함)

(추가로 찾아본 자료들)

// fin.