Parallel.ForEach CancellationTokenSource не останавливается
В настоящее время я пишу библиотеку ProxyChecker. Я использую поток, который смешивает цикл Parallel.ForEach для проверки всех прокси. Я использую CancellationTokenSource
(cts) сделать мягкий прерывание (с cts.Cancel()
). Как вы можете видеть в следующем коде, я добавил небольшой "тестовый код", который записывает текущие потоки в консоль.
Вот код, который вам нужен:
private void CheckProxies(string[] proxies, int timeout, int threads, string domainToCheckWith)
{
_cts = new CancellationTokenSource();
int checkedProxyCount = 0, uncheckedProxyCount = proxies.Length, goodProxies = 0, badProxies = 0;
mainThread = new Thread(() =>
{
try
{
Parallel.ForEach(proxies, new ParallelOptions {MaxDegreeOfParallelism = threads, CancellationToken = _cts.Token}, prox =>
{
Interlocked.Increment(ref running);
Console.WriteLine("thread running: {0}", running);
try
{
_cts.Token.ThrowIfCancellationRequested();
if (CheckProxy(prox, domainToCheckWith, timeout))
{
Interlocked.Increment(ref checkedProxyCount);
Interlocked.Increment(ref goodProxies);
Interlocked.Decrement(ref uncheckedProxyCount);
}
else
{
Interlocked.Increment(ref checkedProxyCount);
Interlocked.Decrement(ref uncheckedProxyCount);
Interlocked.Increment(ref badProxies);
}
_cts.Token.ThrowIfCancellationRequested();
OnUpdate(uncheckedProxyCount, checkedProxyCount, goodProxies, badProxies);
}
catch (OperationCanceledException ex) {}
catch (ObjectDisposedException ex) {}
catch (Exception ex)
{
OnLog(ex.Message, Color.Red);
}
finally
{
Console.WriteLine("thread running: {0}", running);
Interlocked.Decrement(ref running);
}
});
}
catch (OperationCanceledException ex) {}
catch (ObjectDisposedException ex) {}
catch (Exception ex)
{
OnLog(ex.Message, Color.Red);
}
finally
{
isRunning = false;
OnComplete();
}
});
mainThread.Start();
}
Вывод (я вынул несколько строк, так как давать полный код бесполезно)
thread running: 1
thread running: 1
thread running: 2
thread running: 2
//Slowly going up to 50
thread running: 50
thread running: 50
thread running: 50
//Staying at 50 till I press stop
thread running: 50
thread running: 50
thread running: 50
thread running: 50
thread running: 50
thread running: 49
thread running: 48
thread running: 47
thread running: 46
//Going down...
thread running: 17
thread running: 16
thread running: 15
thread running: 14
thread running: 13
thread running: 12
thread running: 11
thread running: 10
thread running: 10
thread running: 8
thread running: 7
thread running: 6
thread running: 5
thread running: 4
А затем он останавливается на 4 или 3 или 2 (каждый раз разные). Я подождал несколько минут, но после запуска Parallel.ForEach ничего не вышло, ни код.
Время ожидания запроса - 5000, потоков - 50.
Вот другой код для проверки:
private bool CheckProxy(string proxy, string domainToCheckWith, int timeout)
{
try
{
WebRequest req = WebRequest.Create(domainToCheckWith);
req.Proxy = new WebProxy(proxy);
req.Timeout = timeout;
var response = (HttpWebResponse) req.GetResponse();
string responseString = ReadResponseString(response);
if (responseString.Contains("SOMETHING HERE"))
{
OnGoodProxy(proxy);
return true;
}
if (responseString.Contains("SOMEOTHERTHINGHERE"))
{
OnBadProxy(proxy);
return false;
}
OnBadProxy(proxy);
return false;
}
catch (WebException ex)
{
OnBadProxy(proxy);
return false;
}
catch (Exception ex)
{
OnLog(ex.Message, Color.Red);
return false;
}
}
Функция остановки:
public void StopChecking()
{
try
{
if (_cts != null && mainThread.IsAlive)
{
if (_cts.IsCancellationRequested)
{
mainThread.Abort();
OnLog("Hard aborting Filter Threads...", Color.DarkGreen);
while (mainThread.IsAlive) ;
OnComplete();
isRunning = false;
}
else
{
_cts.Cancel();
OnLog("Soft aborting Filter Threads...", Color.DarkGreen);
}
}
}
catch (Exception ex)
{
OnLog(ex.Message, Color.Red);
}
}
ВАЖНОЕ РЕДАКТИРОВАНИЕ:
Я добавил это в функцию CeckProxy:
Stopwatch sw = new Stopwatch();
sw.Start();
string responseString = new StreamReader(response.GetResponseStream()).ReadToEnd();
sw.Stop();
Это результат последних нескольких потоков:
thread running: 6
4449
thread running: 5
72534
thread running: 4
180094
thread running: 3
почему это так долго? Я имею в виду 180 секунд?!
2 ответа
Хорошо, я понял это сам.
Теперь я читаю ответ непрерывно и проверяю с помощью секундомера (и request.ReadWriteTimeout), что часть чтения останавливается через определенное время (в моем случае readTimeout
) достигнут. Код
HttpWebRequest req = (HttpWebRequest)WebRequest.Create(domainToCheckWith);
req.Proxy = new WebProxy(proxy);
req.Timeout = timeout;
req.ReadWriteTimeout = readTimeout;
req.Headers.Add(HttpRequestHeader.AcceptEncoding, "deflate,gzip");
req.AutomaticDecompression = DecompressionMethods.Deflate | DecompressionMethods.GZip;
byte[] responseByte = new byte[1024];
string responseString = string.Empty;
sw.Start();
using (WebResponse res = req.GetResponse())
{
using (Stream stream = res.GetResponseStream())
{
while (stream.Read(responseByte, 0, responseByte.Length) > 0)
{
responseString += Encoding.UTF8.GetString(responseByte);
if(sw.ElapsedMilliseconds > (long)timeout)
throw new WebException();
}
}
}
sw.Stop();
Вы можете попробовать заблокировать внутри попробовать заблокировать объект
Object lockObject = new Object();
try
{
Parallel.ForEach(proxies, new ParallelOptions {MaxDegreeOfParallelism = threads, CancellationToken = _cts.Token}, prox =>
{
Interlocked.Increment(ref running);
Console.WriteLine("thread running: {0}", running);
try
{
lock(lockObject)
{
//code.............
}
}
catch
{
}
}
}
catch
{
}