📕
Golang
  • Собеседование по golang
  • Вопросы собеседования
    • Список вопросов МТС
    • Список вопросов
    • Базовые вопросы по Golang
    • Go. Прорабатываем 25 основных вопросов собеседования
    • Функции
    • Структуры данных
    • Конкурентность и Параллелизм
    • Горутины
    • Примитивы синхронизации
    • Планировщик
    • Go: конкурентность и привязки к потокам в планировщике
    • Каналы
    • GC
  • спецификация
    • Спецификация Go: преобразования в и из строкового типа
    • переключатель типов (type switch)
    • for утверждения (for statements)
    • for утверждения с range условием
    • go утверждения (go statements)
    • select утверждения (select statements)
    • return утверждения (return statements)
    • continue утверждения (continue statements)
    • goto утверждения (goto statements)
    • fallthrough утверждения (fallthrough statements, утверждения "провала")
    • defer утверждения (defer statements)
    • встроенные функции, функция close
    • длина и емкость
    • аллокация, создание срезов (slice), карт (map) и каналов
    • добавление в срезы и копирование срезов
    • удаление элементов карты
    • обработка паники
    • начальная загрузка (bootstrapping)
    • пакеты
    • инициализация и выполнение программы, нулевое значение
    • инициализация пакета
    • выполнение программы
    • ошибки
    • паника во время выполнения (run-time panic)
  • Эффективный go
    • эффективный go
    • 50 оттенков go
    • Go: распространенные антипаттерны
    • Визуализация concurrency в Go с WebGL
  • требования для работы
    • Список навыков
  • habr
    • Изучаем многопоточное программирование в Go по картинкам
    • Go: конкурентность и привязки к потокам в планировщике
  • NP
    • Полиморфизм с интерфейсами в Golang
    • Объектно-ориентированное программирование в Golang
    • Владеешь merge  -  освой и rebase
  • ProgLib
    • Горутины
  • Untitled
  • Оптимизация
    • Go: Должен ли я использовать указатель вместо копии моей структуры?
  • Полезняшки
    • Using PostgreSQL JSONB with Go
Powered by GitBook
On this page
  • Устройство канала
  • Синхронные каналы
  • Буферезированные каналы
  • Закрытие канала
  • Select
  • Deadlock

Was this helpful?

  1. Вопросы собеседования

Каналы

PreviousGo: конкурентность и привязки к потокам в планировщикеNextGC

Last updated 4 years ago

Was this helpful?

Каналы — это механизм коммуникации между горутинами. Одна горутина может записывать в канал данные в то время, когда другая горутина читает эти данные из этого же канала. У каждого канала есть тип элемента, который является типом передаваемого по каналу значения. Канал по своей структуре схож со срезом или мапой и тоже является ссылочным типом. Это означает что объявив переменную типа канала, она по умолчанию будет nil . А при передаче канала в функцию как аргумент, мы передаем ссылку. Для записи и чтения данных в/из канала используется <- .

ch := make(chan int) // инициализация канала ch <- 1 // запись в канал number := <-ch // чтение из канала

Для того чтобы инициализировать не пустой канал используется функция make() .

Устройство канала

Давайте начнём с разбора структуры канала:

  • qcount — количество элементов в буфере

  • dataqsiz — размерность буфера

  • buf — указатель на буфер для элементов канала

  • closed — флаг, указывающий, закрыт канал или нет

  • recvq — указатель на связанный список горутин, ожидающих чтения из канала

  • sendq -указатель на связанный список горутин, ожидающих запись в канал

  • lock — мьютекс для безопасного доступа к каналу

В общем случае, горутина захватывает мьютекс, когда совершает какое-либо действие с каналом, кроме случаев lock-free проверок при неблокирующих вызовах (я объясню это подробнее чуть ниже). Closed — это флаг, который устанавливается в 1, если канал закрыт, и в 0, если не закрыт. Эти поля далее будут исключены из общей картины, для большей ясности. Канал может быть синхронным (небуферизированным) или асинхронным (буферезированным). Давайте вначале посмотрим, как работают синхронные каналы.

Синхронные каналы

Допустим, у нас есть следующий код:

package main

func main() {
    ch := make(chan bool)
    go func() {
        ch <- true
    }()
    <-ch
}

Теперь у нас осталась только одна работающая горутина, которая пытается записать данные в канал. Все проверки повторяются снова, и когда горутина проверяет recvq очередь, она находит ожидающую чтение горутину, удаляет её из очереди, записывает данные в её стек и снимает блокировку. Это единственное место во всём рантайме Go, когда одна горутина пишет напрямую в стек другой горутины. После этого шага, канал выглядит точно так же, как сразу после инициализации. Обе горутины завершаются и программа выходит.

Так устроены синхронные каналы. Сейчас же, давайте посмотрим на буферизированные каналы

Буферезированные каналы

Рассмотрим следующий пример:

package main

func main() {
    ch := make(chan bool, 1)
    ch <- true
    go func() {
        <-ch
    }()
    ch <- true
}

Разница в сравнении с синхронным каналом в том, что тут Go выделяет буфер и устанавливает значение dataqsiz в единицу.

Следующим шагом будет отправка первого значения в канал. Чтобы сделать это, горутина сначала производит несколько проверок: пуста ли очередь recvq, пуст ли буфер, достаточно ли места в буфере.

Закрытие канала

Закрытие канала это простая операция. Go проходит по всем ожидающим на чтение или запись горутинам и разблокирует их. Все получатели получают дефолтные значение переменных того типа данных канала, а все отправители паникуют.

Select

Но постойте, Go же ещё поддерживает select с дефолтным поведением, и если канал заблокирован, как горутина сможет обработать default? Хороший вопрос, давайте быстро посмотрим на приватное API каналов. Когда вы запускаете следующий кусок кода:

    select {
    case <-ch:
        foo()
    default:
        bar()
    }

Go запускает функцию со следующей сигнатурой:

func chanrecv(t *chantype, c *hchan, ep unsafe.Pointer, block bool)

chantype это тип канала (например, bool в случае make(chan bool)), hchan — указатель на структуру канала, ep — указатель на сегмент памяти, куда должны быть записаны данные из канала, и последний, но самый интересный для нас — это аргумент block. Если он установлен в false, то функция будет работать в неблокирующем режиме. В этом режиме горутина проверяет буфер и очередь, возвращает true и пишет данные в ep или возвращает false, если нет данных в буфере или нет отправителей в очереди. Проверки буфера и очереди реализованы как атомарные операции, и не требуют блокировки мьютекса.

Также есть функция для записи данных в очередь с аналогичной сигнатурой.

select похож на switch без аргументов, но он может использоваться только для операций с каналами. Оператор select используется для выполнения операции только с одним из множества каналов, условно выбранного блоком case.

Давай взглянем на пример ниже, и обсудим как он работает:

package main

import (
    "fmt"
    "time"
)

var start time.Time
func init() {
    start = time.Now()
}

func service1(c chan string) {
    time.Sleep(3 * time.Second)
    c <- "Hello from service 1"
}

func service2(c chan string) {
    time.Sleep(5 * time.Second)
    c <- "Hello from service 2"
}

func main() {
    fmt.Println("main() started", time.Since(start))

    chan1 := make(chan string)
    chan2 := make(chan string)

    go service1(chan1)
    go service2(chan2)

    select {
    case res := <-chan1:
        fmt.Println("Response from service 1", res, time.Since(start))
    case res := <-chan2:
        fmt.Println("Response from service 2", res, time.Since(start))
    }

    fmt.Println("main() stopped", time.Since(start))
}

В этом примере мы используем оператор select как switch, но вместо булевых операций, мы используем операции для чтения данных из канала. Оператор select также является блокируемым, за исключением использования default(позже вы увидите пример с его использованием). После выполнения одного из блоков case, горутина main будет разблокирована. Задались вопросом когда case условие выполнится?

Если все блоки case являются блокируемыми, тогда select будет ждать до момента, пока один из блоков case разблокируется и будет выполнен. Если несколько или все канальные операции не блокируемы, тогда один из неблокируемых case будет выбран случайным образом (Примечание переводчика: имеется ввиду случай, когда пришли одновременно данные из двух и более каналов).

Давайте наконец разберем программу, которую написали ранее. Мы запустили 2 горутины с независимыми каналами. Затем мы использовали оператор select c двумя case операторами. Один case считывает данные из chan1 а другой из chan2. Так как каналы не используют буфер, операция чтения будет блокируемой. Таким образом оба case будут блокируемыми и select будет ждать до тех пор, пока один из case не разблокируется.

Когда программа находится в блоке select горутина main будет заблокирована и будут запланированы все горутины (по одной за раз), которые используются в блоке select, в нашем случае это service1 и service2. service1 ждет 3 секунды, после чего будет разблокирован и сможет записать данные в chan1. Таким же образом как и service1 действует service2, только он ожидает 5 секунд и осуществляет запись в chan2. Так как service1 разблокируется раньше, чем service2, первый case разблокируется раньше и произведет чтение из chan1, а второй case будет проигнорирован. После чего управление вернется в main, и программа завершится после вывода в консоль.

Вывод программы:

main() started 0s
Response from service 1 Hello from service 1 3s
main() stopped 3s

Вышеприведенная программа имитирует реальный веб-сервис, в котором балансировщик нагрузки получает миллионы запросов и должен возвращать ответ от одной из доступных служб. Используя стандартные горутины, каналы и select, мы можем запросить ответ у нескольких сервисов, и тот, который ответит раньше всех, может быть использован.

Для того, чтобы симулировать случай, когда все блоки case разблокируются в одно и тоже время, мы может просто удалить вызов Sleep из горутин.

package main

import (
    "fmt"
    "time"
)

var start time.Time
func init() {
    start = time.Now()
}

func service1(c chan string) {
    c <- "Hello from service 1"
}

func service2(c chan string) {
    c <- "Hello from service 2"
}

func main() {
    fmt.Println("main() started", time.Since(start))

    chan1 := make(chan string)
    chan2 := make(chan string)

    go service1(chan1)
    go service2(chan2)

    select {
    case res := <-chan1:
        fmt.Println("Response from service 1", res, time.Since(start))
    case res := <-chan2:
        fmt.Println("Response from service 2", res, time.Since(start))
    }

    fmt.Println("main() stopped", time.Since(start))
}

Данная программа выводит следующий результат:

main() started 0s
service2() started 481µs
Response from service 2 Hello from service 2 981.1µs
main() stopped 981.1µs

Но иногда вы можете получить следующий результат:

main() started 0s
service1() started 484.8µs
Response from service 1 Hello from service 1 984µs
main() stopped 984µs

Это происходит потому, что операции chan1 и chan2 выполняются практически одновременно, но все же существует некоторая разница во времени при исполнении и планировании горутин.

Для того, чтобы сделать все блоки case неблокируемыми, мы можем использовать каналы с буфером.

package main

import (
    "fmt"
    "time"
)

var start time.Time

func init() {
    start = time.Now()
}

func main() {
    fmt.Println("main() started", time.Since(start))
    chan1 := make(chan string, 2)
    chan2 := make(chan string, 2)

    chan1 <- "Value 1"
    chan1 <- "Value 2"
    chan2 <- "Value 1"
    chan2 <- "Value 2"

    select {
    case res := <-chan1:
        fmt.Println("Response from chan1", res, time.Since(start))
    case res := <-chan2:
        fmt.Println("Response from chan2", res, time.Since(start))
    }

    fmt.Println("main() stopped", time.Since(start))
}

Вывод может быть следующим:

main() started 0s
Response from chan2 Value 1 0s
main() stopped 1.0012ms

Или таким:

main() started 0s
Response from chan1 Value 1 0s
main() stopped 1.0012ms

В приведенной программе оба канала имеют буфер размером 2. Так как мы отправляем 2 значения в буфер, горутина не будет заблокирована и программа перейдет в блок select. Чтение из буферизированного канала не является блокируемой операцией, если буфер не пустой, поэтому все блоки case будут неблокируемыми, и во время выполнения Go выберет case случайным образом.

default case

Так же как и switch, оператор select поддерживает оператор default. Оператор default является неблокируемым, но это еще не все, оператор default делает блок select всегда неблокируемым. Это означает, что операции отправки и чтение на любом канале (не имеет значения будет ли канал с буфером или без) всегда будут неблокируемыми.

Если значение будет доступно на каком-либо канале, то select выполнит этот case. Если нет, то он немедленно выполнит default.

package main

import (
    "fmt"
    "time"
)

var start time.Time

func init() {
    start = time.Now()
}

func service1(c chan string) {
    fmt.Println("service1() started", time.Since(start))
    c <- "Hello from service 1"
}

func service2(c chan string) {
    fmt.Println("service2() started", time.Since(start))
    c <- "Hello from service 2"
}

func main() {
    fmt.Println("main() started", time.Since(start))

    chan1 := make(chan string)
    chan2 := make(chan string)

    go service1(chan1)
    go service2(chan2)

    select {
    case res := <-chan1:
        fmt.Println("Response from service 1", res, time.Since(start))
    case res := <-chan2:
        fmt.Println("Response from service 2", res, time.Since(start))
    default:
        fmt.Println("No response received", time.Since(start))
    }

    fmt.Println("main() stopped", time.Since(start))
}

Вывод программы:

main() started 0s
No response received 0s
main() stopped 0s

Так как в приведенной программе каналы используются без буфера, и значение еще отсутствует, в обоих каналах будет исполнен default. Если бы в блоке select отсутствовал default, то произошла бы блокировка и результат был бы другим.

Так как с default select не блокируется, планировщик не запускает доступные горутины. Но main можно заблокировать, вызвав time.Sleep. Таким образом все горутины будут исполнены, и когда управление перейдет в main, каналы будут иметь данные для чтения.

package main

import (
    "fmt"
    "time"
)

var start time.Time

func init() {
    start = time.Now()
}

func service1(c chan string) {
    fmt.Println("service1() started", time.Since(start))
    c <- "Hello from service 1"
}

func service2(c chan string) {
    fmt.Println("service2() started", time.Since(start))
    c <- "Hello from service 2"
}

func main() {
    fmt.Println("main() started", time.Since(start))

    chan1 := make(chan string)
    chan2 := make(chan string)

    go service1(chan1)
    go service2(chan2)

    time.Sleep(3 * time.Second)

    select {
    case res := <-chan1:
        fmt.Println("Response from service 1", res, time.Since(start))
    case res := <-chan2:
        fmt.Println("Response from service 2", res, time.Since(start))
    default:
        fmt.Println("No response received", time.Since(start))
    }

    fmt.Println("main() stopped", time.Since(start))
}

По итогу мы получим следующий результат

main() started 0s
service1() started 0s
service2() started 0s
Response from service 1 Hello from service 1 3.0001805s
main() stopped 3.0001805s

Или такой, в некоторых случаях:

main() started 0s
service1() started 0s
service2() started 0s
Response from service 2 Hello from service 2 3.0000957s
main() stopped 3.0000957s

Deadlock

Для того, чтобы избежать deadlock, можно использовать default, чтобы операции с каналами стали неблокируемыми, планировщик Go не будет планировать горутины для отправки данных в канал, даже если данные не доступны на данный момент.

package main

import (
    "fmt"
    "time"
)

var start time.Time

func init() {
    start = time.Now()
}

func main() {
    fmt.Println("main() started", time.Since(start))

    chan1 := make(chan string)
    chan2 := make(chan string)

    select {
    case res := <-chan1:
        fmt.Println("Response from chan1", res, time.Since(start))
    case res := <-chan2:
        fmt.Println("Response from chan2", res, time.Since(start))
    default:
        fmt.Println("No goroutines available to send data", time.Since(start))
    }

    fmt.Println("main() stopped", time.Since(start))
}

Вывод программы:

main() started 0s
No goroutines available to send data 0s
main() stopped 0s

Аналогично получению данных, операция отправки данных будет работать также в случае использования оператора default, если присутствуют другие горутины, готовые принять отправленные данные (в режиме ожидания).

Пустой select

Подобно пустому for{}, пустой select{} так же является валидным, но есть подвох. Как мы уже знаем select блокируется до тех пор, пока один из блоков case не будет выполнен, но так как в пустом select отсутствуют блоки case, горутина не будет разблокирована, и как результат, мы получим deadlock.

ссылки

Вначале создается новый канал и он выглядит вот так:

Go не выделяет буфер для синхронных каналов, поэтому указатель на буфер равен nil и dataqsiz равен нулю. В приведённом коде нет гарантии, что случится первее — чтение из канала или запись, поэтому допустим, что первым действием будет чтение из канала (обратный пример, когда вначале идёт запись, будет рассмотрена ниже в примере с буферизированным каналами). Вначале, текущая горутина произведёт некоторые проверки, такие как: закрыт ли канал, буферизирован он или нет, содержит ли гоуртины в send-очереди. В нашем примере у канала нет ни буфера, ни ожидающих отправки горутин, поэтому горутина добавит сама себя в recvq и заблокируется. На этом шаге наш канал будет выглядеть следующим образом:

Опять же, порядок исполнения неизвестен, пример с первой читающей горутиной мы разобрали выше, поэтому сейчас допустим, что два значения были записаны в канал, и после этого один из элементов вычитан. И первым шагом идёт создание канала, который будет выглядеть вот так:

В нашем случае в буфере достаточно места и в очереди ожидания чтения нет горутин, поэтому горутина просто записывает элемент в буфер, увеличивает значение qcount и продолжает исполнение далее. Канал в этот момент выглядит так:

На следующем шаге, горутина main отправляет следующее значение в канал. Когда буфер полон, буферизированный канал будет вести себя точно так же, как синхронный (небуферизированный) канал, тоесть горутина добавит себя в очередь ожидания и заблокируется, в результате чего, канал будет выглядеть следующим образом:

Сейчас горутина main заблокирована и Go запустил одну анонимную горутину, которая пытается прочесть значение из канала. И вот тут начинается хитрая часть. Go гарантирует, что канал работает по принципу FIFO очереди (), но горутина не может просто взять значение из буфера и продолжить исполнение. В этом случае горутина main заблокируется навсегда. Для решения этой ситуации, текущая горутина читает данные из буфера, затем добавляет значение из заблокированной горутины в буфер, разблокирует ожидающую горутину и удаляет её из очереди ожидания. (В случае же, если нет ожидающих горутину, она просто читает первое значение из буфера)

спецификация
Go Channels Internals
Logo