Как правильно реализовать этап конвейера Neuraxle, который фильтрует data_inputs?

Я пытаюсь реализовать BaseStep в neuraxle (0.5.2), который фильтрует data_input (а также expected_output соответственно).

class DataFrameQuery(NonFittableMixin, InputAndOutputTransformerMixin, BaseStep):
    def __init__(self, query):
        super().__init__()
        self.query = query
    
    def transform(self, data_input):
        data_input, expected_output = data_input
        # verify that input and output are either pd.DataFrame or pd.Series
        # ... [redacted] ...
        new_data_input = data_input.query(self.query)
        if all(output is None for output in expected_output):
            new_expected_output = [None] * len(new_data_input)
        else:
            new_expected_output = expected_output.loc[new_data_input.index]
        return new_data_input, new_expected_output

Это естественно (в большинстве случаев) приведет к изменению len(data_inputs) (а также expected_outputs). В последней версииneuraxle, Я получаю AssertionError:

data_input = pd.DataFrame([{"A": 1, "B": 1}, {"A": 2, "B": 2}], index=[1, 2])
expected_output = pd.Series([1, 2], index=[1, 2])
pipeline = Pipeline([
    DataFrameQuery("A == 1")
])
pipeline.fit_transform(data_input, expected_output)
AssertionError: InputAndOutputTransformerMixin: 
    Caching broken because there is a different len of current ids, and data inputs. 
    Please use InputAndOutputTransformerWrapper if you plan to change the len of the data inputs.

Насколько я понимаю, именно здесь должны вступить в игру методы обработчика Neuraxle. Однако до сих пор я не нашел ни одного, который позволил бы мне обновитьcurrent_ids для входов и выходов после преобразования (я думаю, это должно быть _did_transform, но, кажется, не вызывается).

Обычно:

  • Как правильно обновлять current_ids как для входов, так и для ожидаемых результатов после преобразования (на одном этапе)?
  • На какие аспекты следует обращать внимание при применении побочных эффектов к data_container? Используются ли идентификаторы, например, для разделения данных для SIMD-параллелизма? Ожидается ли, что новые идентификаторы будут последовательностью целых чисел?

Изменить: я также попытался установить savers и используя InputAndOutputTransformerWrapperкак описано здесь. По-прежнему появляется следующая ошибка (возможно, потому, что я не уверен, куда позвонитьhandle_transform):

AssertionError: InputAndOutputTransformerWrapper: 
    Caching broken because there is a different len of current ids, and data inputs.
    Please resample the current ids using handler methods, or create new ones by setting the wrapped step saver to HashlibMd5ValueHasher using the BaseStep.set_savers method.

Изменить: на данный момент я решил проблему следующим образом:


class OutputShapeChangingStep(NonFittableMixin, InputAndOutputTransformerMixin, BaseStep):
    def __init__(self, idx):
        super().__init__()
        self.idx = idx
        
    def _update_data_container_shape(self, data_container):
        assert len(data_container.expected_outputs) == len(data_container.data_inputs)
        data_container.set_current_ids(range(len(data_container.data_inputs)))
        data_container = self.hash_data_container(data_container)
        return data_container
    
    def _set_data_inputs_and_expected_outputs(self, data_container, new_inputs, new_expected_outputs) -> DataContainer:
        data_container.set_data_inputs(new_inputs)
        data_container.set_expected_outputs(new_expected_outputs)
        data_container = self._update_data_container_shape(data_container)
        return data_container
    
    def transform(self, data_inputs):
        data_inputs, expected_outputs = data_inputs
        return data_inputs[self.idx], expected_outputs[self.idx]

Я, вероятно, "ошибочно" отвергаю _set_data_inputs_and_expected_outputs из InputAndOutputTransformerMixin в этом случае (будет _transform_data_container быть лучшим выбором?), но как это обновление current_ids(и перефразирование контейнера) представляется возможным. Однако мне все еще было бы интересно, как сделать это в большей степени в соответствии с ожиданиями API Neuraxle.

1 ответ

Решение

Лично мне больше всего нравится использовать только методы-обработчики. На мой взгляд, он намного чище.

Пример использования с методами-обработчиками:

class WindowTimeSeries(ForceHandleMixin, BaseTransformer):
   def __init__(self):
      BaseTransformer.__init__(self)
      ForceHandleMixin.__init__(self)

   def _transform_data_container(self, data_container: DataContainer, context: ExecutionContext) -> DataContainer:
      di = data_container.data_inputs
      new_di, new_eo = np.array_split(np.array(di), 2)

      return DataContainer(
        summary_id=data_container.summary_id,
        data_inputs=new_di,
        expected_outputs=new_eo
      )

Таким образом, текущие идентификаторы будут воссозданы и хешированы с поведением по умолчанию. Примечание: сводный идентификатор - это самая важная вещь. Он создается в начале и перехэшируется с помощью гиперпараметров... При необходимости вы также можете сгенерировать новые текущие идентификаторы с помощью настраиваемой заставки, такой как HashlibMd5ValueHasher.

Изменить, действительно была ошибка. Это исправлено здесь: https://github.com/Neuraxio/Neuraxle/pull/379

Пример использования:

step = InputAndOutputTransformerWrapper(WindowTimeSeriesForOutputTransformerWrapper()) \
    .set_hashers([HashlibMd5ValueHasher()])
step = StepThatInheritsFromInputAndOutputTransformerMixin() \
     .set_hashers([HashlibMd5ValueHasher()])
Другие вопросы по тегам