StackExchange.Redis - использование LockTake / LockRelease
Я использую Redis с StackExchange.Redis. У меня есть несколько потоков, которые в какой-то момент получают доступ и редактируют значение одного и того же ключа, поэтому мне нужно синхронизировать манипуляции с данными.
Глядя на доступные функции, я вижу, что есть две функции, TakeLock и ReleaseLock. Однако эти функции принимают как ключ, так и параметр значения, а не ожидаемую блокировку одного ключа. В документации и источнике intellisene на GitHub не объясняется, как использовать функции LockTake и LockRelease или что передавать для параметров ключа и значения.
Вопрос: Как правильно использовать LockTake и LockRelease в StackExchange.Redis?
Пример псевдокода того, что я собираюсь сделать:
//Add Items Before Parallel Execution
redis.StringSet("myJSONKey", myJSON);
//Parallel Execution
Parallel.For(0, 100, i =>
{
//Some work here
//....
//Lock
redis.LockTake("myJSONKey");
//Manipulate
var myJSONObject = redis.StringGet("myJSONKey");
myJSONObject.Total++;
Console.WriteLine(myJSONObject.Total);
redis.StringSet("myJSONKey", myNewJSON);
//Unlock
redis.LockRelease("myJSONKey");
//More work here
//...
});
2 ответа
В замке 3 части:
- ключ (уникальное имя замка в базе данных)
- значение (определяемый вызывающим токен токен, который может использоваться как для указания того, кто "владеет" блокировкой, так и для проверки правильности снятия и расширения блокировки)
- длительность (блокировка намеренно ограничена)
Если никакое другое значение не приходит на ум, гид может сделать подходящее "значение". Мы склонны использовать имя машины (или поддельную версию имени машины, если на одном компьютере могут конкурировать несколько процессов).
Также обратите внимание, что взятие блокировки является спекулятивным, а не блокирующим. Вполне возможно, что вам не удалось получить блокировку, и, следовательно, вам может потребоваться проверить это и, возможно, добавить некоторую логику повторных попыток.
Типичным примером может быть:
RedisValue token = Environment.MachineName;
if(db.LockTake(key, token, duration)) {
try {
// you have the lock do work
} finally {
db.LockRelease(key, token);
}
}
Обратите внимание, что если работа занимает много времени (в частности, цикл), вы можете добавить некоторые случайные LockExtend
звонки посередине - снова не забывая проверять на успешность (в случае истечения времени ожидания).
Также обратите внимание, что все отдельные команды redis являются атомарными, поэтому вам не нужно беспокоиться о двух конкурирующих операциях. Для более сложных многооперационных модулей, транзакции и сценарии являются опциями.
Есть моя часть кода для блокировки -> получить -> изменить (если требуется)-> разблокировать действия с комментариями.
public static T GetCachedAndModifyWithLock<T>(string key, Func<T> retrieveDataFunc, TimeSpan timeExpiration, Func<T, bool> modifyEntityFunc,
TimeSpan? lockTimeout = null, bool isSlidingExpiration=false) where T : class
{
int lockCounter = 0;//for logging in case when too many locks per key
Exception logException = null;
var cache = Connection.GetDatabase();
var lockToken = Guid.NewGuid().ToString(); //unique token for current part of code
var lockName = key + "_lock"; //unique lock name. key-relative.
T tResult = null;
while ( lockCounter < 20)
{
//check for access to cache object, trying to lock it
if (!cache.LockTake(lockName, lockToken, lockTimeout ?? TimeSpan.FromSeconds(10)))
{
lockCounter++;
Thread.Sleep(100); //sleep for 100 milliseconds for next lock try. you can play with that
continue;
}
try
{
RedisValue result = RedisValue.Null;
if (isSlidingExpiration)
{
//in case of sliding expiration - get object with expiry time
var exp = cache.StringGetWithExpiry(key);
//check ttl.
if (exp.Expiry.HasValue && exp.Expiry.Value.TotalSeconds >= 0)
{
//get only if not expired
result = exp.Value;
}
}
else //in absolute expiration case simply get
{
result = cache.StringGet(key);
}
//"REDIS_NULL" is for cases when our retrieveDataFunc function returning null (we cannot store null in redis, but can store pre-defined string :) )
if (result.HasValue && result == "REDIS_NULL") return null;
//in case when cache is epmty
if (!result.HasValue)
{
//retrieving data from caller function (from db from example)
tResult = retrieveDataFunc();
if (tResult != null)
{
//trying to modify that entity. if caller modifyEntityFunc returns true, it means that caller wants to resave modified entity.
if (modifyEntityFunc(tResult))
{
//json serialization
var json = JsonConvert.SerializeObject(tResult);
cache.StringSet(key, json, timeExpiration);
}
}
else
{
//save pre-defined string in case if source-value is null.
cache.StringSet(key, "REDIS_NULL", timeExpiration);
}
}
else
{
//retrieve from cache and serialize to required object
tResult = JsonConvert.DeserializeObject<T>(result);
//trying to modify
if (modifyEntityFunc(tResult))
{
//and save if required
var json = JsonConvert.SerializeObject(tResult);
cache.StringSet(key, json, timeExpiration);
}
}
//refresh exiration in case of sliding expiration flag
if(isSlidingExpiration)
cache.KeyExpire(key, timeExpiration);
}
catch (Exception ex)
{
logException = ex;
}
finally
{
cache.LockRelease(lockName, lockToken);
}
break;
}
if (lockCounter >= 20 || logException!=null)
{
//log it
}
return tResult;
}
и использование:
public class User
{
public int ViewCount { get; set; }
}
var cachedAndModifiedItem = GetCachedAndModifyWithLock<User>( "MyAwesomeKey", () =>
{
//return from db or kind of that
return new User() { ViewCount = 0 };
}, TimeSpan.FromMinutes(10), user=>
{
if (user.ViewCount< 3)
{
user.ViewCount++;
return true; //save it to cache
}
return false; //do not update it in cache
}, TimeSpan.FromSeconds(10),true);
Этот код может быть улучшен (например, вы можете добавлять транзакции для меньшего количества вызовов в кэш и т. Д.), Но я рад, что это будет полезно для вас.