Golang Kafka не использует все сообщения

Первый пакет:- Я пытаюсь извлечь данные из 100 плоских файлов и загрузить их в массив и вставить их в производитель kafka один за другим в виде байтового массива.

Вторая партия:- Я потребляю у потребителя kafka и затем вставляю их в базу данных NoSQL.

Я использую Offsetnewset в конфигурационном файле пакета shopify sarama golang для Kafka.

Я могу получать и вставлять сообщения в kafka, но при получении я получаю только первое сообщение. Так как я дал Offset новейший в конфиге сарама. как я могу получить все данные здесь.

1 ответ

Трудно быть в состоянии что-то сказать без какого-либо кода или более подробного объяснения о том, как настроен kafka (то есть: темы, разделы, ...), поэтому мне приходит в голову несколько быстрых проверок:

  1. Предполагая, что вы начнете использовать набор OffsetNewest до того, как начнете создавать, одна вещь, которая может произойти, это то, что вы не потребляете все разделы по этой теме, относящиеся к документам sarama, вы должны явно потреблять каждый раздел, создавая PartitionConsumers. Из примера в https://godoc.org/github.com/Shopify/sarama:

    partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, OffsetNewest)
    if err != nil {
        panic(err)
    }
    
    ...
    
    consumed := 0
    ConsumerLoop:
    for {
        select {
        case msg := <-partitionConsumer.Messages():
            log.Printf("Consumed message offset %d\n", msg.Offset)
            consumed++
        case <-signals:
            break ConsumerLoop
        }
    }
    
  2. Фактически вы начинаете потреблять после создания всех событий, и поэтому указатель для их чтения - это не OffsetNewest, а OffsetOldest.

Я сожалею, что не могу дать вам более полезного ответа, но, возможно, если вы вставите какой-то код или предоставите больше деталей, мы можем помочь, но больше.

Другие вопросы по тегам