package main
import "fmt"
func main() {
input := 1
// пример 1
// Вызываем функцию сложения, результат которой становится аргументом первого параметра функции умножения
fmt.Println(multiply(add(input, 1), 2))
// пример 2
// можно поменять местами этапы, чтобы получить другой результат
fmt.Println(add(multiply(input, 1), 2))
}
// add функция сложения
func add(x int, y int) int {
return x + y
}
// multiply функция умножения
func multiply(x int, y int) int {
return x * y
} add() и multiply() так, чтобы они принимали канал inputCh и возвращали канал resultCh. Обратите внимание, что код, представленный в примере, работающий. Его можно просто скопировать и запустить.package main
import "fmt"
// add принимает на вход сигнальный канал для прекращения работы и канал с входными данными для работы,
// а возвращает канал, в который будет отправляться результат вычислений.
// На фоне будет запущена горутина, выполняющая вычисления до момента закрытия doneCh.
func add(doneCh chan struct{}, inputCh chan int) chan int {
// канал с результатом
addRes := make(chan int)
// горутина, в которой добавляем к значению из inputCh единицу и отправляем результат в addRes
go func() {
// закрываем канал, когда горутина завершается
defer close(addRes)
// берём из канала inputCh значения, которые надо изменить
for data := range inputCh {
result := data + 1
select {
// если канал doneCh закрылся, выходим из горутины
case <-doneCh:
return
// если doneCh не закрыт, отправляем результат вычисления в канал результата
case addRes <- result:
}
}
}()
// возвращаем канал для результатов вычислений
return addRes
}
// multiply принимает на вход сигнальный канал для прекращения работы и канал с входными данными для работы,
// а возвращает канал, в который будет отправляться результат вычислений.
// На фоне будет запущена горутина, выполняющая вычисления до момента закрытия doneCh.
func multiply(doneCh chan struct{}, inputCh chan int) chan int {
// канал с результатом
multiplyRes := make(chan int)
// горутина, в которой значение из inputCh умножаем на 2 и возвращаем в канал multiplyRes
go func() {
// закрываем канал multipleRes, когда горутина завершается
defer close(multiplyRes)
// берем из канала inputCh значения, которые надо изменить
for data := range inputCh {
// изменяем данные
result := data * 2
select {
// если канал doneCh закрылся, выходим из горутины
case <-doneCh:
return
// если doneCh не закрыт, отправляем результат вычисления в канал результата
case multiplyRes <- result:
}
}
}()
// возвращаем канал для результатов вычислений
return multiplyRes
}
// generator возвращает канал с данными
func generator(doneCh chan struct{}, input []int) chan int {
// канал, в который будем отправлять данные из слайса
inputCh := make(chan int)
// горутина, в которой отправляем в канал inputCh данные
go func() {
// как отправители закрываем канал, когда всё отправим
defer close(inputCh)
// перебираем все данные в слайсе
for _, data := range input {
select {
// если doneCh закрыт, сразу выходим из горутины
case <-doneCh:
return
// если doneCh не закрыт, кидаем в канал inputCh данные data
case inputCh <- data:
}
}
}()
// возвращаем канал для данных
return inputCh
}
func main() {
// ваши данные в слайсе
input := []int{1, 2, 3, 4, 5, 6, 7, 8}
// канал для сигнала к выходу из горутины
doneCh := make(chan struct{})
// при завершении программы закрываем канал doneCh, чтобы все горутины тоже завершились
defer close(doneCh)
// получаем канал с данными с помощью генератора
inputCh := generator(doneCh, input)
// ваш конвейер, сначала работает add, потом multiply
resultCh := multiply(doneCh, add(doneCh, inputCh))
// выводим результат
for res := range resultCh {
fmt.Println(res)
}
} doneCh, передаём его всем горутинам для их явной отмены. Входной поток данных передаём в add, который возвращает канал с результатами обработки. Этот канал подаётся на вход multiply, для которого результаты работы add будут входными данными. Более развёрнуто этот код выглядит так: addResults := add(doneCh, inputCh)
resultCh := multiply(doneCh, addResults) add и multiply, будут работать параллельно по мере поступления данных.add() из предыдущего примера требует больших вычислительных ресурсов. Одно из решений — увеличить количество рабочих горутин add. Попробуем её замедлить, добавив time.Sleep(time.Second). Функции generator(), add() и muliply() точно такие же, как и в примере паттерна Конвейер:package main
import (
"fmt"
)
// generator функция из предыдущего примера, делает то же, что и делала
func generator(doneCh chan struct{}, input []int) chan int {
inputCh := make(chan int)
go func() {
defer close(inputCh)
for _, data := range input {
select {
case <-doneCh:
return
case inputCh <- data:
}
}
}()
return inputCh
}
// multiply функция из предыдущего примера, делает то же, что и делала
func multiply(doneCh chan struct{}, inputCh chan int) chan int {
multiplyRes := make(chan int)
go func() {
defer close(multiplyRes)
for data := range inputCh {
result := data * 2
select {
case <-doneCh:
return
case multiplyRes <- result:
}
}
}()
return multiplyRes
}
// add функция из предыдущего примера, делает то же, что и делала
func add(doneCh chan struct{}, inputCh chan int) chan int {
addRes := make(chan int)
go func() {
defer close(addRes)
for data := range inputCh {
// замедлим вычисление, как будто функция add требует больше вычислительных ресурсов
time.Sleep(time.Second)
result := data + 1
select {
case <-doneCh:
return
case addRes <- result:
}
}
}()
return addRes
}
// fanOut принимает канал данных, порождает 10 горутин
func fanOut(doneCh chan struct{}, inputCh chan int) []chan int {
// количество горутин add
numWorkers := 10
// каналы, в которые отправляются результаты
channels := make([]chan int, numWorkers)
for i := 0; i < numWorkers; i++ {
// получаем канал из горутины add
addResultCh := add(doneCh, inputCh)
// отправляем его в слайс каналов
channels[i] = addResultCh
}
// возвращаем слайс каналов
return channels
} fanOut() принимает канал с входными данными inputCh и порождает десять горутин add для одновременной обработки входного потока. Так как каждая горутина add возвращает свой канал addRes, можно сохранить каналы результатов в срезе и возвращать его основной функции.// fanIn объединяет несколько каналов resultChs в один.
func fanIn(doneCh chan struct{}, resultChs ...chan int) chan int {
// конечный выходной канал в который отправляем данные из всех каналов из слайса, назовём его результирующим
finalCh := make(chan int)
// понадобится для ожидания всех горутин
var wg sync.WaitGroup
// перебираем все входящие каналы
for _, ch := range resultChs {
// в горутину передавать переменную цикла нельзя, поэтому делаем так
chClosure := ch
// инкрементируем счётчик горутин, которые нужно подождать
wg.Add(1)
go func() {
// откладываем сообщение о том, что горутина завершилась
defer wg.Done()
// получаем данные из канала
for data := range chClosure {
select {
// выходим из горутины, если канал закрылся
case <-doneCh:
return
// если не закрылся, отправляем данные в конечный выходной канал
case finalCh <- data:
}
}
}()
}
go func() {
// ждём завершения всех горутин
wg.Wait()
// когда все горутины завершились, закрываем результирующий канал
close(finalCh)
}()
// возвращаем результирующий канал
return finalCh
} fanIn() принимает слайс каналов, который создает функция fanOut(). Для каждого канала создаём отдельную горутину: они будут извлекать данные и передавать их в канал finalCh.finalCh. Затем возвращаем объединенный результирующий канал finalCh основной функции. На каждой итерации цикла создаётся новый экземпляр chClosure, поэтому каждая горутина получает ссылку на тот канал, для которого запускалась.func main() {
// слайс данных
input := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}
// сигнальный канал для завершения горутин
doneCh := make(chan struct{})
// закрываем его при завершении программы
defer close(doneCh)
// канал с данными
inputCh := generator(doneCh, input)
// получаем слайс каналов из 10 рабочих add
channels := fanOut(doneCh, inputCh)
// а теперь объединяем десять каналов в один
addResultCh := fanIn(doneCh, channels...)
// передаём тот один канал в следующий этап обработки
resultCh := multiply(doneCh, addResultCh)
// выводим результаты расчетов из канала
for res := range resultCh {
fmt.Println(res)
}
} ❯ go run main.go
10
16
14
8
18
12
6
4 input, создаём сигнальный канал doneCh и делаем отложенное закрытие этого канала. Далее, используя генератор, создаём канал данных inputCh и десять каналов для функции add(). Для этого используем fanOut(). Затем объединяем все каналы в addResultCh. Передаем канал addResultCh в multiply для обработки и выводим результат из канала resultCh.N потокам обращаться к ресурсу за раз. На схеме ниже — работа Семафора.N элементов. Это количество горутин, которые могут одновременно работать с ресурсом.package main
import (
"fmt"
"sync"
"time"
)
// Semaphore структура семафора
type Semaphore struct {
semaCh chan struct{}
}
// NewSemaphore создает семафор с буферизованным каналом емкостью maxReq
func NewSemaphore(maxReq int) *Semaphore {
return &Semaphore {
semaCh: make(chan struct{}, maxReq),
}
}
// когда горутина запускается, отправляем пустую структуру в канал semaCh
func (s *Semaphore) Acquire() {
s.semaCh <- struct{}{}
}
// когда горутина завершается, из канала semaCh убирается пустая структура
func (s *Semaphore) Release() {
<-s.semaCh
} Semaphore. Функция NewSemaphore() создаёт семафор с буферизованным каналом. Размер канала задан в параметре функции. Когда запускается любая горутина, выполняем метод Acquire() и в канал семафора помещаем пустую структуру. Если буферизованный канал заполнен, вызов метода Acquire блокируется. Поэтому горутина будет ждать, пока не освободится канал в семафоре. Когда горутина завершает свою работу, вызывается метод Release(), который забирает пустую структуру из канала семафора semaCh, освобождая место для других горутин, которые ждали выполнения метода Acquire().NewSemaphore() создадим семафор ёмкостью 2. И в цикле — десять горутин:func main() {
// чтобы дождаться всех горутин
var wg sync.WaitGroup
// создаём семафор емкостью 2: он будет пропускать только 2 горутины
semaphore := NewSemaphore(2)
// создаем 10 горутин
for idx := 0; idx < 10; idx++ {
wg.Add(1)
// горутина в которую помещаем её порядковый номер
go func(taskID int) {
// отправляем в канал семафора пустую структуру
semaphore.Acquire()
// откладываем уменьшение счетчика в WaitGroup, когда завершится горутина
defer wg.Done()
// забираем из канала семафора пустую структуру, дав возможность запуститься другим горутинам
defer semaphore.Release()
log.Println("Запущен рабочий %d", taskID)
time.Sleep(1 * time.Second)
}(idx)
}
// ждём завершения всех горутин
wg.Wait()
} ❯ go run main.go
20:32:44 Запущен рабочий 0
20:32:44 Запущен рабочий 1
20:32:45 Запущен рабочий 9
20:32:45 Запущен рабочий 5
20:32:46 Запущен рабочий 7
20:32:46 Запущен рабочий 6
20:32:47 Запущен рабочий 8
20:32:47 Запущен рабочий 2
20:32:48 Запущен рабочий 3
20:32:48 Запущен рабочий 4 package main
import (
"fmt"
"time"
)
// worker это наш рабочий, который принимает два канала:
// jobs - канал задач, это входные данные для обработки
// results - канал результатов, это результаты работы воркера
func worker(id int, jobs <-chan int, result chan<- int) {
for j := range jobs {
// для наглядности будем выводить какой рабочий начал работу и кго задачу
fmt.Println("рабочий", id, "запущен задача", j)
// немного замедлим выполнение рабочего
time.Sleep(time.Second)
// для наглядности выводим какой рабочий завершил какую задачу
fmt.Println("рабочий", id, "закончил задача", j)
// отправляем результат в канал результатов
results <- j + 1
}
}
func main() {
// допустим у вас 5 задач, которые нужно выполнить
const numJobs = 5
// создаем буферизованный канал для принятия задач в воркер
jobs := make(chan int, numJobs)
// создаем буферизованный канал для отправки результатов
results := make(chan int, numJobs)
// создаем и запускаем 3 воркера, это и есть пул,
// передаем id, это для наглядности, канал задач и канал результатов
for w := 1; w <= 3; w++ {
go worker(w, jobs, results)
}
// в канал задач отправляем в канал задач какие-то данные
// задач у нас 5, а воркера 3, значит одновременно решается только 3 задачи
for j := 1; j <= numJobs; j++ {
jobs <- j
}
// как вы помните, закрываем канал на стороне отправителя
close(jobs)
// забираем из канала результатов результаты ;)
// можно присваивать переменной, или выводить на экран, но мы не будем
for a := 1; a <= numJobs; a++ {
<-results
}
} > go run main.go
рабочий 3 запустился задача 1
рабочий 1 запустился задача 2
рабочий 2 запустился задача 3
рабочий 2 закончил задача 3
рабочий 3 закончил задача 1
рабочий 3 запустился задача 4
рабочий 1 закончил задача 2
рабочий 2 запустился задача 5
рабочий 3 закончил задача 4
рабочий 2 закончил задача 5 SaveMessage на SaveMessages. Для этого обновим файл internal/store/store.go:package store
import (
"context"
"errors"
"time"
)
var ErrConflict = errors.New("data conflict")
type Store interface {
FindRecepient(ctx context.Context, username string) (userID string, err error)
ListMessages(ctx context.Context, userID string) ([]Message, error)
GetMessage(ctx context.Context, id int64) (*Message, error)
// SaveMessages сохраняет несколько сообщений
SaveMessages(ctx context.Context, messages ...Message) error
RegisterUser(ctx context.Context, userID, username string) error
}
type Message struct {
ID int64
Sender string
Recepient string // получатель
Time time.Time
Payload string
}
SaveMessages хранилища PostgreSQL. Добавим в файл internal/store/pg/store.go следующий код:func (s Store) SaveMessages(ctx context.Context, messages ...store.Message) error {
// соберём данные для создания запроса с групповой вставкой
var values []string
var args []any
for i, msg := range messages {
// в нашем запросе по 4 параметра на каждое сообщение
base := i * 4
// PostgreSQL требует шаблоны в формате ($1, $2, $3, $4) для каждой вставки
params := fmt.Sprintf("($%d, $%d, $%d, $%d)", base+1, base+2, base+3, base+4)
values = append(values, params)
args = append(args, msg.Sender, msg.Recepient, msg.Payload, msg.Time)
}
// составляем строку запроса
query := `
INSERT INTO messages
(sender, recepient, payload, sent_at)
VALUES ` + strings.Join(values, ",") + `;`
// добавляем новые сообщения в БД
_, err := s.conn.ExecContext(ctx, query, args...)
return err
} cmd/skill/app.go:...
// app инкапсулирует в себя все зависимости и логику приложения
type app struct {
store store.Store
// канал для отложенной отправки новых сообщений
msgChan chan store.Message
}
func newApp(s store.Store) *app {
instance := &app{
store: s,
msgChan: make(chan store.Message, 1024), // установим каналу буфер в 1024 сообщения
}
// запустим горутину с фоновым сохранением новых сообщений
go instance.flushMessages()
return instance
}
func (a *app) webhook(w http.ResponseWriter, r *http.Request) {
...
switch true {
// пользователь попросил отправить сообщение
case strings.HasPrefix(req.Request.Command, "Отправь"):
// гипотетическая функция parseSendCommand вычленит из запроса логин адресата и текст сообщения
username, message := parseSendCommand(command)
// найдём внутренний идентификатор адресата по его логину
recepientID, err := a.store.FindRecepient(ctx, username)
if err != nil {
logger.Log.Debug("cannot find recepient by username", zap.String("username", username), zap.Error(err))
w.WriteHeader(http.StatusInternalServerError)
return
}
// отправим сообщение в очередь на сохранение
a.msgChan <- store.Message{
Sender: req.Session.User.UserID,
Recepient: recepientID,
Time: time.Now(),
Payload: message,
}
// оповестим отправителя об успешности операции
text = "Сообщение успешно отправлено"
// пользователь попросил прочитать сообщение
case strings.HasPrefix(req.Request.Command, "Прочитай"):
...
case strings.HasPrefix(req.Request.Command, "Зарегистрируй"):
...
// если не поняли команду, просто скажем пользователю, сколько у него новых сообщений
default:
...
}
// заполним модель ответа
resp := models.Response{
Response: models.ResponsePayload{
Text: text, // Алиса проговорит текст
},
Version: "1.0",
}
w.Header().Set("Content-Type", "application/json")
// сериализуем ответ сервера
enc := json.NewEncoder(w)
if err := enc.Encode(resp); err != nil {
logger.Log.Debug("error encoding response", zap.Error(err))
return
}
logger.Log.Debug("sending HTTP 200 response")
}
// flushMessages постоянно сохраняет несколько сообщений в хранилище с определённым интервалом
func (a *app) flushMessages() {
// будем сохранять сообщения, накопленные за последние 10 секунд
ticker := time.NewTicker(10 * time.Second)
var messages []store.Message
for {
select {
case msg := <-a.msgChan:
// добавим сообщение в слайс для последующего сохранения
messages = append(messages, msg)
case <-ticker.C:
// подождём, пока придёт хотя бы одно сообщение
if len(messages) == 0 {
continue
}
// сохраним все пришедшие сообщения одновременно
err := a.store.SaveMessages(context.TODO(), messages...)
if err != nil {
logger.Log.Debug("cannot save messages", zap.Error(err))
// не будем стирать сообщения, попробуем отправить их чуть позже
continue
}
// сотрём успешно отосланные сообщения
messages = nil
}
}
}