Несколько тем и их приоритет
Я использую pykafka для потребления сообщения, и теперь я используюалансированный_консумер для потребления сообщения из одной темы. Теперь я должен потреблять сообщения из другой темы, и если это возможно, приоритетное потребление сообщений из разных тем. Как я могу справиться с этой проблемой? Может быть другая библиотека для питона?
1 ответ
Я только что опубликовал пост об этой проблеме.
Несмотря на то, что я использую Java, вы можете найти описанную здесь концепцию полезной для вашего случая.
То, что мы сделали для решения вопроса приоритезации тем Кафки, -
Мы разработали механизм приоритетов потребления тем Кафки. Такой механизм будет проверять, хотим ли мы обработать сообщение, которое было получено от Kafka, или приостановить обработку на потом.
Мы отобразили между разделами и логическими значениями, что при необходимости блокирует использование каждого раздела topicPartitionLocks. Блокировка предварительных, продолжая поглощать запоздалые, создает приоритетность тем. TimerTask обновляет эту карту, и наши потребители проверяют, "разрешено" ли им потреблять или ждать - как вы можете видеть в методе waitForLatePartitionIfNeeded.
public class Prioritizer extends TimerTask {
private Map<String, Boolean> topicPartitionLocks = new ConcurrentHashMap<>();
private Map<String, Long> topicPartitionLatestTimestamps = new ConcurrentHashMap<>();
@Override
public void run(){
updateTopicPartitionLocks();
}
private void updateTopicPartitionLocks() {
Optional<Long> minValue = topicPartitionLatestTimestamps.values().stream().min((o1, o2) -> (int) (o1 - o2));
if(! minValue.isPresent()) {
return;
}
Iterator it = topicPartitionLatestTimestamps.entrySet().iterator();
while (it.hasNext()) {
Boolean shouldLock = false;
Map.Entry<String, Long> pair = (Map.Entry)it.next();
String topicPartition = pair.getKey();
if(pair.getValue() > (minValue.get() + maxGap)) {
shouldLock = true;
if(isSameTopicAsMinPartition(minValue.get(), topicPartition)) {
shouldLock = false;
}
}
topicPartitionLocks.put(topicPartition, shouldLock);
}
}
public boolean isLocked(String topicPartition) {
return topicPartitionLocks.get(topicPartition).booleanValue();
}
}
метод waitForLatePartitionIfNeeded
private void waitForLatePartitionIfNeeded(final String topic, int partition) {
String topicPartition = topic + partition;
prioritizer.getTopicPartitionLocks.putIfAbsent(topicPartition);
while(prioritizer.isLocked(topicPartition)) {
monitorWaitForLatePartitionTimes(topicPartition, startTime);
Misc.sleep(timeToWaitBetweenGapToTardyPartitionChecks.get());
}
}
Используя это, мы увеличили баланс, поэтому решили это с помощью следующих определений:
Мы изменили следующую конфигурацию в Кафке
request.timeout.ms: 7300000 (~2hrs)
max.poll.interval.ms: 7200000 (2hrs)
Для графиков и общих описаний по проблеме вы можете проверить мой пост:
Как я решил задержки в сообщениях Kafka, расставив приоритеты в темах Kafka
Удачи!