Твиттер поток API с агентами в F#

Из блога Дона Сайма ( http://blogs.msdn.com/b/dsyme/archive/2010/01/10/async-and-parallel-design-patterns-in-f-reporting-progress-with-events-plus-twitter-sample.aspx) Я попытался реализовать слушатель Twitter-потока. Моя цель - следовать указаниям документации твиттер-API, в которой говорится, что "твиты часто следует сохранять или ставить в очередь перед обработкой при создании высоконадежной системы".

Поэтому мой код должен иметь два компонента:

  • Очередь, которая накапливается и обрабатывает каждый статус / твит json
  • Что-то для чтения твиттер-потока, который сбрасывает в очередь твит в строках json

Я выбираю следующее:

  • Агент, которому я публикую каждый твит, который декодирует JSON и выдает его в базу данных.
  • Простой http веб-запрос

Я также хотел бы сбросить в текстовый файл любую ошибку от вставки в базу данных. (Я, вероятно, переключусь на агента супервизора для всех ошибок).

Две проблемы:

  • моя стратегия здесь хороша? Если я правильно понимаю, агент ведет себя как интеллектуальная очередь и обрабатывает свои сообщения асинхронно (если у него в очереди 10 парней, он будет обрабатывать их сразу, вместо того, чтобы ждать, пока 1-й завершит, затем 2-й и т. Д.)...), правильный?
  • Согласно сообщению Дона Сайма, все до того времени было Изолировано, поэтому StreamWriter и дамп базы данных Изолированы. Но так как мне это нужно, я никогда не закрываю соединение с базой данных...?

Код выглядит примерно так:

let dumpToDatabase databaseName = 
   //opens databse connection 
   fun tweet -> inserts tweet in database

type Agent<'T> = MailboxProcessor<'T>



 let agentDump =
            Agent.Start(fun (inbox: MailboxProcessor<string>) ->
               async{
                   use w2 = new StreamWriter(@"\Errors.txt")
                   let dumpError  =fun (error:string) -> w2.WriteLine( error )
                   let dumpTweet =  dumpToDatabase "stream"
                   while true do 
                       let! msg = inbox.Receive()
                       try 
                           let tw = decode msg
                           dumpTweet tw
                       with 
                       | :? MySql.Data.MySqlClient.MySqlException as ex -> 
    dumpError (msg+ex.ToString() ) 
                        | _ as ex -> () 



                             }
                             )

    let filter_url = "http://stream.twitter.com/1/statuses/filter.json"
    let parameters = "track=RT&"
    let stream_url = filter_url

    let stream = twitterStream MyCredentials stream_url parameters


    while true do 
        agentDump.Post(stream.ReadLine())

Большое спасибо!

Редактирование кода с агентом процессора:

let dumpToDatabase (tweets:tweet list)= 
    bulk insert of tweets in database    

let agentProcessor = 
        Agent.Start(fun (inbox: MailboxProcessor<string list>) ->
           async{
               while true do 
                       let! msg = inbox.Receive()
                       try
                          msg
                          |> List.map(decode)
                          |> dumpToDatabase 
                        with
                        | _ as ex -> Console.WriteLine("Processor "+ex.ToString()))
                 }
                 )



let agentDump =
        Agent.Start(fun (inbox: MailboxProcessor<string>) ->
                  let rec loop messageList count = async{
                      try
                          let! newMsg = inbox.Receive()
                          let newMsgList = newMsg::messageList
                          if count = 10 then 
                               agentProcessor.Post( newMsgList )
                               return! loop [] 0
                          else                    
                               return! loop newMsgList (count+1)
                      with
                      | _ as ex -> Console.WriteLine("Dump "+ex.ToString())

                  }
                  loop [] 0)

let filter_url = "http://stream.twitter.com/1/statuses/filter.json"
let parameters = "track=RT&"
let stream_url = filter_url

let stream = twitterStream MyCredentials stream_url parameters


while true do 
    agentDump.Post(stream.ReadLine())

1 ответ

Решение

Я думаю, что лучший способ описать агента - это запущенный процесс, который сохраняет некоторое состояние и может взаимодействовать с другими агентами (или веб-страницами или базой данных). При написании приложений на основе агентов вы часто можете использовать несколько агентов, которые отправляют сообщения друг другу.

Я думаю, что идея создать агента, который читает твиты из Интернета и сохраняет их в базе данных, является хорошим выбором (хотя вы также можете хранить твиты в памяти как состояние агента).

  • Я не буду постоянно поддерживать соединение с базой данных - MSSQL (и, вероятно, MySQL тоже) реализует пул соединений, поэтому он не будет закрывать соединение автоматически, когда вы его отпустите. Это означает, что более безопасно и аналогично эффективно повторно открывать соединение каждый раз, когда вам нужно записать данные в базу данных.

  • Если вы не ожидаете получения большого количества сообщений об ошибках, я бы, вероятно, сделал бы то же самое для файлового потока (при записи вы можете открыть его, чтобы новый контент был добавлен в конец).

Очередь работы агентов F# заключается в том, что они обрабатывают сообщения одно за другим (в вашем примере вы ожидаете сообщения, используя inbox.Receive(), Когда очередь содержит несколько сообщений, вы будете получать их одно за другим (в цикле).

  • Если вы хотите обрабатывать несколько сообщений одновременно, вы можете написать агент, который ожидает, скажем, 10 сообщений, а затем отправляет их в виде списка другому агенту (который затем будет выполнять массовую обработку).

  • Вы также можете указать timeout параметр к Receive метод, так что вы можете подождать не более 10 сообщений, пока все они приходят в течение одной секунды - таким образом, вы можете довольно элегантно реализовать массовую обработку, которая не удерживает сообщения в течение длительного времени.

Другие вопросы по тегам