Kafka Streams: Магазин не готов
Недавно мы обновили Kafka до v1.1 и Confluent до v4.0. Но после обновления мы столкнулись с постоянными проблемами, касающимися государственных хранилищ. Наше приложение запускает коллекцию потоков, и мы проверяем, готовы ли хранилища состояний, прежде чем уничтожать приложение после 100 попыток. Но после обновления есть по крайней мере один поток, который будет иметь Store is not ready : the state store, <your stream>, may have migrated to another instance
Сам поток имеет RUNNING
состояние и сообщения будут проходить через, но состояние хранилища по-прежнему отображается как не готово. Поэтому я понятия не имею, что может происходить.
- Разве я не должен проверять состояние магазина?
- А так как в нашем приложении много потоков (~15), вызовет ли их одновременный запуск проблемы?
- Если мы не делаем жесткий перезапуск - в настоящее время мы запускаем его как службу на Linux
Мы запускаем Kafka в кластере с 3 брокерами. Ниже приведен пример потока (не весь код):
public BaseStream createStreamInstance() {
final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
MessagePayLoadParser<Note> noteParser = new MessagePayLoadParser<Note>(Note.class);
GenericJsonSerde<Note> noteSerde = new GenericJsonSerde<Note>(Note.class);
StreamsBuilder builder = new StreamsBuilder();
//below reducer will use sets to combine
//value1 in the reducer is what is already present in the store.
//value2 is the incoming message and for notes should have max 1 item in it's list (since its 1 attachment 1 tag per row, but multiple rows per note)
Reducer<Note> reducer = new Reducer<Note>() {
@Override
public Note apply(Note value1, Note value2) {
value1.merge(value2);
return value1;
}
};
KTable<Long, Note> noteTable = builder
.stream(this.subTopic, Consumed.with(jsonSerde, jsonSerde))
.map(noteParser::parse)
.groupByKey(Serialized.with(Serdes.Long(), noteSerde))
.reduce(reducer);
noteTable.toStream().to(this.pubTopic, Produced.with(Serdes.Long(), noteSerde));
this.stream = new KafkaStreams(builder.build(), this.properties);
return this;
}
0 ответов
Здесь есть несколько открытых вопросов, таких как те, которые оставил Матиас в комментариях, но он постарается ответить / помочь на ваши актуальные вопросы:
- Не проверять состояние магазина? Здесь обычно происходит ребалансировка. Но в этом случае вы не должны видеть, что поток раздела продолжает потреблять, но эту обработку следует "передать" другому потоку, который взял на себя управление. Убедитесь, что именно этот поток обрабатывает этот раздел, а не новый. Проверьте утилиту kafka-consumer-groups, чтобы следить за потребителями (потоками) там.
- А поскольку в нашем приложении много потоков (~15), вызовет ли их одновременный запуск проблемы? Нет, ребалансировка выполняется автоматически.
- Не следует ли нам делать полный перезапуск - в настоящее время мы запускаем его как службу в Linux. Храните ли вы свои хранилища состояний в определенном каталоге, отличном от каталога по умолчанию? Вы должны правильно настроить каталог хранилищ состояний и убедиться, что он доступен и нечувствителен к перезапускам приложений. Не уверены в том, как вы выполняете жесткий перезапуск, но некоторый код обработки исключений должен противодействовать этому, закрывая приложение потоков.