C# Async WebRequests: выполнить действие, когда все запросы выполнены
У меня есть это базовое консольное приложение на C#, которое асинхронно использует WebRequest для получения html из списка сайтов. Он работает нормально, но как мне установить триггер, который срабатывает, когда каждый сайт в списке был обработан?
Я потратил пару часов, исследуя различные решения онлайн, в том числе документы MS, но ни одно из них не дает прямого ответа с помощью кода. Я читал о IAsyncResult.AsyncWaitHandle, но понятия не имею, как интегрировать его в мой код. Я просто хотел бы вызвать пользовательскую функцию, когда все потоки завершают обработку или время ожидания.
Одна хитрость заключается в том, что я никогда не знаю заранее, сколько сайтов в моем списке (это определяется пользователем), поэтому мне нужно достаточно надежное решение, чтобы ждать 5 событий для завершения 100 000 событий.
Благодарю. Рабочий код ниже:
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.IO;
using System.Net;
using System.Threading;
namespace AsyncApp_01
{
class Program
{
static void Main(string[] args)
{
ArrayList alSites = new ArrayList();
alSites.Add("http://www.google.com");
alSites.Add("http://www.lostspires.com");
ScanSites(alSites);
Console.Read();
}
private static void ScanSites(ArrayList sites)
{
foreach (string uriString in sites)
{
WebRequest request = HttpWebRequest.Create(uriString);
request.Method = "GET";
object data = new object(); //container for our "Stuff"
// RequestState is a custom class to pass info to the callback
RequestState state = new RequestState(request, data, uriString);
IAsyncResult result = request.BeginGetResponse(new AsyncCallback(UpdateItem), state);
//Register the timeout callback
ThreadPool.RegisterWaitForSingleObject(result.AsyncWaitHandle, new WaitOrTimerCallback(ScanTimeoutCallback), state, (30 * 1000), true);
}
}
private static void UpdateItem(IAsyncResult result)
{
// grab the custom state object
RequestState state = (RequestState)result.AsyncState;
WebRequest request = (WebRequest)state.Request;
// get the Response
HttpWebResponse response = (HttpWebResponse)request.EndGetResponse(result);
Stream s = (Stream)response.GetResponseStream();
StreamReader readStream = new StreamReader(s);
// dataString will hold the entire contents of the requested page if we need it.
string dataString = readStream.ReadToEnd();
response.Close();
s.Close();
readStream.Close();
Console.WriteLine(dataString);
}
private static void ScanTimeoutCallback(object state, bool timedOut)
{
if (timedOut)
{
RequestState reqState = (RequestState)state;
if (reqState != null)
{
reqState.Request.Abort();
}
Console.WriteLine("aborted- timeout");
}
}
class RequestState
{
public WebRequest Request; // holds the request
public object Data; // store any data in this
public string SiteUrl; // holds the UrlString to match up results (Database lookup, etc).
public RequestState(WebRequest request, object data, string siteUrl)
{
this.Request = request;
this.Data = data;
this.SiteUrl = siteUrl;
}
}
}
}
Бонусные баллы для тех, кто также может сказать мне, как ограничить количество одновременных потоков. Например, если у меня есть 100 сайтов для обработки, как мне настроить так, чтобы 10 сайтов обрабатывались одновременно, но не больше. Я не хочу открывать 100 тем.
1 ответ
Вот быстрый пример, который я бросил вместе. Я удалил реализацию WebClient, так как кажется, что вы используете WebRequest. Я также использую ConcurrentBag.Net 4:
public class Scraper
{
private readonly IEnumerable<string> _sites;
private readonly ConcurrentBag<string> _data;
private volatile int _count;
private readonly int _total;
public Scraper(IEnumerable<string> sites)
{
_sites = sites;
_data = new ConcurrentBag<string>();
_total = sites.Count();
}
public void Start()
{
foreach (var site in _sites)
{
ScrapeSite(site);
}
}
private void ScrapeSite(string site)
{
var req = WebRequest.Create(site);
req.BeginGetResponse(AsyncCallback, req);
}
private void AsyncCallback(IAsyncResult ar)
{
Interlocked.Increment(ref _count);
var req = ar.AsyncState as WebRequest;
var result = req.EndGetResponse(ar);
var reader = new StreamReader(result.GetResponseStream());
var data = reader.ReadToEnd();
this.OnSiteScraped(req.RequestUri.AbsoluteUri, data);
_data.Add(data);
if (_count == _total)
{
OnScrapingComplete();
}
}
private void OnSiteScraped(string site, string data)
{
var handler = this.SiteScraped;
if (handler != null)
{
handler(this, new SiteScrapedEventArgs(site, data));
}
}
private void OnScrapingComplete()
{
var handler = this.ScrapingComplete;
if (handler != null)
{
handler(this, new ScrapingCompletedEventArgs(_data));
}
}
public event EventHandler<SiteScrapedEventArgs> SiteScraped;
public event EventHandler<ScrapingCompletedEventArgs> ScrapingComplete;
}
public class SiteScrapedEventArgs : EventArgs
{
public string Site { get; private set; }
public string Data { get; private set; }
public SiteScrapedEventArgs(string site, string data)
{
this.Site = site;
this.Data = data;
}
}
Хорошо, я создал несколько базовых классов, и это должно сработать. Если этого недостаточно, извините, я просто не могу вам помочь:
public class RankedPage
{
public int Rank { get; set; }
public string Site { get; set; }
}
public class WebRequestData
{
public WebRequest WebRequest { get; set; }
public RankedPage Page { get; set; }
}
public class Scraper
{
private readonly IEnumerable<RankedPage> _sites;
private readonly ConcurrentBag<KeyValuePair<RankedPage,string>> _data;
private volatile int _count;
private readonly int _total;
public Scraper(IEnumerable<RankedPage> sites)
{
_sites = sites;
_data = new ConcurrentBag<KeyValuePair<RankedPage, string>>();
_total = sites.Count();
}
public void Start()
{
foreach (var site in _sites)
{
ScrapeSite(site);
}
}
private void ScrapeSite(RankedPage site)
{
var req = WebRequest.Create(site.Site);
req.BeginGetResponse(AsyncCallback, new WebRequestData{ Page = site, WebRequest = req});
}
private void AsyncCallback(IAsyncResult ar)
{
Interlocked.Increment(ref _count);
var webRequestData = ar.AsyncState as WebRequestData;
var req = webRequestData.WebRequest;
var result = req.EndGetResponse(ar);
var reader = new StreamReader(result.GetResponseStream());
var data = reader.ReadToEnd();
this.OnSiteScraped(webRequestData.Page, data);
_data.Add(new KeyValuePair<RankedPage, string>(webRequestData.Page,data));
if (_count == _total)
{
OnScrapingComplete();
}
}
private void OnSiteScraped(RankedPage page, string data)
{
var handler = this.SiteScraped;
if (handler != null)
{
handler(this, new SiteScrapedEventArgs(page, data));
}
}
private void OnScrapingComplete()
{
var handler = this.ScrapingComplete;
if (handler != null)
{
handler(this, new ScrapingCompletedEventArgs(_data));
}
}
public event EventHandler<SiteScrapedEventArgs> SiteScraped;
public event EventHandler<ScrapingCompletedEventArgs> ScrapingComplete;
}
public class SiteScrapedEventArgs : EventArgs
{
public RankedPage Site { get; private set; }
public string Data { get; private set; }
public SiteScrapedEventArgs(RankedPage site, string data)
{
this.Site = site;
this.Data = data;
}
}
public class ScrapingCompletedEventArgs : EventArgs
{
public IEnumerable<KeyValuePair<RankedPage,string >> SiteData { get; private set; }
public ScrapingCompletedEventArgs(IEnumerable<KeyValuePair<RankedPage, string>> siteData)
{
this.SiteData = siteData;
}
}