Каналы
Last updated
Was this helpful?
Last updated
Was this helpful?
Каналы — это механизм коммуникации между горутинами. Одна горутина может записывать в канал данные в то время, когда другая горутина читает эти данные из этого же канала. У каждого канала есть тип элемента, который является типом передаваемого по каналу значения. Канал по своей структуре схож со срезом или мапой и тоже является ссылочным типом. Это означает что объявив переменную типа канала, она по умолчанию будет nil . А при передаче канала в функцию как аргумент, мы передаем ссылку. Для записи и чтения данных в/из канала используется <-
.
ch := make(chan int) // инициализация канала
ch <- 1 // запись в канал
number := <-ch // чтение из канала
Для того чтобы инициализировать не пустой канал используется функция make() .
Давайте начнём с разбора структуры канала:
qcount — количество элементов в буфере
dataqsiz — размерность буфера
buf — указатель на буфер для элементов канала
closed — флаг, указывающий, закрыт канал или нет
recvq — указатель на связанный список горутин, ожидающих чтения из канала
sendq -указатель на связанный список горутин, ожидающих запись в канал
lock — мьютекс для безопасного доступа к каналу
В общем случае, горутина захватывает мьютекс, когда совершает какое-либо действие с каналом, кроме случаев lock-free проверок при неблокирующих вызовах (я объясню это подробнее чуть ниже). Closed — это флаг, который устанавливается в 1, если канал закрыт, и в 0, если не закрыт. Эти поля далее будут исключены из общей картины, для большей ясности. Канал может быть синхронным (небуферизированным) или асинхронным (буферезированным). Давайте вначале посмотрим, как работают синхронные каналы.
Допустим, у нас есть следующий код:
Теперь у нас осталась только одна работающая горутина, которая пытается записать данные в канал. Все проверки повторяются снова, и когда горутина проверяет recvq
очередь, она находит ожидающую чтение горутину, удаляет её из очереди, записывает данные в её стек и снимает блокировку. Это единственное место во всём рантайме Go, когда одна горутина пишет напрямую в стек другой горутины. После этого шага, канал выглядит точно так же, как сразу после инициализации. Обе горутины завершаются и программа выходит.
Так устроены синхронные каналы. Сейчас же, давайте посмотрим на буферизированные каналы
Рассмотрим следующий пример:
Разница в сравнении с синхронным каналом в том, что тут Go выделяет буфер и устанавливает значение dataqsiz
в единицу.
Следующим шагом будет отправка первого значения в канал. Чтобы сделать это, горутина сначала производит несколько проверок: пуста ли очередь recvq
, пуст ли буфер, достаточно ли места в буфере.
Закрытие канала это простая операция. Go проходит по всем ожидающим на чтение или запись горутинам и разблокирует их. Все получатели получают дефолтные значение переменных того типа данных канала, а все отправители паникуют.
Но постойте, Go же ещё поддерживает select с дефолтным поведением, и если канал заблокирован, как горутина сможет обработать default? Хороший вопрос, давайте быстро посмотрим на приватное API каналов. Когда вы запускаете следующий кусок кода:
Go запускает функцию со следующей сигнатурой:
chantype
это тип канала (например, bool в случае make(chan bool)), hchan
— указатель на структуру канала, ep
— указатель на сегмент памяти, куда должны быть записаны данные из канала, и последний, но самый интересный для нас — это аргумент block
. Если он установлен в false
, то функция будет работать в неблокирующем режиме. В этом режиме горутина проверяет буфер и очередь, возвращает true
и пишет данные в ep
или возвращает false
, если нет данных в буфере или нет отправителей в очереди. Проверки буфера и очереди реализованы как атомарные операции, и не требуют блокировки мьютекса.
Также есть функция для записи данных в очередь с аналогичной сигнатурой.
select
похож на switch
без аргументов, но он может использоваться только для операций с каналами. Оператор select
используется для выполнения операции только с одним из множества каналов, условно выбранного блоком case.
Давай взглянем на пример ниже, и обсудим как он работает:
В этом примере мы используем оператор select
как switch
, но вместо булевых операций, мы используем операции для чтения данных из канала. Оператор select
также является блокируемым, за исключением использования default
(позже вы увидите пример с его использованием). После выполнения одного из блоков case
, горутина main
будет разблокирована. Задались вопросом когда case
условие выполнится?
Если все блоки case
являются блокируемыми, тогда select
будет ждать до момента, пока один из блоков case
разблокируется и будет выполнен. Если несколько или все канальные операции не блокируемы, тогда один из неблокируемых case
будет выбран случайным образом (Примечание переводчика: имеется ввиду случай, когда пришли одновременно данные из двух и более каналов).
Давайте наконец разберем программу, которую написали ранее. Мы запустили 2 горутины с независимыми каналами. Затем мы использовали оператор select
c двумя case
операторами. Один case
считывает данные из chan1
а другой из chan2
. Так как каналы не используют буфер, операция чтения будет блокируемой. Таким образом оба case
будут блокируемыми и select
будет ждать до тех пор, пока один из case
не разблокируется.
Когда программа находится в блоке select
горутина main
будет заблокирована и будут запланированы все горутины (по одной за раз), которые используются в блоке select
, в нашем случае это service1
и service2
. service1
ждет 3 секунды, после чего будет разблокирован и сможет записать данные в chan1
. Таким же образом как и service1
действует service2
, только он ожидает 5 секунд и осуществляет запись в chan2
. Так как service1
разблокируется раньше, чем service2
, первый case
разблокируется раньше и произведет чтение из chan1
, а второй case
будет проигнорирован. После чего управление вернется в main
, и программа завершится после вывода в консоль.
Вывод программы:
Вышеприведенная программа имитирует реальный веб-сервис, в котором балансировщик нагрузки получает миллионы запросов и должен возвращать ответ от одной из доступных служб. Используя стандартные горутины, каналы и select, мы можем запросить ответ у нескольких сервисов, и тот, который ответит раньше всех, может быть использован.
Для того, чтобы симулировать случай, когда все блоки case
разблокируются в одно и тоже время, мы может просто удалить вызов Sleep из горутин.
Данная программа выводит следующий результат:
Но иногда вы можете получить следующий результат:
Это происходит потому, что операции chan1
и chan2
выполняются практически одновременно, но все же существует некоторая разница во времени при исполнении и планировании горутин.
Для того, чтобы сделать все блоки case
неблокируемыми, мы можем использовать каналы с буфером.
Вывод может быть следующим:
Или таким:
В приведенной программе оба канала имеют буфер размером 2. Так как мы отправляем 2 значения в буфер, горутина не будет заблокирована и программа перейдет в блок select
. Чтение из буферизированного канала не является блокируемой операцией, если буфер не пустой, поэтому все блоки case
будут неблокируемыми, и во время выполнения Go выберет case
случайным образом.
Так же как и switch
, оператор select
поддерживает оператор default
. Оператор default
является неблокируемым, но это еще не все, оператор default
делает блок select
всегда неблокируемым. Это означает, что операции отправки и чтение на любом канале (не имеет значения будет ли канал с буфером или без) всегда будут неблокируемыми.
Если значение будет доступно на каком-либо канале, то select
выполнит этот case
. Если нет, то он немедленно выполнит default
.
Вывод программы:
Так как в приведенной программе каналы используются без буфера, и значение еще отсутствует, в обоих каналах будет исполнен default
. Если бы в блоке select
отсутствовал default
, то произошла бы блокировка и результат был бы другим.
Так как с default
select
не блокируется, планировщик не запускает доступные горутины. Но main
можно заблокировать, вызвав time.Sleep
. Таким образом все горутины будут исполнены, и когда управление перейдет в main
, каналы будут иметь данные для чтения.
По итогу мы получим следующий результат
Или такой, в некоторых случаях:
Для того, чтобы избежать deadlock
, можно использовать default
, чтобы операции с каналами стали неблокируемыми, планировщик Go не будет планировать горутины для отправки данных в канал, даже если данные не доступны на данный момент.
Вывод программы:
Аналогично получению данных, операция отправки данных будет работать также в случае использования оператора default
, если присутствуют другие горутины, готовые принять отправленные данные (в режиме ожидания).
Подобно пустому for{}
, пустой select{}
так же является валидным, но есть подвох. Как мы уже знаем select
блокируется до тех пор, пока один из блоков case
не будет выполнен, но так как в пустом select
отсутствуют блоки case
, горутина не будет разблокирована, и как результат, мы получим deadlock
.
ссылки
Вначале создается новый канал и он выглядит вот так:
Go не выделяет буфер для синхронных каналов, поэтому указатель на буфер равен nil и dataqsiz
равен нулю. В приведённом коде нет гарантии, что случится первее — чтение из канала или запись, поэтому допустим, что первым действием будет чтение из канала (обратный пример, когда вначале идёт запись, будет рассмотрена ниже в примере с буферизированным каналами). Вначале, текущая горутина произведёт некоторые проверки, такие как: закрыт ли канал, буферизирован он или нет, содержит ли гоуртины в send-очереди. В нашем примере у канала нет ни буфера, ни ожидающих отправки горутин, поэтому горутина добавит сама себя в recvq
и заблокируется. На этом шаге наш канал будет выглядеть следующим образом:
Опять же, порядок исполнения неизвестен, пример с первой читающей горутиной мы разобрали выше, поэтому сейчас допустим, что два значения были записаны в канал, и после этого один из элементов вычитан. И первым шагом идёт создание канала, который будет выглядеть вот так:
В нашем случае в буфере достаточно места и в очереди ожидания чтения нет горутин, поэтому горутина просто записывает элемент в буфер, увеличивает значение qcount
и продолжает исполнение далее. Канал в этот момент выглядит так:
На следующем шаге, горутина main отправляет следующее значение в канал. Когда буфер полон, буферизированный канал будет вести себя точно так же, как синхронный (небуферизированный) канал, тоесть горутина добавит себя в очередь ожидания и заблокируется, в результате чего, канал будет выглядеть следующим образом:
Сейчас горутина main заблокирована и Go запустил одну анонимную горутину, которая пытается прочесть значение из канала. И вот тут начинается хитрая часть. Go гарантирует, что канал работает по принципу FIFO очереди (), но горутина не может просто взять значение из буфера и продолжить исполнение. В этом случае горутина main заблокируется навсегда. Для решения этой ситуации, текущая горутина читает данные из буфера, затем добавляет значение из заблокированной горутины в буфер, разблокирует ожидающую горутину и удаляет её из очереди ожидания. (В случае же, если нет ожидающих горутину, она просто читает первое значение из буфера)