gocql блокирует слишком много одновременных запросов на чтение (golang, Cassandra)

Используя GOCQL (Golang, Cassandra), я делаю до 128 запросов, а потом все зависает. Я верю в функцию getTicksForCassandraKey(), которую я правильно выпускаю, но не уверен. GOCQL поддерживает только до 128 одновременных запросов, поэтому я должен что-то делать не так. Все запросы читаются.

Основной код:

inboundChannel := make(chan []bson.M, 30)
maxGoRoutinesCount := 30
chunkSize := int(math.Floor(float64(len(cassandraKeys)) / float64(maxGoRoutinesCount)))
log.Println("Chunk size will be: ", chunkSize)
log.Println("Cassandra Keys length: ", cassandraKeys)
idx := 0
for idx < len(cassandraKeys) {
    log.Println("idx: ", idx)
    chunkOfKeys := cassandraKeys[idx:(idx + chunkSize)]
    idx += chunkSize
    go func(keys []string) {
        log.Println("Received analysisKey on outbound channel. About to query this many keys: ", len(keys))
        //Cassandra session can handle up to 128 concurrent queries
        for _, c := range keys {
            processedTicks, err := getTicksForCassandraKey(*session, c, startTime)
            if err != nil {
                log.Println("Error returning.")
                return
            }
            log.Println("Finished query. About to post to inboundChannel for key: ", c)
            inboundChannel <- processedTicks
        }
    }(chunkOfKeys)
}

processedIndex := 0
for processedTicks := range inboundChannel {
    ticks = append(ticks, processedTicks...)
    log.Println("Got processed ticks from inboundChannel: ", processedIndex)
    processedIndex += 1
}
log.Println("End of function.")

Код для getTicksForCassandraKey:

func getTicksForCassandraKey(cassandraSession gocql.Session, cassandraAnalysisKey string, startTime time.Time) (ticks []bson.M, err error) {
    log.Println("getTicksForCassandraKey: ", cassandraAnalysisKey)
    cassandraQuery := "select * from analysisdata where analysis_key = '" + cassandraAnalysisKey + "' and time > ? ALLOW FILTERING;"
    q := cassandraSession.Query(cassandraQuery, startTime)
    iter := q.Iter()
    var rawData string
    var rawAnalysisKey string
    var rawTime time.Time
    for iter.Scan(&rawAnalysisKey, &rawTime, &rawData) {
        processedAlgoTick, processingErr := processAlgoTick(rawAnalysisKey, rawTime, rawData)
        if processingErr != nil {
            err = processingErr
            return
        }
        ticks = append(ticks, processedAlgoTick)
    }
    iterCloseErr := iter.Close()
    q.Release()
    log.Println("Closed iter for analysis key: ", cassandraAnalysisKey)
    if iterCloseErr != nil {
        log.Println("Cassandra Iterator.Close() Error:", iterCloseErr)
    }
    return
}

1 ответ

Входной канал в основном коде блокировался. Я поместил это в подпрограмму go и использовал sync.WaitGroup() для ее решения.

Другие вопросы по тегам