// Source file: 博客/管道/bounded.go (blog/pipelines/bounded.go)
1
2
3 package main
4
5 import (
6 "crypto/md5"
7 "errors"
8 "fmt"
9 "io/ioutil"
10 "os"
11 "path/filepath"
12 "sort"
13 "sync"
14 )
15
16
17
18
// walkFiles starts a goroutine that walks the file tree rooted at root and
// sends the path of every regular file on the returned string channel.
// That channel is closed when the walk ends.
//
// The walk's final error (nil on success) is delivered on the returned error
// channel. It has a buffer of one, so the goroutine never blocks on it.
// Any filesystem error handed to the walk function aborts the walk and is
// reported there. If done is closed while the walker is waiting to hand off
// a path, the walk stops and reports the error "walk canceled".
func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
	out := make(chan string)
	errs := make(chan error, 1)

	visit := func(p string, fi os.FileInfo, walkErr error) error {
		switch {
		case walkErr != nil:
			// fi may be nil here, so check the error first.
			return walkErr
		case !fi.Mode().IsRegular():
			// Directories, symlinks, devices, etc. are skipped.
			return nil
		}
		select {
		case out <- p:
			return nil
		case <-done:
			return errors.New("walk canceled")
		}
	}

	go func() {
		// Close out only after the error has been queued on errs.
		defer close(out)
		errs <- filepath.Walk(root, visit)
	}()
	return out, errs
}
43
44
// result is what a digester reports for a single file. The field order is
// path, sum, err, and unkeyed composite literals of this type depend on it,
// so do not reorder the fields.
type result struct {
	path string         // file that was read
	sum  [md5.Size]byte // MD5 digest of the contents; only meaningful when err is nil
	err  error          // non-nil if the file could not be read
}
50
51
52
53 func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
54 for path := range paths {
55 data, err := ioutil.ReadFile(path)
56 select {
57 case c <- result{path, md5.Sum(data), err}:
58 case <-done:
59 return
60 }
61 }
62 }
63
64
65
66
67
68 func MD5All(root string) (map[string][md5.Size]byte, error) {
69
70
71 done := make(chan struct{})
72 defer close(done)
73
74 paths, errc := walkFiles(done, root)
75
76
77 c := make(chan result)
78 var wg sync.WaitGroup
79 const numDigesters = 20
80 wg.Add(numDigesters)
81 for i := 0; i < numDigesters; i++ {
82 go func() {
83 digester(done, paths, c)
84 wg.Done()
85 }()
86 }
87 go func() {
88 wg.Wait()
89 close(c)
90 }()
91
92
93 m := make(map[string][md5.Size]byte)
94 for r := range c {
95 if r.err != nil {
96 return nil, r.err
97 }
98 m[r.path] = r.sum
99 }
100
101 if err := <-errc; err != nil {
102 return nil, err
103 }
104 return m, nil
105 }
106
107 func main() {
108
109
110 m, err := MD5All(os.Args[1])
111 if err != nil {
112 fmt.Println(err)
113 return
114 }
115 var paths []string
116 for path := range m {
117 paths = append(paths, path)
118 }
119 sort.Strings(paths)
120 for _, path := range paths {
121 fmt.Printf("%x %s\n", m[path], path)
122 }
123 }
124
// (View plain text)