Как первый введенный поток может сигнализировать другим параллельным потокам о завершении того же метода?

Как первый введенный поток может сигнализировать другим параллельным потокам о завершении того же метода?

У меня есть метод с именем скажем PollDPRAM(). Он должен совершить поездку по сети на некоторое медленное оборудование и обновить личные данные объекта. Если один и тот же метод вызывается одновременно другими потоками, они не должны выполнять отключение, а должны дождаться, пока первый пришедший поток завершит задание, и просто выйдут, потому что данные свежие (скажем, 10-30 мс назад не имеет значения), В методе легко обнаружить, что второй, третий и т. Д. Потоки не вводятся первыми. Я использую блокированный счетчик для обнаружения параллелизма.

Проблема: я сделал плохой выбор, чтобы определить выход первого потока, наблюдая за счетчиком (Interlocked.Read), который следит за уменьшением значения счетчика до значения, меньшего, чем оно было обнаружено на входе потока n>1. Выбор плох, потому что первый поток может снова войти в метод почти сразу после его выхода. Таким образом, потоки с n>1 никогда не обнаружат падение счетчика.

Итак, вопрос: как правильно определить, что первый введенный поток вышел из метода, даже если этот первый поток может сразу же ввести его снова?

Спасибо

PS кусок кода

        private void pollMotorsData()
    {
        // Execute single poll with "foreground" handshaking 
        DateTime start = DateTime.Now;
        byte retryCount = 0;
        // Pick old data atomically to detect change
        uint motorsDataTimeStampPrev = this.MotorsDataTimeStamp;
        bool changeDetected = false;
        // The design goal of DPRAM is to ease the bottleneck
        // Here is a sensor if bottleneck is actually that tight
        long parallelThreads = Interlocked.Increment(ref this.motorsPollThreadCount);
        try
        {
            // For first thread entering the counter will be 1
            if (parallelThreads <= 1)
            {
                do
                {
                    // Handshake signal to DPRAM write process on controller side that host PC is reading
                    this.controller.deltaTauTcpClient.Pmac_SetBit(OFFSET_0x006A_BIT15_FOREGROUND_READ, 15, true);
                    try
                    {
                        bool canReadMotors = false;
                        byte[] canReadFrozenDataFlag = new byte[2];
                        do
                        {
                            this.controller.deltaTauTcpClient.Pmac_GetMem(OFFSET_0x006E_BIT15_FOREGROUND_DONE, canReadFrozenDataFlag);
                            canReadMotors = (canReadFrozenDataFlag[1] & 0x80) == 0x80;
                            if (canReadMotors) break;
                            retryCount++;
                            Thread.Sleep(1);
                        } while (retryCount < 10);
                        if (!canReadMotors)
                        {
                            throw new DeltaTauControllerException(this.controller, "Timeout waiting on DPRAM Foreground Handshaking Bit");
                        }
                        // The lock is meaningless in contructor as it is certainly single threaded
                        // but for practice sake the access to data should always be serialized
                        lock (motorsDataLock)
                        {
                            // Obtain fresh content of DPRAM
                            this.controller.deltaTauTcpClient.Pmac_GetMem(OFFSET_0x006A_394BYTES_8MOTORS_DATA, this.motorsData);
                            this.motorsDataBorn = DateTime.Now;
                        }
                    }
                    finally
                    {
                        // Handshake signal to DPRAM write process on controller side that host PC has finished reading
                        this.controller.deltaTauTcpClient.Pmac_SetBit(OFFSET_0x006A_BIT15_FOREGROUND_READ, 15, false);
                    }
                    // Check live change in a separate atom
                    changeDetected = this.MotorsDataTimeStamp != motorsDataTimeStampPrev;
                } while ((!changeDetected) && ((DateTime.Now - start).TotalMilliseconds < 255));
                // Assert that result is live
                if (!changeDetected)
                {
                    throw new DeltaTauControllerException(this.controller, "DPRAM Background Data timestamp is not updated. DPRAM forground handshaking failed.");
                }
            }
            else
            {
                // OK. Bottleneck ! The concurrent polls have collided 
                // Give the controller a breathe by waiting for other thread do the job
                // Avoid aggressive polling of stale data, which is not able to be written, locked by reader
                // Just wait for other thread do whole polling job and return with no action
                // because the data is milliseconds fresh
                do
                {
                    // Amount of parallel threads must eventually decrease
                    // But no thread will leave and decrease the counter until job is done
                    if (Interlocked.Read(ref this.motorsPollThreadCount) < parallelThreads)
                    {
                        // Return is possible because decreased value of concurrentThreads means that
                        // this very time other thread has finished the poll 1 millisecond ago at most
                        return;
                    }
                    Thread.Sleep(1);
                    retryCount++;
                } while ((DateTime.Now - start).TotalMilliseconds < 255);
                throw new DeltaTauControllerException(this.controller, "Timeout 255ms waiting on concurrent thread to complete DPRAM polling");
            }
        }
        finally
        {
            // Signal to other threads that work is done
            Interlocked.Decrement(ref this.motorsPollThreadCount);
            // Trace the timing and bottleneck situations
            TimeSpan duration = DateTime.Now - start;
            if (duration.TotalMilliseconds > 50 || parallelThreads > 1 || retryCount > 0)
            {
                Trace.WriteLine(string.Format("Controller {0}, DPRAM poll {1:0} ms, threads {2}, retries {3}",
                    this.controller.number,
                    duration.TotalMilliseconds,
                    parallelThreads,
                    retryCount));
            }
        }
    }

3 ответа

Решение

Есть много разных способов сделать это. Вы можете использовать критическую секцию, как кто-то уже упоминал, но это не даст вам "просто выйти", если другой поток блокирует. Для этого вам нужен какой-то флаг. Вы можете пойти с изменчивым логическим элементом и заблокировать доступ к этому логическому элементу или использовать семафор с одним счетчиком. Наконец, вы можете использовать мьютекс. Преимущество использования объектов синхронизации заключается в том, что вы можете сделать WaitForSingleObject и установить время ожидания равным 0. Затем вы можете проверить, было ли ожидание успешным (если это было тогда, когда первый поток завершился) или нет (в этом случае первый поток является Все еще работает).

Вы можете использовать классы монитора C#, которые поддерживаются ключевым словом "lock".

По сути, ваш метод может быть заключен в lock(lockobj) { CallMethod() }

Это обеспечит вам защиту, предполагая, что все потоки находятся в одном процессе.

Вам нужно будет использовать Mutex, если вам нужно заблокировать все процессы.

Что касается вашей программы, я бы посмотрел на размещение статической метки времени и кэшированного значения в вашем методе. Поэтому, когда метод входит, если отметка времени находится в допустимом диапазоне, возвращает кэшированное значение, в противном случае просто выполните выборку. В сочетании с механизмом блокировки это должно делать то, что вам нужно.

Конечно, это предполагает, что время на блокировку монитора C# не повлияет на производительность вашего приложения.

ОБНОВЛЕНИЕ: я обновил ваш код, чтобы показать вам, что я имел в виду, используя кеш и метку времени. Я предположил, что ваша переменная "motorsData" - это то, что возвращается из опроса двигателя, и поэтому у меня нет переменной для нее. Однако, если я неправильно понял, просто добавьте переменную, в которой хранятся данные после того, как они возвращаются из кода. Примечание. Я не проверял ошибки для вас, поэтому вам нужно разобраться с вашими исключениями.

    static DateTime lastMotorPoll;
    const TimeSpan CACHE_PERIOD = new TimeSpan(0, 0, 0, 0, 250);
    private object cachedCheckMotorsDataLock = new object();

    private void CachedCheckMotorsData()
    {
        lock (cachedCheckMotorsDataLock)  //Could refactor this to perform a try enter which returns quickly if required
        {
            //If the last time the data was polled is older than the cache period, poll
            if (lastMotorPoll.Add(CACHE_PERIOD) < DateTime.Now)
            {
                pollMotorsData();
                lastMotorPoll = DateTime.Now;
            }
            else //Data is fresh so don't poll
            {
                return;
            }
        }       
    }

    private void pollMotorsData()
    {
        // Execute single poll with "foreground" handshaking 
        DateTime start = DateTime.Now;
        byte retryCount = 0;
        // Pick old data atomically to detect change
        uint motorsDataTimeStampPrev = this.MotorsDataTimeStamp;
        bool changeDetected = false;
        try
        {
            do
            {
                // Handshake signal to DPRAM write process on controller side that host PC is reading
                this.controller.deltaTauTcpClient.Pmac_SetBit(OFFSET_0x006A_BIT15_FOREGROUND_READ, 15, true);
                try
                {
                    bool canReadMotors = false;
                    byte[] canReadFrozenDataFlag = new byte[2];
                    do
                    {
                        this.controller.deltaTauTcpClient.Pmac_GetMem(OFFSET_0x006E_BIT15_FOREGROUND_DONE, canReadFrozenDataFlag);
                        canReadMotors = (canReadFrozenDataFlag[1] & 0x80) == 0x80;
                        if (canReadMotors) break;
                        retryCount++;
                        Thread.Sleep(1);
                    } while (retryCount < 10);
                    if (!canReadMotors)
                    {
                        throw new DeltaTauControllerException(this.controller, "Timeout waiting on DPRAM Foreground Handshaking Bit");
                    }
                    // Obtain fresh content of DPRAM
                    this.controller.deltaTauTcpClient.Pmac_GetMem(OFFSET_0x006A_394BYTES_8MOTORS_DATA, this.motorsData);
                    this.motorsDataBorn = DateTime.Now;
                }
                finally
                {
                    // Handshake signal to DPRAM write process on controller side that host PC has finished reading
                    this.controller.deltaTauTcpClient.Pmac_SetBit(OFFSET_0x006A_BIT15_FOREGROUND_READ, 15, false);
                }
                // Check live change in a separate atom
                changeDetected = this.MotorsDataTimeStamp != motorsDataTimeStampPrev;
            } while ((!changeDetected) && ((DateTime.Now - start).TotalMilliseconds < 255));

            // Assert that result is live
            if (!changeDetected)
            {
                throw new DeltaTauControllerException(this.controller, "DPRAM Background Data timestamp is not updated. DPRAM forground handshaking failed.");
            }
        }
    }

Синхронизируйте метод, и внутри метода проверьте запись времени последнего доступа к сети, чтобы определить, нужно ли это делать снова.

Другие вопросы по тегам