Вручную завершить входы для Conduit Attoparsec

Я обрабатываю файл журнала системного журнала, каждую строку как отдельную запись системного журнала, и анализирую эту запись, используя синтаксический анализатор Attoparsec. Так что я использую

fileToBS :: IO Handle -> C.Source (ResourceT IO) BS.ByteString
fileToBS handleMaker = source C.$= bsSplitterConduit
  where source = CB.sourceIOHandle handleMaker
        bsSplitterConduit = CB.lines

генерировать поток записей системного журнала. я использую

parseToLogData:: C.Conduit BS.ByteString (ResourceT IO) (Either CATT.ParseError (CATT.PositionRange, LogData))
parseToLogData = CATT.conduitParserEither syslogParser

преобразовать эти строки байтов в значения системного журнала. Значения системного журнала генерируются из этого синтаксического анализатора (с некоторыми моими собственными синонимами типа):

syslogParser :: Parser (Priority, Maybe UTCTime, IPAddress, BS.ByteString)
syslogParser = do
  pri <- priority <?> "priority parse error"
  mbDate <- date <?> "date parse error"
  space
  srcAddr <- ip
  space
  msg <- ATT.takeByteString
  return LogData{pri = pri, timestamp = mbDate, source = srcAddr, message = "msg"}

priority :: Parser Priority
priority = do
  string "<"
  digitsString <- takeWhile1 digit
  string ">"
  return (RawPriority digitsString)

date :: Parser (Maybe UTCTime)
date = do
  rawDate <- ATT.take 15
  let stringDate = BS.unpack rawDate
  let parsedDate = parseTime defaultTimeLocale syslogDateFormat stringDate
  return parsedDate

ip :: Parser IPAddress
ip = do
  oct0 <- takeWhile1 digit
  period
  oct1 <- takeWhile1 digit
  period
  oct2 <- takeWhile1 digit
  period
  oct3 <- takeWhile1 digit
  return (oct0, oct1, oct2, oct3)
--ip = takeWhile1 (\x -> digit x || x == 46)

space = string " "
colon = string ":"
period = string "."

digit test = (test >= 48 && test <= 57)
octet = digit

Проблема заключается в строке, которая принимает все остальные записи системного журнала (msg <- ATT.takeByteString). Эта функция плохо работает с потоками, потому что ей нужен сигнал завершения при использовании возобновляемого парсера (что и используется библиотекой attoparsec).

Я пытался выдать пустые строки байтов, чтобы исправить это поведение, но оно не работает должным образом (см. Дополнительные материалы на https://hackage.haskell.org/package/attoparsec-0.12.1.2/docs/Data-Attoparsec-ByteString.html). Он использует весь входной файл системного журнала в одно проанализированное значение. Это тестовый файл размером 80 МБ, поэтому после первоначального извлечения поля он помещает все последующие сообщения системного журнала в поле сообщения со значением системного журнала.

Вот мой терминатор, чтобы попытаться сигнализировать о "атомном сообщении". Я не уверен, почему это не работает.

terminator :: C.Conduit BS.ByteString (ResourceT IO) BS.ByteString
terminator = C.awaitForever yieldAndAddTerminator
  where
    yieldAndAddTerminator bs = do
      C.yield bs
      C.yield terminator
    terminator = ""

Как я могу рассматривать сообщения UDP как атомарные фрагменты данных в мире канала?

Копию этой кодовой базы можно найти здесь: https://github.com/tureus/safe-forwarder.

1 ответ

Вы, вероятно, хотите соединить свой parseToLogData с функцией, которая предотвращает использование новой строки (код ASCII 10). Используя терминологию проводников-комбинаторов, что-то вроде:

takeWhileCE (/= 10) =$= parseToLogData
dropWhileCE (/= 10) >> dropCE 1 -- flush the rest of it

Вы также можете посмотреть в line комбинаторная функция.

Другие вопросы по тегам