Как объединить входные данные "один к одному" и "один ко многим": выходные отношения в канале?
Некоторое время я боролся с этой проблемой, хотя, если честно, я многое узнал о Conduit, поскольку ранее я в основном использовал консервированные примеры с несколькими исключениями.
Основная проблема сформулирована таким образом для трубопроводов A
, B
, а также C
; A .| B
(A
питается в B
) а также A .| C
и, наконец, мне нужно иметь функцию, которая принимает B и C и производит промежуточный канал, вызвать его Merge B C
так что я могу сделать (Merge B C) .| D
, Мой опыт работы с не-Haskell языками с FRP/ потоковыми библиотеками позволяет предположить, что существует несколько различных способов выполнить "слияние" (например, семейство операций zip "образец включен" - производить только новые элементы для D
когда один или несколько выбранных входных каналов имеют новое значение и т. д.). Я думаю, что моя проблема в том, чтобы понять, как это сделать в Conduit, если это поддерживается.
Даже более конкретно для моей конкретной проблемы сегодня, B
имеет отношения 1:1 с A
, в то время как C
имеет много: 1 отношения с A
и в конечном итоге в D
Я хочу повторяющиеся элементы B
в сочетании с соответствующими элементами C
: если a~b
а также a~c
за a
в A
, b
в B
, а также c
в C
, затем (b,c)
питаются в D
, Так что я смог использовать ZipSink
и тот факт, что это действительно разумное место для снижения (производительность в стороне, на которую я не смотрел). Конечно, как и ожидалось, getZipSink
ничего не знает об отношениях один-ко-многим и о том, как их решать; он имеет широко определенное поведение zip - просто циклически проходить через входные потоки, пока все входные потоки не будут циклически пройдены один раз.
Я полагаю, что один из способов сделать это может состоять в том, чтобы каким-то образом изменить мой поток "один ко многим" в поток "один к одному", выполнив свертывание в нечто вроде списка. Но тогда я должен был бы распаковать это позже вне контекста канала. На данный момент, я просто хочу спросить, что является рекомендуемым способом (ами).
Мой фактический код выглядит как (A
является sourceDirectoryDeep
, B
является processFileName
, C
является processCSV
, а также D
есть (вроде бы, наверное) getZipSink
):
retrieveSmaXtec :: Path Abs Dir -> IO (Vector SxRecord)
retrieveSmaXtec sxDir = do
rows <- sourceDirectoryDeep False (fromAbsDir sxDir)
.| getZipSink (combine <$> ZipSink processFileName <*> ZipSink processCSV )
& runConduitRes
print rows
rows & fmap fromRow & catMaybes & return
where
combine :: (Vector (MapRow Text)) -> (Vector (MapRow Text)) -> (Vector (MapRow Text))
combine v1 v2 = (uncurry DM.union) <$> (zip v1 v2)
processCSV :: (MonadResource m, MonadThrow m, PrimMonad m)=>
ConduitT FilePath Void m (Vector ((MapRow Text)))
processCSV = mapMC (liftIO . DTIO.readFile)
.| intoCSV defCSVSettings
.| sinkVector
processFileName :: (MonadResource m, MonadThrow m, PrimMonad m) =>
ConduitT FilePath Void m (Vector ((MapRow Text)))
processFileName = mapC go
.| sinkVector
where
go :: FilePath -> MapRow Text
go fp = takeFileName fp
& takeWhile (/= '.')
& splitOn "_"
& fmap Txt.pack
& zip colNames
& DM.fromList
colNames = [markKey, idKey]
Импорт (некоторые из которых могут быть посторонними):
import Conduit
import qualified Data.Conduit.Combinators as DCC
import Data.CSV.Conduit
import Data.Function ((&))
import Data.List.Split (splitOn)
import Data.Map as DM
import Data.Text (Text)
import qualified Data.Text as Txt
import qualified Data.Text.IO as DTIO
import Data.Vector (Vector)
import Path
import System.FilePath.Posix