Каналы
Каналы — это механизм коммуникации между горутинами. Одна горутина может записывать в канал данные в то время, когда другая горутина читает эти данные из этого же канала. У каждого канала есть тип элемента, который является типом передаваемого по каналу значения. Канал по своей структуре схож со срезом или мапой и тоже является ссылочным типом. Это означает что объявив переменную типа канала, она по умолчанию будет nil . А при передаче канала в функцию как аргумент, мы передаем ссылку. Для записи и чтения данных в/из канала используется <-
.
ch := make(chan int) // инициализация канала
ch <- 1 // запись в канал
number := <-ch // чтение из канала
Для того чтобы инициализировать не пустой канал используется функция make() .
Устройство канала
Давайте начнём с разбора структуры канала:
qcount — количество элементов в буфере
dataqsiz — размерность буфера
buf — указатель на буфер для элементов канала
closed — флаг, указывающий, закрыт канал или нет
recvq — указатель на связанный список горутин, ожидающих чтения из канала
sendq -указатель на связанный список горутин, ожидающих запись в канал
lock — мьютекс для безопасного доступа к каналу
В общем случае, горутина захватывает мьютекс, когда совершает какое-либо действие с каналом, кроме случаев lock-free проверок при неблокирующих вызовах (я объясню это подробнее чуть ниже). Closed — это флаг, который устанавливается в 1, если канал закрыт, и в 0, если не закрыт. Эти поля далее будут исключены из общей картины, для большей ясности. Канал может быть синхронным (небуферизированным) или асинхронным (буферезированным). Давайте вначале посмотрим, как работают синхронные каналы.
Синхронные каналы
Допустим, у нас есть следующий код:
package main
func main() {
ch := make(chan bool)
go func() {
ch <- true
}()
<-ch
}
Вначале создается новый канал и он выглядит вот так:
Go не выделяет буфер для синхронных каналов, поэтому указатель на буфер равен nil и dataqsiz
равен нулю. В приведённом коде нет гарантии, что случится первее — чтение из канала или запись, поэтому допустим, что первым действием будет чтение из канала (обратный пример, когда вначале идёт запись, будет рассмотрена ниже в примере с буферизированным каналами). Вначале, текущая горутина произведёт некоторые проверки, такие как: закрыт ли канал, буферизирован он или нет, содержит ли гоуртины в send-очереди. В нашем примере у канала нет ни буфера, ни ожидающих отправки горутин, поэтому горутина добавит сама себя в recvq
и заблокируется. На этом шаге наш канал будет выглядеть следующим образом:
Теперь у нас осталась только одна работающая горутина, которая пытается записать данные в канал. Все проверки повторяются снова, и когда горутина проверяет recvq
очередь, она находит ожидающую чтение горутину, удаляет её из очереди, записывает данные в её стек и снимает блокировку. Это единственное место во всём рантайме Go, когда одна горутина пишет напрямую в стек другой горутины. После этого шага, канал выглядит точно так же, как сразу после инициализации. Обе горутины завершаются и программа выходит.
Так устроены синхронные каналы. Сейчас же, давайте посмотрим на буферизированные каналы
Буферезированные каналы
Рассмотрим следующий пример:
package main
func main() {
ch := make(chan bool, 1)
ch <- true
go func() {
<-ch
}()
ch <- true
}
Опять же, порядок исполнения неизвестен, пример с первой читающей горутиной мы разобрали выше, поэтому сейчас допустим, что два значения были записаны в канал, и после этого один из элементов вычитан. И первым шагом идёт создание канала, который будет выглядеть вот так:
Разница в сравнении с синхронным каналом в том, что тут Go выделяет буфер и устанавливает значение dataqsiz
в единицу.
Следующим шагом будет отправка первого значения в канал. Чтобы сделать это, горутина сначала производит несколько проверок: пуста ли очередь recvq
, пуст ли буфер, достаточно ли места в буфере.
В нашем случае в буфере достаточно места и в очереди ожидания чтения нет горутин, поэтому горутина просто записывает элемент в буфер, увеличивает значение qcount
и продолжает исполнение далее. Канал в этот момент выглядит так:
На следующем шаге, горутина main отправляет следующее значение в канал. Когда буфер полон, буферизированный канал будет вести себя точно так же, как синхронный (небуферизированный) канал, тоесть горутина добавит себя в очередь ожидания и заблокируется, в результате чего, канал будет выглядеть следующим образом:
Сейчас горутина main заблокирована и Go запустил одну анонимную горутину, которая пытается прочесть значение из канала. И вот тут начинается хитрая часть. Go гарантирует, что канал работает по принципу FIFO очереди (спецификация), но горутина не может просто взять значение из буфера и продолжить исполнение. В этом случае горутина main заблокируется навсегда. Для решения этой ситуации, текущая горутина читает данные из буфера, затем добавляет значение из заблокированной горутины в буфер, разблокирует ожидающую горутину и удаляет её из очереди ожидания. (В случае же, если нет ожидающих горутину, она просто читает первое значение из буфера)
Закрытие канала
Закрытие канала это простая операция. Go проходит по всем ожидающим на чтение или запись горутинам и разблокирует их. Все получатели получают дефолтные значение переменных того типа данных канала, а все отправители паникуют.
Select
Но постойте, Go же ещё поддерживает select с дефолтным поведением, и если канал заблокирован, как горутина сможет обработать default? Хороший вопрос, давайте быстро посмотрим на приватное API каналов. Когда вы запускаете следующий кусок кода:
select {
case <-ch:
foo()
default:
bar()
}
Go запускает функцию со следующей сигнатурой:
func chanrecv(t *chantype, c *hchan, ep unsafe.Pointer, block bool)
chantype
это тип канала (например, bool в случае make(chan bool)), hchan
— указатель на структуру канала, ep
— указатель на сегмент памяти, куда должны быть записаны данные из канала, и последний, но самый интересный для нас — это аргумент block
. Если он установлен в false
, то функция будет работать в неблокирующем режиме. В этом режиме горутина проверяет буфер и очередь, возвращает true
и пишет данные в ep
или возвращает false
, если нет данных в буфере или нет отправителей в очереди. Проверки буфера и очереди реализованы как атомарные операции, и не требуют блокировки мьютекса.
Также есть функция для записи данных в очередь с аналогичной сигнатурой.
select
похож на switch
без аргументов, но он может использоваться только для операций с каналами. Оператор select
используется для выполнения операции только с одним из множества каналов, условно выбранного блоком case.
Давай взглянем на пример ниже, и обсудим как он работает:
package main
import (
"fmt"
"time"
)
var start time.Time
func init() {
start = time.Now()
}
func service1(c chan string) {
time.Sleep(3 * time.Second)
c <- "Hello from service 1"
}
func service2(c chan string) {
time.Sleep(5 * time.Second)
c <- "Hello from service 2"
}
func main() {
fmt.Println("main() started", time.Since(start))
chan1 := make(chan string)
chan2 := make(chan string)
go service1(chan1)
go service2(chan2)
select {
case res := <-chan1:
fmt.Println("Response from service 1", res, time.Since(start))
case res := <-chan2:
fmt.Println("Response from service 2", res, time.Since(start))
}
fmt.Println("main() stopped", time.Since(start))
}
В этом примере мы используем оператор select
как switch
, но вместо булевых операций, мы используем операции для чтения данных из канала. Оператор select
также является блокируемым, за исключением использования default
(позже вы увидите пример с его использованием). После выполнения одного из блоков case
, горутина main
будет разблокирована. Задались вопросом когда case
условие выполнится?
Если все блоки case
являются блокируемыми, тогда select
будет ждать до момента, пока один из блоков case
разблокируется и будет выполнен. Если несколько или все канальные операции не блокируемы, тогда один из неблокируемых case
будет выбран случайным образом (Примечание переводчика: имеется ввиду случай, когда пришли одновременно данные из двух и более каналов).
Давайте наконец разберем программу, которую написали ранее. Мы запустили 2 горутины с независимыми каналами. Затем мы использовали оператор select
c двумя case
операторами. Один case
считывает данные из chan1
а другой из chan2
. Так как каналы не используют буфер, операция чтения будет блокируемой. Таким образом оба case
будут блокируемыми и select
будет ждать до тех пор, пока один из case
не разблокируется.
Когда программа находится в блоке select
горутина main
будет заблокирована и будут запланированы все горутины (по одной за раз), которые используются в блоке select
, в нашем случае это service1
и service2
. service1
ждет 3 секунды, после чего будет разблокирован и сможет записать данные в chan1
. Таким же образом как и service1
действует service2
, только он ожидает 5 секунд и осуществляет запись в chan2
. Так как service1
разблокируется раньше, чем service2
, первый case
разблокируется раньше и произведет чтение из chan1
, а второй case
будет проигнорирован. После чего управление вернется в main
, и программа завершится после вывода в консоль.
Вывод программы:
main() started 0s
Response from service 1 Hello from service 1 3s
main() stopped 3s
Вышеприведенная программа имитирует реальный веб-сервис, в котором балансировщик нагрузки получает миллионы запросов и должен возвращать ответ от одной из доступных служб. Используя стандартные горутины, каналы и select, мы можем запросить ответ у нескольких сервисов, и тот, который ответит раньше всех, может быть использован.
Для того, чтобы симулировать случай, когда все блоки case
разблокируются в одно и тоже время, мы может просто удалить вызов Sleep из горутин.
package main
import (
"fmt"
"time"
)
var start time.Time
func init() {
start = time.Now()
}
func service1(c chan string) {
c <- "Hello from service 1"
}
func service2(c chan string) {
c <- "Hello from service 2"
}
func main() {
fmt.Println("main() started", time.Since(start))
chan1 := make(chan string)
chan2 := make(chan string)
go service1(chan1)
go service2(chan2)
select {
case res := <-chan1:
fmt.Println("Response from service 1", res, time.Since(start))
case res := <-chan2:
fmt.Println("Response from service 2", res, time.Since(start))
}
fmt.Println("main() stopped", time.Since(start))
}
Данная программа выводит следующий результат:
main() started 0s
service2() started 481µs
Response from service 2 Hello from service 2 981.1µs
main() stopped 981.1µs
Но иногда вы можете получить следующий результат:
main() started 0s
service1() started 484.8µs
Response from service 1 Hello from service 1 984µs
main() stopped 984µs
Это происходит потому, что операции chan1
и chan2
выполняются практически одновременно, но все же существует некоторая разница во времени при исполнении и планировании горутин.
Для того, чтобы сделать все блоки case
неблокируемыми, мы можем использовать каналы с буфером.
package main
import (
"fmt"
"time"
)
var start time.Time
func init() {
start = time.Now()
}
func main() {
fmt.Println("main() started", time.Since(start))
chan1 := make(chan string, 2)
chan2 := make(chan string, 2)
chan1 <- "Value 1"
chan1 <- "Value 2"
chan2 <- "Value 1"
chan2 <- "Value 2"
select {
case res := <-chan1:
fmt.Println("Response from chan1", res, time.Since(start))
case res := <-chan2:
fmt.Println("Response from chan2", res, time.Since(start))
}
fmt.Println("main() stopped", time.Since(start))
}
Вывод может быть следующим:
main() started 0s
Response from chan2 Value 1 0s
main() stopped 1.0012ms
Или таким:
main() started 0s
Response from chan1 Value 1 0s
main() stopped 1.0012ms
В приведенной программе оба канала имеют буфер размером 2. Так как мы отправляем 2 значения в буфер, горутина не будет заблокирована и программа перейдет в блок select
. Чтение из буферизированного канала не является блокируемой операцией, если буфер не пустой, поэтому все блоки case
будут неблокируемыми, и во время выполнения Go выберет case
случайным образом.
default case
Так же как и switch
, оператор select
поддерживает оператор default
. Оператор default
является неблокируемым, но это еще не все, оператор default
делает блок select
всегда неблокируемым. Это означает, что операции отправки и чтение на любом канале (не имеет значения будет ли канал с буфером или без) всегда будут неблокируемыми.
Если значение будет доступно на каком-либо канале, то select
выполнит этот case
. Если нет, то он немедленно выполнит default
.
package main
import (
"fmt"
"time"
)
var start time.Time
func init() {
start = time.Now()
}
func service1(c chan string) {
fmt.Println("service1() started", time.Since(start))
c <- "Hello from service 1"
}
func service2(c chan string) {
fmt.Println("service2() started", time.Since(start))
c <- "Hello from service 2"
}
func main() {
fmt.Println("main() started", time.Since(start))
chan1 := make(chan string)
chan2 := make(chan string)
go service1(chan1)
go service2(chan2)
select {
case res := <-chan1:
fmt.Println("Response from service 1", res, time.Since(start))
case res := <-chan2:
fmt.Println("Response from service 2", res, time.Since(start))
default:
fmt.Println("No response received", time.Since(start))
}
fmt.Println("main() stopped", time.Since(start))
}
Вывод программы:
main() started 0s
No response received 0s
main() stopped 0s
Так как в приведенной программе каналы используются без буфера, и значение еще отсутствует, в обоих каналах будет исполнен default
. Если бы в блоке select
отсутствовал default
, то произошла бы блокировка и результат был бы другим.
Так как с default
select
не блокируется, планировщик не запускает доступные горутины. Но main
можно заблокировать, вызвав time.Sleep
. Таким образом все горутины будут исполнены, и когда управление перейдет в main
, каналы будут иметь данные для чтения.
package main
import (
"fmt"
"time"
)
var start time.Time
func init() {
start = time.Now()
}
func service1(c chan string) {
fmt.Println("service1() started", time.Since(start))
c <- "Hello from service 1"
}
func service2(c chan string) {
fmt.Println("service2() started", time.Since(start))
c <- "Hello from service 2"
}
func main() {
fmt.Println("main() started", time.Since(start))
chan1 := make(chan string)
chan2 := make(chan string)
go service1(chan1)
go service2(chan2)
time.Sleep(3 * time.Second)
select {
case res := <-chan1:
fmt.Println("Response from service 1", res, time.Since(start))
case res := <-chan2:
fmt.Println("Response from service 2", res, time.Since(start))
default:
fmt.Println("No response received", time.Since(start))
}
fmt.Println("main() stopped", time.Since(start))
}
По итогу мы получим следующий результат
main() started 0s
service1() started 0s
service2() started 0s
Response from service 1 Hello from service 1 3.0001805s
main() stopped 3.0001805s
Или такой, в некоторых случаях:
main() started 0s
service1() started 0s
service2() started 0s
Response from service 2 Hello from service 2 3.0000957s
main() stopped 3.0000957s
Deadlock
Для того, чтобы избежать deadlock
, можно использовать default
, чтобы операции с каналами стали неблокируемыми, планировщик Go не будет планировать горутины для отправки данных в канал, даже если данные не доступны на данный момент.
package main
import (
"fmt"
"time"
)
var start time.Time
func init() {
start = time.Now()
}
func main() {
fmt.Println("main() started", time.Since(start))
chan1 := make(chan string)
chan2 := make(chan string)
select {
case res := <-chan1:
fmt.Println("Response from chan1", res, time.Since(start))
case res := <-chan2:
fmt.Println("Response from chan2", res, time.Since(start))
default:
fmt.Println("No goroutines available to send data", time.Since(start))
}
fmt.Println("main() stopped", time.Since(start))
}
Вывод программы:
main() started 0s
No goroutines available to send data 0s
main() stopped 0s
Аналогично получению данных, операция отправки данных будет работать также в случае использования оператора default
, если присутствуют другие горутины, готовые принять отправленные данные (в режиме ожидания).
Пустой select
Подобно пустому for{}
, пустой select{}
так же является валидным, но есть подвох. Как мы уже знаем select
блокируется до тех пор, пока один из блоков case
не будет выполнен, но так как в пустом select
отсутствуют блоки case
, горутина не будет разблокирована, и как результат, мы получим deadlock
.
ссылки
Last updated
Was this helpful?