select и атомарные операции.make(chan тип_элемента) или make(chan тип_элемента, ёмкость_канала). В первом случае создастся небуферизированный канал, а во втором - буферизированный. // создать канал для значений типа int
ch1 := make(chan int)
// создать буферизированный канал для 5 элементов для значений типа string
ch2 := make(chan string, 5)
// создать канал для структур типа MyType
ch3 := make(chan MyType) len(ch) и cap(ch).ch <- v отправляет значение v в канал ch. Для получения значения из канала используется оператор <- ch. Этот вариант применяется, когда получаемое значение не важно. Если нужно сохранить значение, то слева указывается оператор присваивания v := <-ch.package main
import (
"fmt"
"time"
)
func main() {
chIn := make(chan int, 50)
chOut := make(chan int)
go func() {
// горутина берёт числа из chIn
for {
left := <-chIn
right := <-chIn
// получаем два числа из chIn и записываем их сумму в chOut
chOut <- left + right
}
}()
go func() {
// горутина берёт числа из chOut
for {
s := <-chOut
// пусть обработка значений из chOut занимает какое-то время
time.Sleep(20 * time.Millisecond)
if s%10 == 1 {
fmt.Printf("%d ", s)
}
}
}()
// отправляем сто чисел в канал chIn
for i := 0; i < 100; i++ {
chIn <- i
}
fmt.Printf("# ")
time.Sleep(1000 * time.Millisecond)
} chIn.// для chIn := make(chan int)
1 21 41 61 81 101 121 141 161 181 #
// для chIn := make(chan int, 50)
1 21 41 61 81 # 101 121 141 161 181
// для chIn := make(chan int, 100)
# 1 21 41 61 81 101 121 141 161 181 close(ch). Когда другие горутины попытаются получить значение из закрытого канала, им будут возвращаться нулевые значения. i, ok := <-ch. Если канал закрыт, значение ok будет равно false. Получение двух значений используется, когда необходимо отличать нулевое значение, отправленное в канал, от нулевого значения, возвращаемого из закрытого канала.package main
import (
"fmt"
)
func main() {
ch := make(chan int, 1)
// так как ёмкость канала больше 0, то можно записать
// одно значение не ожидая, когда оно прочитается
ch <- 10
// попробуем закрыть канал, в котором есть значение
close(ch)
fmt.Println("len =", len(ch), "cap =", cap(ch))
v, closed := <-ch
fmt.Println(v, closed, "len =", len(ch))
v, closed = <-ch
fmt.Println(v, closed, "len =", len(ch))
} len = 1 cap = 1
10 true len = 0
0 false len = 0 make(). Если вы просто определите переменную типа chan, она, как и в случае со слайсами, будет равна nil. Таким образом, есть три состояния канала: nil;fatal error: all goroutines are asleep - deadlock!.Queue — очередь задач, которую реализуем с помощью каналов;Resizer — модуль, который ресайзит картинки;Worker — воркер, который вытаскивает задачу из очереди и обрабатывает её.package main
import (
"fmt"
"runtime"
"sync"
"time"
)
// Task содержит имя файла для конвертации
type Task struct {
Filename string
}
// Queue - это очередь задач
type Queue struct {
ch chan *Task
}
func NewQueue() *Queue {
return &Queue{
ch: make(chan *Task, 1),
}
}
func (q *Queue) Push(t *Task) {
// добавляем задачу в очередь
q.ch <- t
}
func (q *Queue) PopWait() *Task {
// получаем задачу
return <-q.ch
}
type Resizer struct {
Width uint32
Height uint32
}
func NewResizer(w, h uint32) *Resizer {
return &Resizer{
Width: w,
Height: h,
}
}
func (r *Resizer) Resize(filename string) error {
// пропустим реализацию
time.Sleep(100 * time.Millisecond)
return nil
}
type Worker struct {
id int
queue *Queue
resizer *Resizer
}
func NewWorker(id int, queue *Queue, resizer *Resizer) *Worker {
w := Worker{
id: id,
queue: queue,
resizer: resizer,
}
return &w
}
func (w *Worker) Loop() {
for {
t := w.queue.PopWait()
err := w.resizer.Resize(t.Filename)
if err != nil {
fmt.Printf("error: %v\n", err)
continue
}
fmt.Printf("worker #%d resized %s\n", w.id, t.Filename)
}
}
func main() {
queue := NewQueue()
for i := 0; i < runtime.NumCPU(); i++ {
w := NewWorker(i, queue, NewResizer(1024, 1024))
go w.Loop()
}
for i := 0; i < 50; i++ {
imagefile := fmt.Sprintf("gopher%d.jpg", i)
queue.Push(&Task{Filename: imagefile})
}
time.Sleep(2 * time.Second)
} range позволяет в цикле принимать значения из канала до тех пор, пока он не будет закрыт. Например, горутина должна постоянно получать значения из канала и обрабатывать их определённым образом. range используется в канале, который закрывается, иначе он будет эквивалентен бесконечному циклу.for i := range ch {
// тело цикла
} for {
i, ok := <-ch
if !ok {
break
}
// тело цикла
} package main
import "fmt"
func fibonacci(n int, ch chan int) {
x, y := 0, 1
for i := 0; i < n; i++ {
ch <- x // посылаем значения в канал
x, y = y, x+y
}
close(ch) // закрываем канал
}
func main() {
ch := make(chan int, 7)
// специально делаем буфер канала меньше,
// чем количество чисел Фибоначчи
go fibonacci(15, ch)
for i := range ch {
// считываем значения из канала, пока он не будет закрыт
fmt.Printf("%d ", i)
}
} 0 1 1 2 3 5 8 13 21 34 55 89 144 233 377 select.select {
case x := <-ch1:
// сценарий выполнится, если быстрее всего новое значение окажется в канале ch1
case y := <-ch2:
// сценарий выполнится, если быстрее всего новое значение окажется в канале ch2
case ch3 <- z:
// сценарий выполнится, если быстрее отправим значение в канал ch3
} select переходит в режим ожидания. Когда без блокировки нужно проверить, есть ли новые сообщения, необходимо добавить секцию default. Она выполнится вместо перехода в режим ожидания, и управление перейдёт к следующему после select коду.select {
case x := <-ch1:
// ...
case ch3 <- z:
// ...
default:
// код выполнится, если не подошёл ни один из вариантов выше
// ...
} select выполняется только один из блоков case. Если несколько условий срабатывают одновременно, то вариант выбирается случайным образом: порядок case не имеет значения. Если нужно постоянно проверять условия оператора, поместите select в цикл.select позволяет использовать break внутри секций, но break обеспечивает только выход из select. Чтобы прервать цикл с select, применяют следующие подходы:return;break с указанием метки цикла.package main
import "fmt"
func fibonacci(ch chan int, quit chan bool) {
x, y := 0, 1
loop: // метка цикла
for {
select {
case ch <- x: // ждём, когда заберут значение из канала,
// чтобы сгенерировать следующее
x, y = y, x+y
case <-quit: // параллельно ждём сигнала об окончании работы
break loop
}
}
fmt.Println("Выход")
}
func main() {
ch := make(chan int)
quit := make(chan bool)
go func() {
for i := 0; i < 15; i++ {
fmt.Println(<-ch)
}
// подаём сигнал об окончании работы
quit <- true
}()
fibonacci(ch, quit)
} select может принимать данные из нескольких каналов, но некоторые из этих каналов могут закрыться в процессе работы. Так как из закрытого канала возвращаются нулевые значения, select по-прежнему будет выполнять соответствующий ему блок case. Чтобы этого избежать, можно присвоить закрытому каналу nil — тогда оператор select будет пропускать варианты с нулевыми каналами.for {
select {
// этот вариант будет пропускаться, если ch == nil
case v, ok := <-ch:
if !ok {
ch = nil
continue
}
// ...
}
} select. Для этого создадим канал done := make(chan struct{}). Его будут читать все горутины. done, то его получит только одна горутина. Но нужно отправить сигнал всем горутинам. Так как закрытый канал отдаёт нулевые значения в неблокирующем режиме, достаточно закрыть канал, чтобы оповестить о событии все потоки.var done = make(chan struct{})
func worker(wg *sync.WaitGroup, i int) {
for {
select {
case <-done:
fmt.Println("Завершаем", i)
wg.Done()
return
default:
fmt.Println(i)
}
time.Sleep(50 * time.Millisecond)
}
}
func main() {
var wg sync.WaitGroup
// создаём горутины
for i := 0; i < 10; i++ {
wg.Add(1)
go worker(&wg, i)
}
time.Sleep(1 * time.Second)
// сообщаем горутинам о завершении работы
close(done)
// ждём завершения всех горутин
wg.Wait()
} <-chan — только для получения данных. Например, par <-chan int;chan<- — только для отправки данных. Например, par chan<- string.// Generate отправляет в канал out односимвольные строки.
func Generate(out chan<- string) {
for ch := 'a'; ch <= 'z'; ch++ {
out <- string([]rune{ch})
}
close(out)
}
// Process читает строки из канала in, переводит их в верхний регистр
// и отправляет в канал out.
func Process(in <-chan string, out chan<- string) {
for v := range in {
out <- strings.ToUpper(v)
}
close(out)
}
func main() {
lower := make(chan string)
upper := make(chan string)
go Generate(lower)
go Process(lower, upper)
// выводим строки из канала upper по мере получения
for s := range upper {
fmt.Print(s)
}
} ABCDEFGHIJKLMNOPQRSTUVWXYZ package main
import (
"fmt"
"sync"
)
func count() {
var counter int64
var wg sync.WaitGroup
// горутины увеличивают значение счётчика
for i := 0; i < 25; i++ {
wg.Add(1)
go func() {
for i := 0; i < 2000; i++ {
counter++
}
wg.Done()
}()
}
wg.Wait()
fmt.Printf("%d ", counter)
}
func main() {
// делаем несколько попыток
for i := 0; i < 5; i++ {
count()
}
} 41131 47906 42555 50000 47864 sync/atomic стандартной библиотеки реализованы атомарные операции для типов: int32, int64, uint32, uint64, uintptr, unsafe.Pointer.int64. Пакет sync/atomic содержит аналогичные функции и для других типов.AddInt64(addr *int64, delta int64) — добавить delta к значению addr. Функция возвращает новое значение. var addr int64
atomic.AddInt64(&addr, 1) CompareAndSwapInt64(addr *int64, old int64, new int64) — сравнить значение addr с old и заменить на new, если они равны. Функция возвращает true, если значение addr было заменено. var addr int64
atomic.CompareAndSwapInt64(&addr, 0, 77) SwapInt64(addr *int64, new int64) — изменить значение addr на new. Функция возвращает предыдущее значение. var addr int64
old := atomic.SwapInt64(&addr, 3) LoadInt64(addr *int64) — разыменовать указатель и возвратить значение. var addr int64
val := atomic.LoadInt64(&addr) StoreInt64(addr *int64, val int64) — записать значение по указателю. var addr int64
atomic.StoreInt64(&addr, 5) package main
import (
"fmt"
"sync"
"sync/atomic"
)
func count() {
var counter int64
var wg sync.WaitGroup
// горутины увеличивают значение счётчика
for i := 0; i < 25; i++ {
wg.Add(1)
go func() {
for i := 0; i < 2000; i++ {
atomic.AddInt64(&counter, 1)
}
wg.Done()
}()
}
wg.Wait()
fmt.Printf("%d ", atomic.LoadInt64(&counter))
}
func main() {
// делаем несколько попыток
for i := 0; i < 5; i++ {
count()
}
} 50000 50000 50000 50000 50000 atomic.Bool, atomic.Int32, atomic.Int64, atomic.Pointerи так далее — с соответствующими методами. Перечислим методы для типа atomic.Bool:// CompareAndSwap сравнивает значение с `old` и в случае равенства заменяет
// на `new`. Метод возвращает `true`, если значение было заменено.
func (x *Bool) CompareAndSwap(old, new bool) (swapped bool)
// Load возвращает логическое значение.
func (x *Bool) Load() bool
// Store записывает значение в переменную.
func (x *Bool) Store(val bool)
// Swap изменяет значение на указанное и возвращает предыдущее значение.
func (x *Bool) Swap(new bool) (old bool) atomic.Int64.package main
import (
"fmt"
"sync"
"sync/atomic"
)
func count() {
var counter atomic.Int64
var wg sync.WaitGroup
// горутины увеличивают значение счётчика
for i := 0; i < 25; i++ {
wg.Add(1)
go func() {
for i := 0; i < 2000; i++ {
counter.Add(1)
}
wg.Done()
}()
}
wg.Wait()
fmt.Printf("%d ", counter.Load())
}
func main() {
// делаем несколько попыток
for i := 0; i < 5; i++ {
count()
}
} select и range для управления потоком данных. Вы изучили атомарные операции и узнали, как их применять в горутинах. Все эти знания пригодятся при создании эффективных и надежные мнопоточных программ.