Коллекция redis атомная lpop все

Redis список, производитель держит lpush. В другом потоке потребители периодически вынимают все из списка и классифицируют элементы. Поскольку продюсер продолжает настаивать, то все усилия должны быть сделаны атомарно. Так есть ли эффективный способ сделать это? spring-data-redis может быть использован.

// producer
getOpsForList.push(k, v);

// consumer
alist = range(k,0,-1); // take all out
alist.parallelStream() // during which a producer thread could push but I hope it is "blocked".
delete(k);  // list is now empty and push from producer is unblocked.

multi а также exec не достигает моей цели, потому что на самом деле подчиняется lrange, lpush а также delete только в одной транзакции. Пока что я могу думать только о том, чтобы lpop и добавить вернулся в alist пока список не пуст.

РЕДАКТИРОВАТЬ, это то, что я думаю: когда вы хотите убедиться, что операция выполняется только один раз, используя watch:

watch key
val = get key
val = val + 1
multi
set key val
exec

когда вы хотите, чтобы вас не "прерывали" (не прерывали многопоточность), и не заботились о том, сколько раз он выполняется, транзакция (multi а также exec) достаточно.

multi
val = lrange key 0 -1
delete key
exec

val все еще список после того, как он заканчивается, как то, что сказано в официальном документе

Все команды в транзакции сериализуются и выполняются последовательно. Никогда не может случиться, что запрос, выданный другим клиентом, будет обработан в середине выполнения транзакции Redis.

Помимо Redis, я взял операцию данных list.stream.parallelism out, и функция теперь фокусируется только на сборщике данных, который в точности похож на последний абзац кода.;)

1 ответ

Решение

Хороший пример, чтобы проиллюстрировать, как WATCH можно использовать для создания новых атомарных операций, в противном случае Redis не поддерживается, - это реализация ZPOP, то есть команды, которая извлекает элемент с более низким счетом из отсортированного набора атомарным способом.

Есть реализация для ZPOP в документации, как показано ниже:

WATCH zset
element = ZRANGE zset 0 0
MULTI
ZREM zset element
EXEC

Что вам нужно сделать, это повторить вышеописанную операцию, если EXEC не удается (то есть возвращает пустой ответ). операция производителя lpush является атомарным, поэтому не нужно использовать команду watch. например:

// consumer pesudo code
do {
  watch(k);
  transaction = multi();
  alist = transaction.range(k,0,-1); 
  transaction.delete(k);  
  status = get status of transaction.exec();
} while(status == null);

alist.parallelStream() 
Другие вопросы по тегам