Твиттер поток API с агентами в F#
Из блога Дона Сайма ( http://blogs.msdn.com/b/dsyme/archive/2010/01/10/async-and-parallel-design-patterns-in-f-reporting-progress-with-events-plus-twitter-sample.aspx) Я попытался реализовать слушатель Twitter-потока. Моя цель - следовать указаниям документации твиттер-API, в которой говорится, что "твиты часто следует сохранять или ставить в очередь перед обработкой при создании высоконадежной системы".
Поэтому мой код должен иметь два компонента:
- Очередь, которая накапливается и обрабатывает каждый статус / твит json
- Что-то для чтения твиттер-потока, который сбрасывает в очередь твит в строках json
Я выбираю следующее:
- Агент, которому я публикую каждый твит, который декодирует JSON и выдает его в базу данных.
- Простой http веб-запрос
Я также хотел бы сбросить в текстовый файл любую ошибку от вставки в базу данных. (Я, вероятно, переключусь на агента супервизора для всех ошибок).
Две проблемы:
- моя стратегия здесь хороша? Если я правильно понимаю, агент ведет себя как интеллектуальная очередь и обрабатывает свои сообщения асинхронно (если у него в очереди 10 парней, он будет обрабатывать их сразу, вместо того, чтобы ждать, пока 1-й завершит, затем 2-й и т. Д.)...), правильный?
- Согласно сообщению Дона Сайма, все до того времени было Изолировано, поэтому StreamWriter и дамп базы данных Изолированы. Но так как мне это нужно, я никогда не закрываю соединение с базой данных...?
Код выглядит примерно так:
let dumpToDatabase databaseName =
//opens databse connection
fun tweet -> inserts tweet in database
type Agent<'T> = MailboxProcessor<'T>
let agentDump =
Agent.Start(fun (inbox: MailboxProcessor<string>) ->
async{
use w2 = new StreamWriter(@"\Errors.txt")
let dumpError =fun (error:string) -> w2.WriteLine( error )
let dumpTweet = dumpToDatabase "stream"
while true do
let! msg = inbox.Receive()
try
let tw = decode msg
dumpTweet tw
with
| :? MySql.Data.MySqlClient.MySqlException as ex ->
dumpError (msg+ex.ToString() )
| _ as ex -> ()
}
)
let filter_url = "http://stream.twitter.com/1/statuses/filter.json"
let parameters = "track=RT&"
let stream_url = filter_url
let stream = twitterStream MyCredentials stream_url parameters
while true do
agentDump.Post(stream.ReadLine())
Большое спасибо!
Редактирование кода с агентом процессора:
let dumpToDatabase (tweets:tweet list)=
bulk insert of tweets in database
let agentProcessor =
Agent.Start(fun (inbox: MailboxProcessor<string list>) ->
async{
while true do
let! msg = inbox.Receive()
try
msg
|> List.map(decode)
|> dumpToDatabase
with
| _ as ex -> Console.WriteLine("Processor "+ex.ToString()))
}
)
let agentDump =
Agent.Start(fun (inbox: MailboxProcessor<string>) ->
let rec loop messageList count = async{
try
let! newMsg = inbox.Receive()
let newMsgList = newMsg::messageList
if count = 10 then
agentProcessor.Post( newMsgList )
return! loop [] 0
else
return! loop newMsgList (count+1)
with
| _ as ex -> Console.WriteLine("Dump "+ex.ToString())
}
loop [] 0)
let filter_url = "http://stream.twitter.com/1/statuses/filter.json"
let parameters = "track=RT&"
let stream_url = filter_url
let stream = twitterStream MyCredentials stream_url parameters
while true do
agentDump.Post(stream.ReadLine())
1 ответ
Я думаю, что лучший способ описать агента - это запущенный процесс, который сохраняет некоторое состояние и может взаимодействовать с другими агентами (или веб-страницами или базой данных). При написании приложений на основе агентов вы часто можете использовать несколько агентов, которые отправляют сообщения друг другу.
Я думаю, что идея создать агента, который читает твиты из Интернета и сохраняет их в базе данных, является хорошим выбором (хотя вы также можете хранить твиты в памяти как состояние агента).
Я не буду постоянно поддерживать соединение с базой данных - MSSQL (и, вероятно, MySQL тоже) реализует пул соединений, поэтому он не будет закрывать соединение автоматически, когда вы его отпустите. Это означает, что более безопасно и аналогично эффективно повторно открывать соединение каждый раз, когда вам нужно записать данные в базу данных.
Если вы не ожидаете получения большого количества сообщений об ошибках, я бы, вероятно, сделал бы то же самое для файлового потока (при записи вы можете открыть его, чтобы новый контент был добавлен в конец).
Очередь работы агентов F# заключается в том, что они обрабатывают сообщения одно за другим (в вашем примере вы ожидаете сообщения, используя inbox.Receive()
, Когда очередь содержит несколько сообщений, вы будете получать их одно за другим (в цикле).
Если вы хотите обрабатывать несколько сообщений одновременно, вы можете написать агент, который ожидает, скажем, 10 сообщений, а затем отправляет их в виде списка другому агенту (который затем будет выполнять массовую обработку).
Вы также можете указать
timeout
параметр кReceive
метод, так что вы можете подождать не более 10 сообщений, пока все они приходят в течение одной секунды - таким образом, вы можете довольно элегантно реализовать массовую обработку, которая не удерживает сообщения в течение длительного времени.