StackExchange.Redis - использование LockTake / LockRelease

Я использую Redis с StackExchange.Redis. У меня есть несколько потоков, которые в какой-то момент получают доступ и редактируют значение одного и того же ключа, поэтому мне нужно синхронизировать манипуляции с данными.

Глядя на доступные функции, я вижу, что есть две функции, TakeLock и ReleaseLock. Однако эти функции принимают как ключ, так и параметр значения, а не ожидаемую блокировку одного ключа. В документации и источнике intellisene на GitHub не объясняется, как использовать функции LockTake и LockRelease или что передавать для параметров ключа и значения.

Вопрос: Как правильно использовать LockTake и LockRelease в StackExchange.Redis?

Пример псевдокода того, что я собираюсь сделать:

//Add Items Before Parallel Execution
redis.StringSet("myJSONKey", myJSON);

//Parallel Execution
Parallel.For(0, 100, i =>
    {
        //Some work here
        //....

        //Lock
        redis.LockTake("myJSONKey");

        //Manipulate
        var myJSONObject = redis.StringGet("myJSONKey");
        myJSONObject.Total++;
        Console.WriteLine(myJSONObject.Total);
        redis.StringSet("myJSONKey", myNewJSON);

        //Unlock
        redis.LockRelease("myJSONKey");

        //More work here
        //...
    });

2 ответа

Решение

В замке 3 части:

  • ключ (уникальное имя замка в базе данных)
  • значение (определяемый вызывающим токен токен, который может использоваться как для указания того, кто "владеет" блокировкой, так и для проверки правильности снятия и расширения блокировки)
  • длительность (блокировка намеренно ограничена)

Если никакое другое значение не приходит на ум, гид может сделать подходящее "значение". Мы склонны использовать имя машины (или поддельную версию имени машины, если на одном компьютере могут конкурировать несколько процессов).

Также обратите внимание, что взятие блокировки является спекулятивным, а не блокирующим. Вполне возможно, что вам не удалось получить блокировку, и, следовательно, вам может потребоваться проверить это и, возможно, добавить некоторую логику повторных попыток.

Типичным примером может быть:

RedisValue token = Environment.MachineName;
if(db.LockTake(key, token, duration)) {
    try {
        // you have the lock do work
    } finally {
        db.LockRelease(key, token);
    }
}

Обратите внимание, что если работа занимает много времени (в частности, цикл), вы можете добавить некоторые случайные LockExtend звонки посередине - снова не забывая проверять на успешность (в случае истечения времени ожидания).

Также обратите внимание, что все отдельные команды redis являются атомарными, поэтому вам не нужно беспокоиться о двух конкурирующих операциях. Для более сложных многооперационных модулей, транзакции и сценарии являются опциями.

Есть моя часть кода для блокировки -> получить -> изменить (если требуется)-> разблокировать действия с комментариями.

    public static T GetCachedAndModifyWithLock<T>(string key, Func<T> retrieveDataFunc, TimeSpan timeExpiration, Func<T, bool> modifyEntityFunc,
       TimeSpan? lockTimeout = null, bool isSlidingExpiration=false) where T : class
    {

        int lockCounter = 0;//for logging in case when too many locks per key
        Exception logException = null;

        var cache = Connection.GetDatabase();
        var lockToken = Guid.NewGuid().ToString(); //unique token for current part of code
        var lockName = key + "_lock"; //unique lock name. key-relative.
        T tResult = null;

        while ( lockCounter < 20)
        {
            //check for access to cache object, trying to lock it
            if (!cache.LockTake(lockName, lockToken, lockTimeout ?? TimeSpan.FromSeconds(10)))
            {
                lockCounter++;
                Thread.Sleep(100); //sleep for 100 milliseconds for next lock try. you can play with that
                continue;
            }

            try
            {
                RedisValue result = RedisValue.Null;

                if (isSlidingExpiration)
                {
                    //in case of sliding expiration - get object with expiry time
                    var exp = cache.StringGetWithExpiry(key);

                    //check ttl.
                    if (exp.Expiry.HasValue && exp.Expiry.Value.TotalSeconds >= 0)
                    {
                        //get only if not expired
                        result = exp.Value;
                    }
                }
                else //in absolute expiration case simply get
                {
                    result = cache.StringGet(key);
                }

                //"REDIS_NULL" is for cases when our retrieveDataFunc function returning null (we cannot store null in redis, but can store pre-defined string :) )
                if (result.HasValue && result == "REDIS_NULL") return null;
                //in case when cache is epmty
                if (!result.HasValue)
                {
                    //retrieving data from caller function (from db from example)
                    tResult = retrieveDataFunc();

                    if (tResult != null)
                    {
                        //trying to modify that entity. if caller modifyEntityFunc returns true, it means that caller wants to resave modified entity.
                        if (modifyEntityFunc(tResult))
                        {
                            //json serialization
                            var json = JsonConvert.SerializeObject(tResult);
                            cache.StringSet(key, json, timeExpiration);
                        }
                    }
                    else
                    {
                        //save pre-defined string in case if source-value is null.
                        cache.StringSet(key, "REDIS_NULL", timeExpiration);
                    }
                }
                else
                {
                    //retrieve from cache and serialize to required object
                    tResult = JsonConvert.DeserializeObject<T>(result);
                    //trying to modify
                    if (modifyEntityFunc(tResult))
                    {
                        //and save if required
                        var json = JsonConvert.SerializeObject(tResult);
                        cache.StringSet(key, json,  timeExpiration);
                    }
                }

                //refresh exiration in case of sliding expiration flag
                if(isSlidingExpiration)
                    cache.KeyExpire(key, timeExpiration);
            }
            catch (Exception ex)
            {
                logException = ex;
            }
            finally
            {                    
                cache.LockRelease(lockName, lockToken);
            }
            break;
        }

        if (lockCounter >= 20 || logException!=null)
        {
            //log it
        }

        return tResult;
    }

и использование:

public class User
{
    public int ViewCount { get; set; }
}

var cachedAndModifiedItem = GetCachedAndModifyWithLock<User>( "MyAwesomeKey", () =>
        {
            //return from db or kind of that
            return new User() { ViewCount = 0 };
        }, TimeSpan.FromMinutes(10), user=>
        {
            if (user.ViewCount< 3)
            {
                user.ViewCount++;
                return true; //save it to cache
            }
            return false; //do not update it in cache
        }, TimeSpan.FromSeconds(10),true);

Этот код может быть улучшен (например, вы можете добавлять транзакции для меньшего количества вызовов в кэш и т. Д.), Но я рад, что это будет полезно для вас.

Другие вопросы по тегам