Kafka Streams: Частичная переработка по ключу

Сценарий:

В сценарии веб- сеанса KafkaStreams с неограниченным (или многолетним) хранением, с интерактивными запросами (это может быть просмотрено при необходимости), со многими клиентами, каждый из которых имеет много пользователей (каждый пользователь относится к каждому клиенту) и где происходит разбиение выглядит так:

Разделение по функции (clientId, userId)% numberOfPartitions, предварительно устанавливая numberOfPartitions в зависимости от размера кластера. Это позволило бы выполнять сеанс для данных (clientId, userId) и должно обеспечивать равномерное распределение данных между узлами (отсутствие горячей точки, размера раздела или загрузки записи).

Тем не менее, при запросе я бы запросил по клиенту (и диапазон времени). Итак, я бы собрал агрегированный Ktable из этой таблицы Sessions, где ключом является клиент, а Sessions запрашиваются (client, timeStart, timeEnd). Это привело бы к тому, что эти данные от клиента должны были попасть в один узел, что могло бы вызвать проблемы с масштабируемостью (слишком большой клиент), но, поскольку данные уже агрегированы, я думаю, что это будет управляемым.

Вопрос:

В этом сценарии (варианты оценены), я хотел бы иметь возможность повторной обработки только для одного клиента.

Но данные от одного клиента будут распределены среди (потенциально всех) разделов.

Каким образом можно добиться частичной повторной обработки в потоках Kafka с минимальным воздействием и в то же время сохранить (старое) состояние запрашиваемым?

1 ответ

В общем, я думаю, что вы уже знаете ответ на свой вопрос. При такой схеме разбиения, которую вы описали, вам придется читать все разделы, если вы хотите повторно обработать клиента, поскольку сообщения будут распределены по всем из них.

Единственное, что я могу придумать, чтобы ограничить объем накладных расходов при повторной обработке всего клиента, - это реализовать схему разделения, которая группирует несколько разделов для клиента, а затем распределяет пользователей по этим разделам, чтобы избежать перегрузки одного раздела с особенно "большим" "клиент. Надеюсь, картина должна прояснить то, что я, вероятно, не смог объяснить словами.

Группировка разделов по клиенту

Пользовательский разделитель для достижения этого распределения может выглядеть примерно так, как показано в следующем коде. Пожалуйста, возьмите это с крошкой соли, пока это чисто теоретически и никогда не проверялось (или даже работало в этом отношении) - но это должно иллюстрировать принцип.

public class ClientUserPartitioner implements Partitioner {
int partitionGroupSize = 10;

@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    // For this we expect the key to be of the format "client/user"
    String[] splitValues = ((String)key).split("/");
    String client = splitValues[0];
    String user = splitValues[1];

    // Check that partitioncount is divisible by group size
    if (cluster.availablePartitionsForTopic(topic).size() % partitionGroupSize != 0) {
        throw new ConfigException("Partitioncount must be divisible by "+ partitionGroupSize +" for this partitioner but is " +
                cluster.availablePartitionsForTopic(topic).size() + " for topic " + topic);
    }

    // Calculate partition group from client and specific partition from user
    int clientPartitionOffset = Utils.murmur2(client.getBytes()) % partitionGroupSize * partitionGroupSize;
    int userPartition = Utils.murmur2(user.getBytes()) % partitionGroupSize;

    // Combine group and specific value to get final partition
    return clientPartitionOffset + userPartition;
}

@Override
public void configure(Map<String, ?> configs) {
    if (configs.containsKey("partition.group.size")) {
        this.partitionGroupSize = Integer.parseInt((String)configs.get("partition.group.size"));
    }
}

@Override
public void close() {
}

}

Это, конечно, повлияет на ваш дистрибутив, возможно, стоит потратить некоторое время на моделирование с различными значениями для partitionGroupSize и репрезентативной выборки ваших данных, чтобы оценить, насколько равномерным является распределение и какие накладные расходы вы бы сэкономили при повторной обработке весь клиент.

Другие вопросы по тегам