Почему я выдаю 65536 сообщений на кафку, но получаю только сотни?

Версия kafka: 1.0.0 версия sarama: 1.15.0 версия go: 1.9.1

Пример кода, подобный этому:

func main() {
    config := sarama.NewConfig()
    config.Version = sarama.V1_0_0_0
    config.Producer.Return.Successes = true
    config.Producer.RequiredAcks = sarama.WaitForAll
    // config.Producer.Flush.Frequency = 10 * time.Second
    // config.Producer.Flush.Bytes = 1024 * 1024
    // config.Producer.Flush.MaxMessages = 1024
    producer, err := sarama.NewAsyncProducer(strings.Split(*brokers, ","), config)
    if err != nil {
        panic(err)
    }

    // Trap SIGINT to trigger a graceful shutdown.
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt)

    var (
        wg                          sync.WaitGroup
        enqueued, successes, errors int
    )

    wg.Add(1)
    go func() {
        defer wg.Done()
        for range producer.Successes() {
            successes++
        }
    }()

    wg.Add(1)
    go func() {
        defer wg.Done()
        for err := range producer.Errors() {
            log.Println(err)
            errors++
        }
    }()
    counter := 0

ProducerLoop:
    for {
        if counter >= 65536 {
            producer.AsyncClose() // Trigger a shutdown of the producer.
            break ProducerLoop
        }
        message := &sarama.ProducerMessage{
            Topic: *topics,
            // Key:       sarama.StringEncoder(fmt.Sprintf("%d", counter)),
            // Partition: int32(counter),
            Value: sarama.StringEncoder(fmt.Sprintf("%d,%d", counter, time.Now().UnixNano())),
            // Timestamp: time.Now(),
        }
        select {
        case producer.Input() <- message:
            enqueued++

        case <-signals:
            producer.AsyncClose() // Trigger a shutdown of the producer.
            break ProducerLoop
        }
        if *verbose {
            fmt.Printf(".")
        }
        if *sleep {
            // fmt.Println(100 * time.Millisecond)
            time.Sleep(1 * time.Millisecond)
        }
        counter++
    }

    wg.Wait()

    log.Printf("Successfully produced: %d; errors: %d\n", successes, errors)
}

из журнала я нашел 65536 сообщений, отправленных на kafka, но когда я использовал с помощью официального клиента-клиента kafka, получил только сто сообщений, я был совершенно сбит с толку

Я использовал официальный потребительский инструмент для этого:

$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server $KAFKA_BROKERS --topic fire-8  --from-beginning

что касается смещений и разделов при их создании, я тоже их записал. Это слишком долго, чтобы вставить здесь. Подсчет совпадений, я не нашел ничего странного

Любые объяснения приветствуются.

Я также опубликовал вопрос на GitHub.

обновление 1

Я обнаружил, что если я засну, разрыв между потребляемым и произведенным сократится.

обновление 2

Я нашел данные, которые я отправляю в журнале, но я не могу их потреблять

Update3

Я повторяю шаги выше на старом кластере kafka (0.10.1.0), все работает точно так, как ожидалось

1 ответ

Это должна быть ошибка клиента kafka go: sarama. Я нашел решение от проблемы GitHub, которую я отправил: удалите конфигурацию версии kafka, и это работает. Но это потеряло некоторые новые функции, принесенные новой версией. Если вы не указали используемую версию, по умолчанию используется самая старая

Другие вопросы по тегам