Каналы Голанга застревают

Я работаю с Go и Redis для отправки очереди (канал) сообщений для подписчиков. Я стремлюсь создать решение для автоматического масштабирования, которое будет порождать новые процедуры go (в определенных пределах) по мере увеличения очереди. У меня есть следующий код:

// Set up max queued messages
var maxMessages = float64(100000)

// Set up max redis senders
var maxSender = float64(5)

// Set up message channel
var messages = make(chan Message, int(maxMessages))

// Set up messages per sender count
var senderRatio = maxSender / maxMessages

type Message struct {
    ChatId  int    `json:"chatId"`
    UserId  int    `json:"userId"`
    Message string `json:"message"`
    Date    int    `json:"date"`
}

func RedisWriteHandler(messageChannel chan Message) {
    senderCount := 0
    killswitch := make(chan string)
    for {
        length := float64(len(messageChannel))
        neededSenders := int(math.Ceil(length * senderRatio))
        if senderCount < neededSenders || senderCount < 1 {
            log.Printf("Increasing sender count to %d, need %d", senderCount+1, neededSenders)
            go addRedisSender(messageChannel, killswitch)
            senderCount++
        } else if senderCount > neededSenders && senderCount > 1 {
            log.Printf("Decreasing sender count to %d, need %d", senderCount-1, neededSenders)
            killMessage := fmt.Sprintf("only need %d senders", neededSenders)
            killswitch <- killMessage
            senderCount--
        }
    }
    log.Fatal("The redis handler unexpectedly went away")
}

func addRedisSender(messageChannel chan Message, killswitch chan string) {
    c, err := redis.Dial("tcp", "localhost:6379")
    if err != nil {
        log.Println(err)
        return
    }
    defer c.Close()
    for {
        select {
        case msg := <-messageChannel:
            redisChannel := strconv.Itoa(msg.ChatId)
            messageBlob, err := json.Marshal(msg)
            if err != nil {
                log.Println(err)
            }
            _, err = c.Do("PUBLISH", redisChannel, messageBlob)
            if err != nil {
                log.Println(err)
            }
        case kill := <-killswitch:
            log.Printf("Sender killed: %s", kill)
            return
        }
    }
    log.Println("Closing redis sender")
}

Если я запускаю этот код, используя канал с большим размером буфера (скажем, 100 000 сообщений), он добавляет 5 отправителей по одному и начинает работать в очереди - пока все хорошо. Однако в кажущейся случайной точке - около 1500 сообщений в нем зависает. Больше никаких логов (и я думаю, что у меня есть все точки выхода). Мой ожидаемый результат - увеличение отправителей до значения maxSender и его периодическое уменьшение на протяжении всего выполнения. Пример журналов, которые я получаю с сообщениями 100k ниже

2018/05/02 08:21:25 Увеличение количества отправителей до 1, нужно 5
2018/05/02 08:21:25 Увеличение количества отправителей до 2, нужно 5
2018/05/02 08:21:25 Увеличение количества отправителей до 3, нужно 5
2018/05/02 08:21:25 Увеличение количества отправителей до 4, нужно 5
2018/05/02 08:21:25 Увеличение количества отправителей до 5, нужно 5

Тогда ничего.

Из других тестов, которые я выполняю, я вижу, что это не просто медленно, а сообщения просто не берутся с канала. Кто-нибудь может пролить свет на это?

Спасибо,

Сэм

редактировать

Я скрывался вокруг некоторых других вопросов, касающихся зависания без видимой причины, и предлагаемое действие состояло в том, чтобы убить родительский процесс, чтобы получить трассировку стека каждого дочернего процесса. Ниже приведен вывод.

kill -ABRT 9162
SIGABRT: abort
PC=0x65f199 m=0 sigcode=0

goroutine 6 [running]:
main.RedisWriteHandler(0xc42005a180)
    /home/sam/repos/go/src/github.com/samisagit/go-im-server/src/main.go:116 +0x99 fp=0xc42004dfd8 sp=0xc42004dee8 pc=0x65f199
runtime.goexit()
    /usr/local/go/src/runtime/asm_amd64.s:2337 +0x1 fp=0xc42004dfe0 sp=0xc42004dfd8 pc=0x458f01
created by main.main
    /home/sam/repos/go/src/github.com/samisagit/go-im-server/src/main.go:236 +0x130

goroutine 1 [IO wait]:
internal/poll.runtime_pollWait(0x7f03114c5f70, 0x72, 0xffffffffffffffff)
    /usr/local/go/src/runtime/netpoll.go:173 +0x57
internal/poll.(*pollDesc).wait(0xc422722098, 0x72, 0xc420038b00, 0x0, 0x0)
    /usr/local/go/src/internal/poll/fd_poll_runtime.go:85 +0xae
internal/poll.(*pollDesc).waitRead(0xc422722098, 0xffffffffffffff00, 0x0, 0x0)
    /usr/local/go/src/internal/poll/fd_poll_runtime.go:90 +0x3d
internal/poll.(*FD).Accept(0xc422722080, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0)
    /usr/local/go/src/internal/poll/fd_unix.go:334 +0x1e2
net.(*netFD).accept(0xc422722080, 0x7f0311511000, 0x0, 0x7139b0)
    /usr/local/go/src/net/fd_unix.go:238 +0x42
net.(*TCPListener).accept(0xc4200b2000, 0xc420038d80, 0x4122c8, 0x30)
    /usr/local/go/src/net/tcpsock_posix.go:136 +0x2e
net.(*TCPListener).AcceptTCP(0xc4200b2000, 0xc422736120, 0xc422736120, 0x6a3b00)
    /usr/local/go/src/net/tcpsock.go:234 +0x49
net/http.tcpKeepAliveListener.Accept(0xc4200b2000, 0xc42001c0d8, 0x6a3b00, 0x8813d0, 0x6f2aa0)
    /usr/local/go/src/net/http/server.go:3120 +0x2f
net/http.(*Server).Serve(0xc42007a4e0, 0x8575c0, 0xc4200b2000, 0x0, 0x0)
    /usr/local/go/src/net/http/server.go:2695 +0x1b2
net/http.(*Server).ListenAndServe(0xc42007a4e0, 0xc42007a4e0, 0xc420038f00)
    /usr/local/go/src/net/http/server.go:2636 +0xa9
net/http.ListenAndServe(0x6ff124, 0x5, 0x0, 0x0, 0x0, 0x0)
    /usr/local/go/src/net/http/server.go:2882 +0x7f
main.main()
    /home/sam/repos/go/src/github.com/samisagit/go-im-server/src/main.go:239 +0x15b

goroutine 21 [runnable]:
internal/poll.runtime_pollWait(0x7f03114c5eb0, 0x72, 0x0)
    /usr/local/go/src/runtime/netpoll.go:173 +0x57
internal/poll.(*pollDesc).wait(0xc422722198, 0x72, 0xffffffffffffff00, 0x854d00, 0x851550)
    /usr/local/go/src/internal/poll/fd_poll_runtime.go:85 +0xae
internal/poll.(*pollDesc).waitRead(0xc422722198, 0xc4227ab000, 0x1000, 0x1000)
    /usr/local/go/src/internal/poll/fd_poll_runtime.go:90 +0x3d
internal/poll.(*FD).Read(0xc422722180, 0xc4227ab000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
    /usr/local/go/src/internal/poll/fd_unix.go:125 +0x18a
net.(*netFD).Read(0xc422722180, 0xc4227ab000, 0x1000, 0x1000, 0x4815d4, 0x47fd05, 0x1)
    /usr/local/go/src/net/fd_unix.go:202 +0x52
net.(*conn).Read(0xc4227a6000, 0xc4227ab000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
    /usr/local/go/src/net/net.go:176 +0x6d
bufio.(*Reader).fill(0xc4227ae000)
    /usr/local/go/src/bufio/bufio.go:97 +0x11a
bufio.(*Reader).ReadSlice(0xc4227ae000, 0x7f03114c5e0a, 0x0, 0x459fd6, 0xc42276db60, 0x491d6d, 0xc422722180)
    /usr/local/go/src/bufio/bufio.go:338 +0x2c
github.com/gomodule/redigo/redis.(*conn).readLine(0xc4227b0000, 0x0, 0x8000000000000000, 0xc422722180, 0x0, 0x0)
    /home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:431 +0x38
github.com/gomodule/redigo/redis.(*conn).readReply(0xc4227b0000, 0x0, 0x0, 0x0, 0x0)
    /home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:504 +0x40
github.com/gomodule/redigo/redis.(*conn).DoWithTimeout(0xc4227b0000, 0x0, 0x6ff70a, 0x7, 0xc422800d40, 0x2, 0x2, 0x68f280, 0x696f01, 0xc422800d60, ...)
    /home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:665 +0x164
github.com/gomodule/redigo/redis.(*conn).Do(0xc4227b0000, 0x6ff70a, 0x7, 0xc422800d40, 0x2, 0x2, 0x0, 0xc4227ba9a0, 0x0, 0x0)
    /home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:616 +0x73
main.addRedisSender(0xc42005a180, 0xc42001e240)
    /home/sam/repos/go/src/github.com/samisagit/go-im-server/src/main.go:150 +0x569
created by main.RedisWriteHandler
    /home/sam/repos/go/src/github.com/samisagit/go-im-server/src/main.go:119 +0x381

goroutine 22 [IO wait]:
internal/poll.runtime_pollWait(0x7f03114c5c70, 0x72, 0x0)
    /usr/local/go/src/runtime/netpoll.go:173 +0x57
internal/poll.(*pollDesc).wait(0xc422758098, 0x72, 0xffffffffffffff00, 0x854d00, 0x851550)
    /usr/local/go/src/internal/poll/fd_poll_runtime.go:85 +0xae
internal/poll.(*pollDesc).waitRead(0xc422758098, 0xc422774000, 0x1000, 0x1000)
    /usr/local/go/src/internal/poll/fd_poll_runtime.go:90 +0x3d
internal/poll.(*FD).Read(0xc422758080, 0xc422774000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
    /usr/local/go/src/internal/poll/fd_unix.go:125 +0x18a
net.(*netFD).Read(0xc422758080, 0xc422774000, 0x1000, 0x1000, 0x4815d4, 0x47fd05, 0x1)
    /usr/local/go/src/net/fd_unix.go:202 +0x52
net.(*conn).Read(0xc42274c020, 0xc422774000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
    /usr/local/go/src/net/net.go:176 +0x6d
bufio.(*Reader).fill(0xc422748120)
    /usr/local/go/src/bufio/bufio.go:97 +0x11a
bufio.(*Reader).ReadSlice(0xc422748120, 0x7f03114c5c0a, 0x0, 0x459fd6, 0xc420049b60, 0x491d6d, 0xc422758080)
    /usr/local/go/src/bufio/bufio.go:338 +0x2c
github.com/gomodule/redigo/redis.(*conn).readLine(0xc420084820, 0x0, 0x8000000000000000, 0xc422758080, 0x0, 0x0)
    /home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:431 +0x38
github.com/gomodule/redigo/redis.(*conn).readReply(0xc420084820, 0x0, 0x0, 0x0, 0x0)
    /home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:504 +0x40
github.com/gomodule/redigo/redis.(*conn).DoWithTimeout(0xc420084820, 0x0, 0x6ff70a, 0x7, 0xc422800dc0, 0x2, 0x2, 0x68f280, 0x696f01, 0xc422800de0, ...)
    /home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:665 +0x164
github.com/gomodule/redigo/redis.(*conn).Do(0xc420084820, 0x6ff70a, 0x7, 0xc422800dc0, 0x2, 0x2, 0x0, 0xc4227ba9b0, 0x0, 0x0)
    /home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:616 +0x73
main.addRedisSender(0xc42005a180, 0xc42001e240)
    /home/sam/repos/go/src/github.com/samisagit/go-im-server/src/main.go:150 +0x569
created by main.RedisWriteHandler
    /home/sam/repos/go/src/github.com/samisagit/go-im-server/src/main.go:119 +0x381

goroutine 23 [runnable]:
syscall.Syscall(0x0, 0x7, 0xc42277a000, 0x1000, 0x4, 0x1000, 0x0)
    /usr/local/go/src/syscall/asm_linux_amd64.s:18 +0x5
syscall.read(0x7, 0xc42277a000, 0x1000, 0x1000, 0xc42276f900, 0x0, 0x0)
    /usr/local/go/src/syscall/zsyscall_linux_amd64.go:756 +0x55
syscall.Read(0x7, 0xc42277a000, 0x1000, 0x1000, 0xcb, 0xc4228072b0, 0xcb)
    /usr/local/go/src/syscall/syscall_unix.go:162 +0x49
internal/poll.(*FD).Read(0xc422722280, 0xc42277a000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
    /usr/local/go/src/internal/poll/fd_unix.go:121 +0x125
net.(*netFD).Read(0xc422722280, 0xc42277a000, 0x1000, 0x1000, 0x4815d4, 0x47fd05, 0x1)
    /usr/local/go/src/net/fd_unix.go:202 +0x52
net.(*conn).Read(0xc42274c060, 0xc42277a000, 0x1000, 0x1000, 0x0, 0x0, 0x0)
    /usr/local/go/src/net/net.go:176 +0x6d
bufio.(*Reader).fill(0xc4227481e0)
    /usr/local/go/src/bufio/bufio.go:97 +0x11a
bufio.(*Reader).ReadSlice(0xc4227481e0, 0x7f03114c5d0a, 0x0, 0x459fd6, 0xc42276fb60, 0x491d6d, 0xc422722280)
    /usr/local/go/src/bufio/bufio.go:338 +0x2c
github.com/gomodule/redigo/redis.(*conn).readLine(0xc420084a00, 0x0, 0x8000000000000000, 0xc422722280, 0x0, 0x0)
    /home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:431 +0x38
github.com/gomodule/redigo/redis.(*conn).readReply(0xc420084a00, 0x0, 0x0, 0x0, 0x0)
    /home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:504 +0x40
github.com/gomodule/redigo/redis.(*conn).DoWithTimeout(0xc420084a00, 0x0, 0x6ff70a, 0x7, 0xc4227f1900, 0x2, 0x2, 0x68f280, 0x696f01, 0xc4227f1920, ...)
    /home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:665 +0x164
github.com/gomodule/redigo/redis.(*conn).Do(0xc420084a00, 0x6ff70a, 0x7, 0xc4227f1900, 0x2, 0x2, 0x0, 0xc4227318e0, 0x0, 0x0)
    /home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:616 +0x73
main.addRedisSender(0xc42005a180, 0xc42001e240)
    /home/sam/repos/go/src/github.com/samisagit/go-im-server/src/main.go:150 +0x569
created by main.RedisWriteHandler
    /home/sam/repos/go/src/github.com/samisagit/go-im-server/src/main.go:119 +0x381

goroutine 24 [running]:
    goroutine running on other thread; stack unavailable
created by main.RedisWriteHandler
    /home/sam/repos/go/src/github.com/samisagit/go-im-server/src/main.go:119 +0x381

goroutine 25 [runnable]:
syscall.Syscall6(0x37, 0x9, 0x1, 0x4, 0xc42276b3dc, 0xc42276b3d8, 0x0, 0x0, 0x4, 0x0)
    /usr/local/go/src/syscall/asm_linux_amd64.s:44 +0x5
syscall.getsockopt(0x9, 0x1, 0x4, 0xc42276b3dc, 0xc42276b3d8, 0xc422722400, 0x0)
    /usr/local/go/src/syscall/zsyscall_linux_amd64.go:1605 +0x7c
syscall.GetsockoptInt(0x9, 0x1, 0x4, 0x1, 0x0, 0x0)
    /usr/local/go/src/syscall/syscall_unix.go:245 +0x63
net.(*netFD).connect(0xc422722480, 0x857a00, 0xc42001c0d8, 0x0, 0x0, 0x853a80, 0xc422764100, 0x0, 0x0, 0x0, ...)
    /usr/local/go/src/net/fd_unix.go:160 +0x2f7
net.(*netFD).dial(0xc422722480, 0x857a00, 0xc42001c0d8, 0x858c00, 0x0, 0x858c00, 0xc422736510, 0xc42276b610, 0x54d3fe)
    /usr/local/go/src/net/sock_posix.go:142 +0xe9
net.socket(0x857a00, 0xc42001c0d8, 0x6fee50, 0x3, 0x2, 0x1, 0x0, 0x0, 0x858c00, 0x0, ...)
    /usr/local/go/src/net/sock_posix.go:93 +0x1a5
net.internetSocket(0x857a00, 0xc42001c0d8, 0x6fee50, 0x3, 0x858c00, 0x0, 0x858c00, 0xc422736510, 0x1, 0x0, ...)
    /usr/local/go/src/net/ipsock_posix.go:141 +0x129
net.doDialTCP(0x857a00, 0xc42001c0d8, 0x6fee50, 0x3, 0x0, 0xc422736510, 0xc42276b7e0, 0x0, 0xf2)
    /usr/local/go/src/net/tcpsock_posix.go:62 +0xb9
net.dialTCP(0x857a00, 0xc42001c0d8, 0x6fee50, 0x3, 0x0, 0xc422736510, 0x44a6b8, 0xad3bb25ed8, 0x2e6f67bd)
    /usr/local/go/src/net/tcpsock_posix.go:58 +0xe4
net.dialSingle(0x857a00, 0xc42001c0d8, 0xc422722400, 0x8559c0, 0xc422736510, 0x0, 0x0, 0x0, 0x0)
    /usr/local/go/src/net/dial.go:547 +0x3e2
net.dialSerial(0x857a00, 0xc42001c0d8, 0xc422722400, 0xc420010c80, 0x1, 0x1, 0x0, 0x0, 0x0, 0x0)
    /usr/local/go/src/net/dial.go:515 +0x247
net.(*Dialer).DialContext(0xc422766240, 0x857a00, 0xc42001c0d8, 0x6fee50, 0x3, 0x701517, 0xe, 0x0, 0x0, 0x0, ...)
    /usr/local/go/src/net/dial.go:397 +0x6ee
net.(*Dialer).Dial(0xc422766240, 0x6fee50, 0x3, 0x701517, 0xe, 0x10, 0x6b5de0, 0xc420082001, 0xc420010c40)
    /usr/local/go/src/net/dial.go:320 +0x75
net.(*Dialer).Dial-fm(0x6fee50, 0x3, 0x701517, 0xe, 0x3, 0xc42273cc50, 0x42a568, 0xc42273cc18)
    /home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:180 +0x52
github.com/gomodule/redigo/redis.Dial(0x6fee50, 0x3, 0x701517, 0xe, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, ...)
    /home/sam/repos/go/src/github.com/gomodule/redigo/redis/conn.go:183 +0x182
main.addRedisSender(0xc42005a180, 0xc42001e240)
    /home/sam/repos/go/src/github.com/samisagit/go-im-server/src/main.go:133 +0x12e
created by main.RedisWriteHandler
    /home/sam/repos/go/src/github.com/samisagit/go-im-server/src/main.go:119 +0x381

rax    0x5
rbx    0xf3e31
rcx    0x5
rdx    0xc42005a180
rdi    0x458f01
rsi    0x3
rbp    0xc42004dfc8
rsp    0xc42004dee8
r8     0x0
r9     0x0
r10    0x732f60
r11    0x30
r12    0x0
r13    0xf1
r14    0x11
r15    0x0
rip    0x65f199
rflags 0x246
cs     0x33
fs     0x0
gs     0x0

Порядок рутины 22 несколько интересен, так как он в [IO WAIT], где остальные работают. Я не имел дело с этими статусами раньше, вот в чем проблема?

Самый актуальный код здесь https://gitlab.com/samisagit/go-im-server/blob/changed-redis-handler-buggy/src/main.go

Редактировать с первопричиной!

Один из парней из golang-nuts gGroup предположил, что это может быть проблемой GC, и он был прав (спасибо, Майкл)! При запуске кода с ниже

debug.SetGCPercent(-1)

это работает как ожидалось - это не долгосрочное решение, но оно указывает на то, где проблема лежит! Если у кого-то есть какие-либо подсказки относительно того, почему GC так усердствует со своими обязанностями, я был бы очень благодарен!

2 ответа

Для будущей ссылки это оказалось первым for зациклиться RedisWriteHandler, В цикле нет блокирующих строк (например, выбора), поэтому цикл выполняется "бесконечно" и использует нагрузку ресурсов, в результате чего GC отключает его. Думаю, что я разместил ответ здесь, когда я впервые нашел это, но, очевидно, нет.

Основываясь на этом

Из других тестов, которые я выполняю, я вижу, что это не просто медленно, а сообщения просто не берутся с канала.

Я думаю, что у вас есть живой замок addRedisSender()

Оператор select будет псевдослучайно выбирать один из случаев, либо killSwitch случай, или <-messageChannel, Кроме того, есть еще один случай, default, Это всегда будет правдой, то есть for { цикл будет вращаться для того, чтобы потреблять все ресурсы и вызывать живую блокировку, поскольку среда выполнения go пытается планировать все больше и больше конкурирующих процедур go.

Если вы удалите default: continue В этом случае цикл for будет блокировать выбор, пока не появится сообщение для чтения или сигнал уничтожения.

Другие вопросы по тегам