Go 博客

Go 并发模式:管道和取消

Sameer Ajmani
2014 年 3 月 13 日

简介

Go 的并发原语使构建流式数据管道变得容易,这些管道可以有效利用 I/O 和多个 CPU。本文提供了此类管道的示例,突出了操作失败时出现的细微差别,并介绍了干净处理失败的技术。

什么是管道?

Go 中没有管道的正式定义;它只是众多并发程序中的一种。非正式地,管道是一系列通过通道连接的阶段,其中每个阶段都是一组运行相同函数的 goroutine。在每个阶段,goroutine

  • 通过入站通道从上游接收值
  • 对这些数据执行某些函数,通常会生成新值
  • 通过出站通道将值发送到下游

除第一个和最后一个阶段外,每个阶段都有任意数量的入站和出站通道,第一个和最后一个阶段分别只有出站或入站通道。第一个阶段有时称为生产者;最后一个阶段称为消费者

我们将从一个简单的管道示例开始,以解释这些想法和技术。稍后,我们将提供一个更现实的示例。

平方数

考虑一个包含三个阶段的管道。

第一个阶段,gen,是一个函数,它将整数列表转换为一个通道,该通道发出列表中的整数。gen 函数启动一个 goroutine,它在通道上发送整数,并在所有值都发送后关闭通道

func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

第二个阶段,sq,从一个通道接收整数,并返回一个通道,该通道发出每个接收到的整数的平方。在入站通道关闭并且此阶段已将所有值发送到下游后,它将关闭出站通道

func sq(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

main 函数设置管道并运行最终阶段:它从第二个阶段接收值并打印每个值,直到通道关闭

func main() {
    // Set up the pipeline.
    c := gen(2, 3)
    out := sq(c)

    // Consume the output.
    fmt.Println(<-out) // 4
    fmt.Println(<-out) // 9
}

由于 sq 的入站和出站通道具有相同的类型,因此我们可以将其组合任意次数。我们还可以将 main 重写为一个 range 循环,就像其他阶段一样

func main() {
    // Set up the pipeline and consume the output.
    for n := range sq(sq(gen(2, 3))) {
        fmt.Println(n) // 16 then 81
    }
}

扇出,扇入

多个函数可以从同一个通道读取,直到该通道关闭;这称为扇出。这提供了一种在工作组之间分配工作的方法,以并行化 CPU 使用和 I/O。

一个函数可以从多个输入读取,并在所有输入都关闭之前继续执行,方法是将输入通道多路复用到单个通道上,该通道在所有输入都关闭时关闭。这称为扇入

我们可以更改我们的管道以运行两个 sq 实例,每个实例都从同一个输入通道读取。我们引入了一个新函数 merge 来扇入结果

func main() {
    in := gen(2, 3)

    // Distribute the sq work across two goroutines that both read from in.
    c1 := sq(in)
    c2 := sq(in)

    // Consume the merged output from c1 and c2.
    for n := range merge(c1, c2) {
        fmt.Println(n) // 4 then 9, or 9 then 4
    }
}

merge 函数通过为每个入站通道启动一个 goroutine 将通道列表转换为单个通道,这些 goroutine 将值复制到唯一的出站通道。一旦所有 output goroutine 都启动,merge 就会启动另一个 goroutine,在该通道上的所有发送都完成后关闭出站通道。

在已关闭的通道上发送会引发 panic,因此在调用 close 之前确保所有发送都已完成非常重要。 sync.WaitGroup 类型提供了一种安排此同步的简单方法

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs.  output
    // copies values from c to out until c is closed, then calls wg.Done.
    output := func(c <-chan int) {
        for n := range c {
            out <- n
        }
        wg.Done()
    }
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    // Start a goroutine to close out once all the output goroutines are
    // done.  This must start after the wg.Add call.
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

提前停止

我们的管道函数有一种模式

  • 阶段在所有发送操作完成后关闭其出站通道。
  • 阶段继续从入站通道接收值,直到这些通道关闭。

此模式允许将每个接收阶段编写为 range 循环,并确保在所有值成功发送到下游后所有 goroutine 都退出。

但在实际管道中,阶段并不总是接收所有入站值。有时这是设计使然:接收器可能只需要一部分值才能取得进展。更常见的情况是,一个阶段过早退出,因为入站值表示先前阶段中的错误。无论哪种情况,接收器都不应该等待剩余的值到达,我们希望早期阶段停止生成后续阶段不需要的值。

在我们的示例管道中,如果一个阶段未能使用所有入站值,则尝试发送这些值的 goroutine 将无限期阻塞

    // Consume the first value from the output.
    out := merge(c1, c2)
    fmt.Println(<-out) // 4 or 9
    return
    // Since we didn't receive the second value from out,
    // one of the output goroutines is hung attempting to send it.
}

这是一个资源泄漏:goroutine 会消耗内存和运行时资源,并且 goroutine 栈中的堆引用会阻止数据被垃圾回收。goroutine 不会被垃圾回收;它们必须自行退出。

我们需要安排管道上游阶段即使在下游阶段未能接收所有入站值时也能退出。一种方法是将出站通道更改为具有缓冲区。缓冲区可以容纳固定数量的值;如果有缓冲区空间,则发送操作会立即完成

c := make(chan int, 2) // buffer size 2
c <- 1  // succeeds immediately
c <- 2  // succeeds immediately
c <- 3  // blocks until another goroutine does <-c and receives 1

当在通道创建时就知道要发送的值的数量时,缓冲区可以简化代码。例如,我们可以重写 gen 以将整数列表复制到缓冲通道中,并避免创建新的 goroutine

func gen(nums ...int) <-chan int {
    out := make(chan int, len(nums))
    for _, n := range nums {
        out <- n
    }
    close(out)
    return out
}

回到管道中阻塞的 goroutine,我们可能会考虑向 merge 返回的出站通道添加缓冲区

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int, 1) // enough space for the unread inputs
    // ... the rest is unchanged ...

虽然这修复了此程序中的阻塞 goroutine,但这段代码很糟糕。此处缓冲区大小为 1 的选择取决于了解 merge 将接收的值的数量以及下游阶段将使用的值的数量。这是脆弱的:如果我们向 gen 传递其他值,或者下游阶段读取的少于任何值,我们又将拥有阻塞的 goroutine。

相反,我们需要提供一种方法,让下游阶段指示发送方它们将停止接受输入。

显式取消

main 决定在没有从 out 接收所有值的情况下退出时,它必须告诉上游阶段的 goroutine 放弃它们尝试发送的值。它通过在名为 done 的通道上发送值来做到这一点。它发送两个值,因为可能有两个阻塞的发送方

func main() {
    in := gen(2, 3)

    // Distribute the sq work across two goroutines that both read from in.
    c1 := sq(in)
    c2 := sq(in)

    // Consume the first value from output.
    done := make(chan struct{}, 2)
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 or 9

    // Tell the remaining senders we're leaving.
    done <- struct{}{}
    done <- struct{}{}
}

发送 goroutine 将其发送操作替换为 select 语句,该语句在 out 上的发送发生或从 done 接收值时继续执行。done 的值类型是空结构体,因为值无关紧要:它是指示应放弃 out 上的发送的接收事件。output goroutine 继续在其入站通道 c 上循环,因此上游阶段不会被阻塞。(我们将在稍后讨论如何允许此循环提前返回。)

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs.  output
    // copies values from c to out until c is closed or it receives a value
    // from done, then output calls wg.Done.
    output := func(c <-chan int) {
        for n := range c {
            select {
            case out <- n:
            case <-done:
            }
        }
        wg.Done()
    }
    // ... the rest is unchanged ...

这种方法存在一个问题:每个下游接收器都需要知道潜在阻塞的上游发送器的数量,并安排在提前返回时向这些发送器发出信号。跟踪这些计数既乏味又容易出错。

我们需要一种方法来告诉未知且数量无限的 goroutine 停止将其值发送到下游。在 Go 中,我们可以通过关闭通道来做到这一点,因为 在已关闭的通道上进行接收操作总是可以立即进行,产生元素类型的零值。

这意味着 main 只需关闭 done 通道即可解除所有发送者的阻塞。此关闭实际上是对发送者的广播信号。我们扩展了每个管道函数以接受 done 作为参数,并安排通过 defer 语句发生关闭,以便 main 的所有返回路径都将向管道阶段发出退出信号。

func main() {
    // Set up a done channel that's shared by the whole pipeline,
    // and close that channel when this pipeline exits, as a signal
    // for all the goroutines we started to exit.
    done := make(chan struct{})
    defer close(done)          

    in := gen(done, 2, 3)

    // Distribute the sq work across two goroutines that both read from in.
    c1 := sq(done, in)
    c2 := sq(done, in)

    // Consume the first value from output.
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 or 9

    // done will be closed by the deferred call.      
}

现在,我们的每个管道阶段都可以在 done 关闭后立即返回。merge 中的 output 例程可以返回而无需清空其入站通道,因为它知道上游发送方 sq 会在 done 关闭时停止尝试发送。output 通过 defer 语句确保在所有返回路径上都调用 wg.Done

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs.  output
    // copies values from c to out until c or done is closed, then calls
    // wg.Done.
    output := func(c <-chan int) {
        defer wg.Done()
        for n := range c {
            select {
            case out <- n:
            case <-done:
                return
            }
        }
    }
    // ... the rest is unchanged ...

类似地,sq 可以在 done 关闭后立即返回。sq 通过 defer 语句确保其 out 通道在所有返回路径上都关闭

func sq(done <-chan struct{}, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case out <- n * n:
            case <-done:
                return
            }
        }
    }()
    return out
}

以下是管道构建指南

  • 阶段在所有发送操作完成后关闭其出站通道。
  • 阶段继续从入站通道接收值,直到这些通道关闭或发送者被解除阻塞。

管道通过确保有足够的缓冲区来容纳所有发送的值,或者通过在接收器可能放弃通道时显式地向发送器发出信号来解除发送者的阻塞。

消化树

让我们考虑一个更现实的管道。

MD5 是一种消息摘要算法,可用于作为文件的校验和。命令行实用程序 md5sum 会打印文件列表的摘要值。

% md5sum *.go
d47c2bbc28298ca9befdfbc5d3aa4e65  bounded.go
ee869afd31f83cbb2d10ee81b2b831dc  parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96  serial.go

我们的示例程序类似于 md5sum,但它只接收一个目录作为参数,并打印该目录下每个常规文件的摘要值,并按路径名称排序。

% go run serial.go .
d47c2bbc28298ca9befdfbc5d3aa4e65  bounded.go
ee869afd31f83cbb2d10ee81b2b831dc  parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96  serial.go

我们程序的主要功能调用了一个辅助函数 MD5All,该函数返回一个从路径名称到摘要值的映射,然后对结果进行排序并打印。

func main() {
    // Calculate the MD5 sum of all files under the specified directory,
    // then print the results sorted by path name.
    m, err := MD5All(os.Args[1])
    if err != nil {
        fmt.Println(err)
        return
    }
    var paths []string
    for path := range m {
        paths = append(paths, path)
    }
    sort.Strings(paths)
    for _, path := range paths {
        fmt.Printf("%x  %s\n", m[path], path)
    }
}

MD5All 函数是我们讨论的重点。在 serial.go 中,实现不使用并发,只是在遍历树时读取和累加每个文件。

// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents.  If the directory walk
// fails or any read operation fails, MD5All returns an error.
func MD5All(root string) (map[string][md5.Size]byte, error) {
    m := make(map[string][md5.Size]byte)
    err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
        if err != nil {
            return err
        }
        if !info.Mode().IsRegular() {
            return nil
        }
        data, err := ioutil.ReadFile(path)
        if err != nil {
            return err
        }
        m[path] = md5.Sum(data)
        return nil
    })
    if err != nil {
        return nil, err
    }
    return m, nil
}

并行摘要

parallel.go 中,我们将 MD5All 分成了一个两阶段的管道。第一阶段,sumFiles,遍历树,在新的 goroutine 中对每个文件进行摘要,并将结果发送到一个值为 result 类型的通道上。

type result struct {
    path string
    sum  [md5.Size]byte
    err  error
}

sumFiles 返回两个通道:一个用于 results,另一个用于 filepath.Walk 返回的错误。walk 函数启动一个新的 goroutine 来处理每个常规文件,然后检查 done。如果 done 已关闭,则 walk 会立即停止。

func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
    // For each regular file, start a goroutine that sums the file and sends
    // the result on c.  Send the result of the walk on errc.
    c := make(chan result)
    errc := make(chan error, 1)
    go func() {
        var wg sync.WaitGroup
        err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if !info.Mode().IsRegular() {
                return nil
            }
            wg.Add(1)
            go func() {
                data, err := ioutil.ReadFile(path)
                select {
                case c <- result{path, md5.Sum(data), err}:
                case <-done:
                }
                wg.Done()
            }()
            // Abort the walk if done is closed.
            select {
            case <-done:
                return errors.New("walk canceled")
            default:
                return nil
            }
        })
        // Walk has returned, so all calls to wg.Add are done.  Start a
        // goroutine to close c once all the sends are done.
        go func() {
            wg.Wait()
            close(c)
        }()
        // No select needed here, since errc is buffered.
        errc <- err
    }()
    return c, errc
}

MD5Allc 接收摘要值。MD5All 在发生错误时提前返回,通过 defer 关闭 done

func MD5All(root string) (map[string][md5.Size]byte, error) {
    // MD5All closes the done channel when it returns; it may do so before
    // receiving all the values from c and errc.
    done := make(chan struct{})
    defer close(done)          

    c, errc := sumFiles(done, root)

    m := make(map[string][md5.Size]byte)
    for r := range c {
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = r.sum
    }
    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil
}

有界并行

parallel.go 中,MD5All 的实现为每个文件启动一个新的 goroutine。在包含许多大型文件的目录中,这可能会分配超出机器可用内存的内存。

我们可以通过限制并行读取的文件数量来限制这些分配。在 bounded.go 中,我们通过为读取文件创建固定数量的 goroutine 来实现这一点。我们的管道现在有三个阶段:遍历树、读取和摘要文件以及收集摘要。

第一阶段,walkFiles,发出树中常规文件的路径。

func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
    paths := make(chan string)
    errc := make(chan error, 1)
    go func() {
        // Close the paths channel after Walk returns.
        defer close(paths)
        // No select needed for this send, since errc is buffered.
        errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if !info.Mode().IsRegular() {
                return nil
            }
            select {
            case paths <- path:
            case <-done:
                return errors.New("walk canceled")
            }
            return nil
        })
    }()
    return paths, errc
}

中间阶段启动固定数量的 digester goroutine,这些 goroutine 从 paths 接收文件名并将 results 发送到通道 c 上。

func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
    for path := range paths {
        data, err := ioutil.ReadFile(path)
        select {
        case c <- result{path, md5.Sum(data), err}:
        case <-done:
            return
        }
    }
}

与我们之前的示例不同,digester 不会关闭其输出通道,因为多个 goroutine 正在发送到一个共享通道上。相反,MD5All 中的代码会安排在所有 digesters 完成时关闭该通道。

    // Start a fixed number of goroutines to read and digest files.
    c := make(chan result)
    var wg sync.WaitGroup
    const numDigesters = 20
    wg.Add(numDigesters)
    for i := 0; i < numDigesters; i++ {
        go func() {
            digester(done, paths, c)
            wg.Done()
        }()
    }
    go func() {
        wg.Wait()
        close(c)
    }()

我们也可以让每个 digester 创建并返回其自己的输出通道,但这样我们就需要额外的 goroutine 来汇聚结果。

最后阶段从 c 接收所有 results,然后检查 errc 中的错误。此检查不能早于此进行,因为在此之前,walkFiles 可能会阻塞发送下游的值。

    m := make(map[string][md5.Size]byte)
    for r := range c {
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = r.sum
    }
    // Check whether the Walk failed.
    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil
}

结论

本文介绍了在 Go 中构建流式数据管道的技术。处理此类管道中的故障很棘手,因为管道中的每个阶段都可能会阻塞尝试发送下游的值,并且下游阶段可能不再关心传入的数据。我们展示了如何关闭通道可以向管道启动的所有 goroutine 广播“完成”信号,并定义了正确构建管道的指南。

进一步阅读

下一篇文章:Go 松鼠
上一篇文章:FOSDEM 2014 上的 Go 演讲
博客索引