Ленивый.. но нетерпеливый загрузчик данных в F#

Кто-нибудь знает о "уровне техники" в отношении следующего предмета:

  • У меня есть данные, которые требуют приличного времени для загрузки. они являются историческим уровнем для различных акций.
  • Я хотел бы предварительно загрузить их, чтобы избежать задержки при использовании моего приложения
  • Тем не менее, предварительная загрузка их в один блок вначале приводит к тому, что приложение перестает отвечать на запросы, что неудобно для пользователя

Поэтому я не хотел бы загружать свои данные.... если пользователь не запрашивает их и не играет с тем, что у него уже есть, и в этом случае я хотел бы получить понемногу. Так что это ни "ленивый", ни "нетерпеливый", более "ленивый, когда вам нужно" и "нетерпеливый, когда можете", отсюда и сокращение LWYNEWYC.

Я сделал следующее, что, кажется, работает, но мне просто интересно, есть ли признанный и благословенный подход для такой вещи?

let r = LoggingFakeRepo () :> IQuoteRepository
r.getHisto "1" |> ignore  //prints Getting histo for 1 when called

let rc =  RepoCached (r) :> IQuoteRepository
rc.getHisto "1" |> ignore //prints Getting histo for 1 the first time only

let rcc =  RepoCachedEager (r) :> IQuoteRepository
rcc.getHisto "100" |> ignore  //prints Getting histo 1..100 by itself BUT
                              //prints Getting histo 100 immediately when called

И классы

type IQuoteRepository = 
   abstract getUnderlyings : string seq
   abstract getHisto :  string -> string

type LoggingFakeRepo () =
   interface IQuoteRepository with 
      member x.getUnderlyings = printfn "getting underlyings"
                                [1 .. 100] |> List.map string :> _

      member x.getHisto udl = printfn "getting histo for %A" udl
                              "I am a historical dataset in a disguised party"

type RepoCached (rep : IQuoteRepository) =
   let memoize f =
     let cache = new System.Collections.Generic.Dictionary<_, _>()
     fun x ->
        if cache.ContainsKey(x) then cache.[x]
        else let res = f x
             cache.[x] <- res
             res
   let udls = lazy (rep.getUnderlyings )
   let gethistom = memoize rep.getHisto

   interface IQuoteRepository with 
      member x.getUnderlyings = udls.Force()
      member x.getHisto udl = gethistom udl

type Message = string * AsyncReplyChannel<UnderlyingWrap>
type RepoCachedEager (rep : IQuoteRepository) =
   let udls = rep.getUnderlyings

   let agent = MailboxProcessor<Message>.Start(fun inbox ->
      let repocached = RepoCached (rep) :> IQuoteRepository
      let rec loop l =
         async {  try
                     let timeout = if l|> List.isEmpty  then -1 else 50
                     let! (udl, replyChannel) = inbox.Receive(timeout)
                     replyChannel.Reply(repocached.getHisto udl)
                     do! loop l
                  with 
                  | :? System.TimeoutException -> 
                     let udl::xs = l
                     repocached.getHisto udl |> ignore
                     do! loop xs
          }
      loop (udls |> Seq.toList))

   interface IQuoteRepository with 
      member x.getUnderlyings = udls
      member x.getHisto udl = agent.PostAndReply(fun reply -> udl, reply)

1 ответ

Решение

Мне нравится ваше решение. Я думаю, что использование агента для реализации некоторой фоновой загрузки с тайм-аутом - отличный способ пойти - агенты могут красиво инкапсулировать изменяемое состояние, так что это совершенно безопасно, и вы можете легко кодировать желаемое поведение.

Я думаю, что асинхронные последовательности могут быть еще одной полезной абстракцией (если я прав, они доступны в FSharpX в наши дни). Асинхронная последовательность представляет собой вычисление, которое асинхронно генерирует больше значений, поэтому они могут быть хорошим способом отделить загрузчик данных от остальной части кода.

Я думаю, что вам все равно понадобится агент для синхронизации в какой-то момент, но вы можете красиво разделить различные задачи, используя асинхронные последовательности.

Код для загрузки данных может выглядеть примерно так:

let loadStockPrices repo = asyncSeq {
  // TODO: Not sure how you detect that the repository has no more data...
  while true do
    // Get next item from the repository, preferably asynchronously!
    let! data = repo.AsyncGetNextHistoricalValue()
    // Return the value to the caller...
    yield data }

Этот код представляет загрузчик данных и отделяет его от кода, который его использует. Из агента, который использует источник данных, вы можете использовать AsyncSeq.iterAsync потреблять ценности и что-то с ними делать.

С iterAsyncфункция, которую вы указываете как потребитель, является асинхронной. Это может заблокировать Sleep) и когда он блокируется, источник - то есть ваш загрузчик - также блокируется. Это довольно хороший неявный способ управления загрузчиком из кода, который потребляет данные.

Функция, которая еще не находится в библиотеке (но была бы полезна), является частично нетерпеливым оценщиком, который принимает AsyncSeq<'T> и возвращает новый AsyncSeq<'T> но получает определенное количество элементов из источника как можно скорее и кэширует их (чтобы потребителю не приходилось ждать, когда он запрашивает значение, если источник может производить значения достаточно быстро).

Другие вопросы по тегам