myFunc()
, a выполнить функцию в новой горутине — это go myFunc()
. Да, так просто. 🙂 При вызове go myFunc()
программа не дожидается завершения вызываемой функции, а выполняет идущий далее код.package main
import (
"fmt"
"time"
)
func say(s string) {
for i := 0; i < 5; i++ {
time.Sleep(time.Millisecond)
fmt.Print(s + ` `)
}
}
func main() {
// запустим две горутины с функцией say
go say(`hello`)
go say(`world`)
// обычный вызов функции say
say(`bye`)
}
world hello bye hello bye world bye hello world world hello bye bye
package main
import (
"fmt"
"time"
)
func main() {
for _, v := range []string{"hello", "world", "bye"} {
go func(s string) {
for i := 0; i < 5; i++ {
fmt.Print(s + ` `)
time.Sleep(50 * time.Millisecond)
}
}(v)
}
time.Sleep(time.Second)
}
package main
import (
"fmt"
"time"
)
func main() {
for _, v := range []string{"hello", "world", "bye"} {
go func() {
for i := 0; i < 5; i++ {
fmt.Print(v + ` `)
time.Sleep(time.Millisecond)
}
}()
time.Sleep(time.Millisecond)
}
time.Sleep(time.Second)
}
hello world hello world world bye bye bye bye bye bye bye bye bye bye
for _, v := range []string{"hello", "world", "bye"} {
s := v
go func() {
for i := 0; i < 5; i++ {
fmt.Print(s + ` `)
time.Sleep(50 * time.Millisecond)
}
}()
}
for _, v := range []string{"hello", "world", "bye"} {
v := v
go func() {
for i := 0; i < 5; i++ {
fmt.Print(v + ` `)
time.Sleep(50 * time.Millisecond)
}
}()
}
GOMAXPROCS
и равно количеству логических ядер компьютера. Например, если у вас восьмиядерный компьютер, планировщик будет распределять горутины по восьми потокам ОС.runtime
:GOMAXPROCS(n int) int
— изменяет значение GOMAXPROCS
и возвращает количество логических процессоров, которое было установлено до вызова функции. Если значение GOMAXPROCS
больше, чем количество доступных ядер, скорость работы может не измениться. Чтобы узнать количество логических процессоров, укажите в параметре 0
.Gosched()
— приостанавливает текущую горутину, чтобы планировщик переключился на другую. Работа этой горутины возобновится в порядке очерёдности. В версии Go 1.14 появился вытесняющий планировщик: он вытеснит зависшую по процессорному времени горутину. Таким образом, вызывать переключение горутин вручную не нужно.LockOSThread()
— привязывает горутину к текущему потоку операционной системы. Эта функция пригодится при запуске кода на языке C, который требует выполнения в одном и том же потоке.NumCPU() int
возвращает количество логических ядер процессора. Можно использовать это значение для ограничения количества горутин.NumGoroutine() int
возвращает количество запущенных горутин. Эта функция будет полезна при наблюдении за работой программы.package main
import (
"fmt"
"runtime"
"time"
)
func main() {
fmt.Println("Ядер:", runtime.NumCPU())
fmt.Println("Логических процессоров:", runtime.GOMAXPROCS(2),
"Горутин:", runtime.NumGoroutine())
go func() {
time.Sleep(100 * time.Millisecond)
}()
fmt.Println("Логических процессоров:", runtime.GOMAXPROCS(0),
"Горутин:", runtime.NumGoroutine())
}
Ядер: 8
Логических процессоров: 8 Горутин: 1
Логических процессоров: 2 Горутин: 2
sync.WaitGroup
. sync.WaitGroup
содержит счётчик горутин, завершения которых нужно ждать. Функциональность WaitGroup
очень проста, состоит всего из трёх методов:(*WaitGroup) Add(delta int)
— изменить значение счётчика на указанную величину;(*WaitGroup) Done()
— уменьшить значение счётчика на единицу;(*WaitGroup) Wait()
— ожидать, когда значение счётчика будет равно нулю.package main
import (
"fmt"
"sync"
"time"
)
func main() {
var wg sync.WaitGroup
const n = 5
for i := 0; i < n; i++ {
wg.Add(1) // инкрементируем счётчик, сколько горутин нужно подождать
go func(i int) {
time.Sleep(100 * time.Millisecond)
fmt.Printf("hi %d\n", i)
// уменьшаем счётчик, когда горутина завершает работу
wg.Done()
}(i)
}
wg.Wait() // ждём все горутины
fmt.Println("Всё готово")
}
hi 0
hi 3
hi 4
hi 2
hi 1
Всё готово
sync.Once
и вызвать для неё метод Do()
, который в параметре принимает функцию инициализации. Вызов этого метода гарантирует, что функция будет выполнена только один раз. Перейдём к примеру.package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
type Config struct {
once sync.Once
vals map[string]string
}
func (c *Config) Get(k string) (string, bool) {
c.once.Do(func() {
// эта инициализация выполнится только один раз
c.vals = map[string]string{
"host": "127.0.0.1",
"port": fmt.Sprintf("%d", rand.Intn(65535)),
}
})
v, ok := c.vals[k]
return v, ok
}
func main() {
var cfg Config
keys := []string{"host", "port", "port", "host", "port"}
for _, k := range keys {
go func(k string) {
// в одной из горутин произойдёт инициализация cfg
// остальные горутины будут ждать завершения инициализации
v, ok := cfg.Get(k)
if !ok {
return
}
fmt.Printf("%s = %s\n", k, v)
}(k)
}
time.Sleep(1 * time.Second)
}
sync.Once
позволяет легко реализовать одноразовое выполнение определённых действий.package main
import (
"fmt"
"time"
)
func main() {
m := make(map[int]int)
for i := 0; i < 5; i++ {
go func() {
for j := 0; j < 1000; j++ {
if _, ok := m[j]; !ok {
m[j] = j
}
}
}()
}
time.Sleep(1 * time.Second)
fmt.Println(len(m))
}
concurrent map writes
или concurrent map read and map write
, так как эта операция не потокобезопасна. Для решения этой проблемы нужно исключить другие операции с мапой. В этом помогает синхронизация горутин. Механизмы, которые позволяют горутине на время получить эксклюзивный доступ к данным, называются примитивами синхронизации.sync
:sync.Mutex
— примитив, реализующий мьютекс в Go. Мьютекс — это механизм, который позволяет выполнить критические участки кода только одной горутиной;sync.RWMutex
— особый вид мьютекса, который позволяет одновременно выполняться либо произвольному количеству операций чтения, либо одной операции записи;sync.Cond
— переменная условия, которая останавливает горутину до получения сигнала.sync.Mutex
можно вызвать два метода:(m *Mutex) Lock()
— блокирует мьютекс. Занять мьютекс может только одна горутина. Если другие горутины вызовут метод Lock()
для занятого мьютекса, они будут ждать, пока он освободится.(m *Mutex) Unlock()
— разблокирует мьютекс. Горутина должна освободить мьютекс сразу после того, как она закончила работу с общим ресурсом.sync.Mutex
. Предположим, что есть два типа горутин. Одни постоянно меняют мапу, а другие читают. Использование мьютекса синхронизирует все обращения к мапе: только одна горутина может читать или менять её.package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func main() {
var m sync.Mutex
cache := map[int]int{}
// горутины, которые изменяют мапу
for i := 0; i < 10; i++ {
go func() {
for {
m.Lock()
cache[rand.Intn(5)] = rand.Intn(100)
m.Unlock()
time.Sleep(time.Second / 20)
}
}()
}
// горутины, которые читают мапу
for i := 0; i < 10; i++ {
go func() {
for {
m.Lock()
fmt.Printf("%#v\n", cache)
m.Unlock()
time.Sleep(time.Second / 100)
}
}()
}
time.Sleep(1 * time.Second)
}
sync.RWMutex
.Lock()
. Если горутина собирается читать данные, то она вызывает метод RLock()
. Метод RLock()
не даёт начать запись пока не будут завершены все операции чтения. Для разблокировки следует использовать соответствующие методы — Unlock()
или RUnlock()
.package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func main() {
// меняем тип мьютекса
var m sync.RWMutex
cache := map[int]int{}
for i := 0; i < 10; i++ {
go func() {
for {
// здесь остаются блокировки на запись
m.Lock()
cache[rand.Intn(5)] = rand.Intn(100)
m.Unlock()
time.Sleep(time.Second / 20)
}
}()
}
for i := 0; i < 10; i++ {
go func() {
for {
// при чтении используем Rlock() и RUnlock()
m.RLock()
fmt.Printf("%#v\n", cache)
m.RUnlock()
time.Sleep(time.Second / 100)
}
}()
}
time.Sleep(1 * time.Second)
}
RLock()
используется RUnlock()
, а при Lock()
— Unlock()
.sync.Cond
содержит локер-поле L
типа sync.Locker
, значениями которого выступают типы *sync.Mutex
или *sync.RWMutex
. Значение L
передаётся в функции sync.NewCond(l Locker) *Cond
. (*Cond) Wait()
— разблокирует локер L
и вводит текущую горутину в режим ожидания до получения сигнала. При получении сигнала локер L
блокируется, и выполняется следующий после Wait
код. Горутина должна заблокировать L
перед вызовом Wait
. Чаще всего встречаются такие варианты использования этого метода:// вариант 1
c.L.Lock()
c.Wait()
// производим нужные действия
// ...
c.L.Unlock()
// вариант 2
c.L.Lock()
for !condition() {
c.Wait()
}
// производим нужные действия
// ...
c.L.Unlock()
(*Cond) Signal()
— разблокирует одну из ожидающих горутин, если такие есть.(*Cond) Broadcast()
— разблокирует все горутины в очереди.Lock()
перед Wait()
, а Unlock()
после? И обязательны ли эти вызовы?Wait()
происходит разблокировка мьютекса L
и текущая горутина встаёт в режим ожидания сигнала. Когда приходит сигнал, мьютекс L
блокируется и Wait()
завершает работу. Именно поэтому мы должны лочить мьютекс перед вызовом Wait()
и не забывать освобождать его после.sync.Cond
встречается гораздо реже, чем мьютексы или каналы. sync.Cond
удобно использовать, когда требуется многократное уведомление о событии неопределённого круга подпиcчиков (горутин).Broadcast()
. Удобно, что функция, которая отправляет Broadcast-сигнал не знает, сколько запущено воркеров и не должна об этом думать:package main
import (
"fmt"
"sync"
"time"
)
func startWorkers(c *sync.Cond, val *int) {
workerCount := 3
for i := 0; i < workerCount; i++ {
go func(workerId int) {
c.L.Lock()
for {
c.Wait()
// получили сигнал
fmt.Printf("val %v processed by worker %v\n", *val, workerId)
}
}(i)
}
}
func main() {
var m sync.Mutex
c := sync.NewCond(&m)
val := 0
startWorkers(c, &val)
// ждём, чтобы стартанули все воркеры
time.Sleep(100 * time.Millisecond)
for i := 0; i < 4; i++ {
m.Lock()
val = i
fmt.Printf("set val to %v\n", val)
// отправляем сигнал всем воркерам
c.Broadcast()
m.Unlock()
time.Sleep(time.Millisecond)
}
}
sync.Mutex
и управлять его состоянием методами Lock()
и Unlock()
. sync.RWMutex
, который позволяет горутинам одновременно читать данные и за счёт этого увеличивает скорость работы. sync.Cond
даёт горутинам возможность обмениваться сигналами, в том числе отправлять сигнал всем подписчикам. Эта возможность используется редко: в большинстве случаев для обмена сигналами удобнее использовать каналы. О них мы расскажем в следующем уроке.