Golang Kafka не использует все сообщения
Первый пакет:- Я пытаюсь извлечь данные из 100 плоских файлов и загрузить их в массив и вставить их в производитель kafka один за другим в виде байтового массива.
Вторая партия:- Я потребляю у потребителя kafka и затем вставляю их в базу данных NoSQL.
Я использую Offsetnewset в конфигурационном файле пакета shopify sarama golang для Kafka.
Я могу получать и вставлять сообщения в kafka, но при получении я получаю только первое сообщение. Так как я дал Offset новейший в конфиге сарама. как я могу получить все данные здесь.
1 ответ
Трудно быть в состоянии что-то сказать без какого-либо кода или более подробного объяснения о том, как настроен kafka (то есть: темы, разделы, ...), поэтому мне приходит в голову несколько быстрых проверок:
Предполагая, что вы начнете использовать набор OffsetNewest до того, как начнете создавать, одна вещь, которая может произойти, это то, что вы не потребляете все разделы по этой теме, относящиеся к документам sarama, вы должны явно потреблять каждый раздел, создавая PartitionConsumers. Из примера в https://godoc.org/github.com/Shopify/sarama:
partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, OffsetNewest) if err != nil { panic(err) } ... consumed := 0 ConsumerLoop: for { select { case msg := <-partitionConsumer.Messages(): log.Printf("Consumed message offset %d\n", msg.Offset) consumed++ case <-signals: break ConsumerLoop } }
Фактически вы начинаете потреблять после создания всех событий, и поэтому указатель для их чтения - это не OffsetNewest, а OffsetOldest.
Я сожалею, что не могу дать вам более полезного ответа, но, возможно, если вы вставите какой-то код или предоставите больше деталей, мы можем помочь, но больше.