Как реализовать репозиторий для ленивой загрузки данных с помощью neuraxle?

В документации neuraxle показан пример использования репозитория для отложенной загрузки данных в конвейер, см. Следующий код:

from neuraxle.pipeline import Pipeline, MiniBatchSequentialPipeline
from neuraxle.base import ExecutionContext
from neuraxle.steps.column_transformer import ColumnTransformer
from neuraxle.steps.flow import TrainOnlyWrapper

training_data_ids = training_data_repository.get_all_ids()
context = ExecutionContext('caching_folder').set_service_locator({
    BaseRepository: training_data_repository
})

pipeline = Pipeline([
    ConvertIDsToLoadedData().assert_has_services(BaseRepository),
    ColumnTransformer([
        (range(0, 2), DateToCosineEncoder()),
        (3, CategoricalEnum(categeories_count=5, starts_at_zero=True)),
    ]),
    Normalizer(),
    TrainOnlyWrapper(DataShuffler()),
    MiniBatchSequentialPipeline([
        Model()
    ], batch_size=128)
]).with_context(context)

Однако не показано, как реализовать BaseRepository и ConvertIDsToLoadedDataклассы. Как лучше всего реализовать эти классы? Можно ли привести пример?

1 ответ

Я не проверял, были ли следующие компиляции или нет, но все должно выглядеть так, как показано ниже. Пожалуйста, отредактируйте этот ответ, если вы найдете что-то изменить и попытались его скомпилировать:

class BaseDataRepository(ABC): 

    @abstractmethod
    def get_all_ids(self) -> List[int]: 
        pass

    @abstractmethod
    def get_data_from_id(self, _id: int) -> object: 
        pass

class InMemoryDataRepository(BaseDataRepository): 
    def __init__(self, ids, data): 
        self.ids: List[int] = ids
        self.data: Dict[int, object] = data

    def get_all_ids(self) -> List[int]: 
        return list(self.ids)

    def get_data_from_id(self, _id: int) -> object: 
        return self.data[_id]

class ConvertIDsToLoadedData(BaseStep): 
    def _handle_transform(self, data_container: DataContainer, context: ExecutionContext): 
        repo: BaseDataRepository = context.get_service(BaseDataRepository)
        ids = data_container.data_inputs

        # Replace data ids by their loaded object counterpart: 
        data_container.data_inputs = [repo.get_data_from_id(_id) for _id in ids]

        return data_container, context

context = ExecutionContext('caching_folder').set_service_locator({
    BaseDataRepository: InMemoryDataRepository(ids, data)  # or insert here any other replacement class that inherits from `BaseDataRepository` when you'll change the database to a real one (e.g.: SQL) rather than a cheap "InMemory" stub. 
})

Для получения обновлений см. Проблему, которую я открыл здесь для этого вопроса: https://github.com/Neuraxio/Neuraxle/issues/421

Другие вопросы по тегам