Может ли потребитель читать записи из раздела, в котором хранятся данные с определенным значением ключа?

Вместо того, чтобы создавать много тем, я создаю раздел для каждого потребителя и храню данные, используя ключ. Итак, есть ли способ заставить потребителя в группе потребителей читать из раздела, в котором хранятся данные определенного ключа. Если да, то можете ли вы подсказать, как это можно сделать с помощью kafka-python (или любой другой библиотеки).

2 ответа

Я считаю, что то, что вы пытаетесь достичь, не является лучшей практикой в ​​долгосрочной перспективе.

Если я понял, вам нужно определить раздел, к которому будет подключаться потребитель, на основе ключа сообщения.

Я предполагаю, что издатели используют "разделитель по умолчанию".

Технически вы могли бы определить раздел темы, повторно используя в потребителе тот же алгоритм, что и в получателе. Здесь Java-код DefaultPartitioner. Вы можете адаптировать его в Python.

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    } else {
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}

private int nextValue(String topic) {
    AtomicInteger counter = topicCounterMap.get(topic);
    if (null == counter) {
        counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
        AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
        if (currentCounter != null) {
            counter = currentCounter;
        }
    }
    return counter.getAndIncrement();
}

Важная часть в вашем случае использования, когда ключ установлен:

Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

И Utils.murmur2 метод:

public static int murmur2(final byte[] data) {
    int length = data.length;
    int seed = 0x9747b28c;
    // 'm' and 'r' are mixing constants generated offline.
    // They're not really 'magic', they just happen to work well.
    final int m = 0x5bd1e995;
    final int r = 24;

    // Initialize the hash to a random value
    int h = seed ^ length;
    int length4 = length / 4;

    for (int i = 0; i < length4; i++) {
        final int i4 = i * 4;
        int k = (data[i4 + 0] & 0xff) + ((data[i4 + 1] & 0xff) << 8) + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
        k *= m;
        k ^= k >>> r;
        k *= m;
        h *= m;
        h ^= k;
    }

    // Handle the last few bytes of the input array
    switch (length % 4) {
        case 3:
            h ^= (data[(length & ~3) + 2] & 0xff) << 16;
        case 2:
            h ^= (data[(length & ~3) + 1] & 0xff) << 8;
        case 1:
            h ^= data[length & ~3] & 0xff;
            h *= m;
    }

    h ^= h >>> 13;
    h *= m;
    h ^= h >>> 15;

    return h;
}

Почему я думаю, что это не лучшее решение?

Если вы добавите новый раздел в вашу тему, DefaultPartitioner предоставит вам partition id это может отличаться от partition id вернулся, прежде чем вы добавили новый раздел. И по умолчанию существующие сообщения не перераспределяются, что означает, что у вас будут сообщения с одинаковым ключом в разных разделах.

И то же самое поведение происходит на стороне потребителя. После обновления количества разделов потребитель будет пытаться использовать сообщения из другого раздела. Вы пропустите сообщения из предыдущего раздела, используемого для этого ключа.

Вместо использования подписки и связанной логики групп потребителей вы можете использовать логику "назначить" (например, она предоставляется клиентским Java-клиентом Kafka). Хотя с подпиской на тему и являясь частью группы потребителей, разделы автоматически присваиваются потребителям и перебалансируются, когда новый потребитель присоединяется или уходит, это отличается от использования assign. С назначением, потребитель просит быть назначенным определенному разделу. Это не часть какой-либо группы потребителей. Это также означает, что вы отвечаете за перебалансировку, если потребитель умирает: например, если потребитель 1 получает назначенный раздел 1, но в какой-то момент происходит сбой, раздел 1 не будет автоматически переназначаться другому потребителю. Вы должны написать и обработать логику перезапуска потребителя (или другого) для получения сообщений из раздела 1.

Другие вопросы по тегам