Parallel.ForEach CancellationTokenSource не останавливается

В настоящее время я пишу библиотеку ProxyChecker. Я использую поток, который смешивает цикл Parallel.ForEach для проверки всех прокси. Я использую CancellationTokenSource (cts) сделать мягкий прерывание (с cts.Cancel()). Как вы можете видеть в следующем коде, я добавил небольшой "тестовый код", который записывает текущие потоки в консоль.

Вот код, который вам нужен:

private void CheckProxies(string[] proxies, int timeout, int threads, string domainToCheckWith)
        {
            _cts = new CancellationTokenSource();
            int checkedProxyCount = 0, uncheckedProxyCount = proxies.Length, goodProxies = 0, badProxies = 0;
            mainThread = new Thread(() =>
            {
                try
                {
                    Parallel.ForEach(proxies, new ParallelOptions {MaxDegreeOfParallelism = threads, CancellationToken = _cts.Token}, prox =>
                    {
                        Interlocked.Increment(ref running);
                        Console.WriteLine("thread running: {0}", running);
                        try
                        {
                            _cts.Token.ThrowIfCancellationRequested();
                            if (CheckProxy(prox, domainToCheckWith, timeout))
                            {
                                Interlocked.Increment(ref checkedProxyCount);
                                Interlocked.Increment(ref goodProxies);
                                Interlocked.Decrement(ref uncheckedProxyCount);
                            }
                            else
                            {
                                Interlocked.Increment(ref checkedProxyCount);
                                Interlocked.Decrement(ref uncheckedProxyCount);
                                Interlocked.Increment(ref badProxies);
                            }
                            _cts.Token.ThrowIfCancellationRequested();
                            OnUpdate(uncheckedProxyCount, checkedProxyCount, goodProxies, badProxies);
                        }
                        catch (OperationCanceledException ex) {}
                        catch (ObjectDisposedException ex) {}
                        catch (Exception ex)
                        {
                            OnLog(ex.Message, Color.Red);
                        }
                        finally
                        {
                            Console.WriteLine("thread running: {0}", running);
                            Interlocked.Decrement(ref running);
                        }
                    });
                }
                catch (OperationCanceledException ex) {}
                catch (ObjectDisposedException ex) {}
                catch (Exception ex)
                {
                    OnLog(ex.Message, Color.Red);
                }
                finally
                {
                    isRunning = false;
                    OnComplete();
                }
            });
            mainThread.Start();
        }

Вывод (я вынул несколько строк, так как давать полный код бесполезно)

thread running: 1
thread running: 1
thread running: 2
thread running: 2

//Slowly going up to  50

thread running: 50
thread running: 50
thread running: 50

//Staying at 50 till I press stop

thread running: 50
thread running: 50
thread running: 50
thread running: 50
thread running: 50
thread running: 49
thread running: 48
thread running: 47
thread running: 46

//Going down...

thread running: 17
thread running: 16
thread running: 15
thread running: 14
thread running: 13
thread running: 12
thread running: 11
thread running: 10
thread running: 10
thread running: 8
thread running: 7
thread running: 6
thread running: 5
thread running: 4

А затем он останавливается на 4 или 3 или 2 (каждый раз разные). Я подождал несколько минут, но после запуска Parallel.ForEach ничего не вышло, ни код.

Время ожидания запроса - 5000, потоков - 50.

Вот другой код для проверки:

private bool CheckProxy(string proxy, string domainToCheckWith, int timeout)
{
    try
    {
        WebRequest req = WebRequest.Create(domainToCheckWith);
        req.Proxy = new WebProxy(proxy);
        req.Timeout = timeout;
        var response = (HttpWebResponse) req.GetResponse();
        string responseString = ReadResponseString(response);

        if (responseString.Contains("SOMETHING HERE"))
        {
            OnGoodProxy(proxy);
            return true;
        }
        if (responseString.Contains("SOMEOTHERTHINGHERE"))
        {
            OnBadProxy(proxy);
            return false;
        }
        OnBadProxy(proxy);
        return false;
    }
    catch (WebException ex)
    {
        OnBadProxy(proxy);
        return false;
    }
    catch (Exception ex)
    {
        OnLog(ex.Message, Color.Red);
        return false;
    }
}

Функция остановки:

public void StopChecking()
{
    try
    {
        if (_cts != null && mainThread.IsAlive)
        {
            if (_cts.IsCancellationRequested)
            {
                mainThread.Abort();
                OnLog("Hard aborting Filter Threads...", Color.DarkGreen);
                while (mainThread.IsAlive) ;
                OnComplete();
                isRunning = false;
            }
            else
            {
                _cts.Cancel();
                OnLog("Soft aborting Filter Threads...", Color.DarkGreen);
            }
        }
    }
    catch (Exception ex)
    {
        OnLog(ex.Message, Color.Red);
    }
}

ВАЖНОЕ РЕДАКТИРОВАНИЕ:

Я добавил это в функцию CeckProxy:

        Stopwatch sw = new Stopwatch();
        sw.Start();
        string responseString = new StreamReader(response.GetResponseStream()).ReadToEnd();
        sw.Stop();

Это результат последних нескольких потоков:

thread running: 6
4449
thread running: 5
72534
thread running: 4
180094
thread running: 3

почему это так долго? Я имею в виду 180 секунд?!

2 ответа

Решение

Хорошо, я понял это сам.

Теперь я читаю ответ непрерывно и проверяю с помощью секундомера (и request.ReadWriteTimeout), что часть чтения останавливается через определенное время (в моем случае readTimeout) достигнут. Код

HttpWebRequest req = (HttpWebRequest)WebRequest.Create(domainToCheckWith);
            req.Proxy = new WebProxy(proxy);
            req.Timeout = timeout;
            req.ReadWriteTimeout = readTimeout;
            req.Headers.Add(HttpRequestHeader.AcceptEncoding, "deflate,gzip");
            req.AutomaticDecompression = DecompressionMethods.Deflate | DecompressionMethods.GZip;

            byte[] responseByte = new byte[1024];
            string responseString = string.Empty;

            sw.Start();
            using (WebResponse res = req.GetResponse())
            {
                using (Stream stream = res.GetResponseStream())
                {
                    while (stream.Read(responseByte, 0, responseByte.Length) > 0)
                    {
                        responseString += Encoding.UTF8.GetString(responseByte);
                        if(sw.ElapsedMilliseconds > (long)timeout)
                            throw new WebException();
                    }

                }
            }
            sw.Stop();

Вы можете попробовать заблокировать внутри попробовать заблокировать объект

Object lockObject = new Object();
try
{
    Parallel.ForEach(proxies, new ParallelOptions {MaxDegreeOfParallelism = threads, CancellationToken = _cts.Token}, prox =>
    {
        Interlocked.Increment(ref running);
        Console.WriteLine("thread running: {0}", running);
        try
        {
            lock(lockObject)
            {
                //code.............
            }
        }
        catch
        {
        }
    }
}
catch
{
}
Другие вопросы по тегам