Количество задач процессора NiFi очень велико, в чем причина?

Я написал базовый пользовательский процессор, который отправляет поток в отношение "Retry", а также вызывает штраф.

package nlsn.processors.core.main;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;

@Tags({ "wait", "wait on time"})
@CapabilityDescription("Wait on time")
@SeeAlso({})
@ReadsAttributes({ @ReadsAttribute(attribute = "", description = "") })
@WritesAttributes({ @WritesAttribute(attribute = "", description = "") })
public class CustomWait extends AbstractProcessor {

    public static final Relationship SUCCESS_RELATIONSHIP = new Relationship.Builder()
            .name("SUCCESS").description("well done, carry on").build();

    public static final Relationship FAILURE_RELATIONSHIP = new Relationship.Builder()
            .name("FAILURE.").description("fail").build();

    public static final Relationship POINT_TO_SELF_RELATIONSHIP = new Relationship.Builder()
            .name("RETRY").description("point it back to processor").build();

    private List<PropertyDescriptor> descriptors;

    private Set<Relationship> relationships;


    @Override
    protected void init(final ProcessorInitializationContext context) {

        final Set<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(SUCCESS_RELATIONSHIP);
        relationships.add(FAILURE_RELATIONSHIP);
        relationships.add(POINT_TO_SELF_RELATIONSHIP);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    @OnScheduled
    public void onScheduled(final ProcessContext context) {

    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        final ComponentLog logger = getLogger();
        FlowFile flowFile = session.get();
        if (flowFile != null) {
            logger.info("flow file is not null.");
            String state = flowFile.getAttribute("_wait_state");
            if (state == null || state.isEmpty()) {
                logger.info("\"_wait_state\" attribute is missing, going into WAIT.");
                flowFile = session.putAttribute( flowFile, "_wait_state", "1");
                flowFile = session.penalize(flowFile);
                session.transfer( flowFile, POINT_TO_SELF_RELATIONSHIP );
            } else {
                logger.info("\"_wait_state\" attribute is available, breaking WAIT.");
                flowFile = session.removeAttribute( flowFile, "_wait_state" );
                session.transfer( flowFile, SUCCESS_RELATIONSHIP); 
            }
        } else {
            //logger.info("flow file is null (bad)!!!.");
        }
    }
}

код работает как положено. Но мне интересно, почему количество задач (192 569) так велико. Как и ожидалось, процесс завершился за 30 секунд?

(см. количество задач процессора CustomWait)

  1. что работает нифи в фоновом режиме?
  2. это большое количество фактически загружает процессор?
  3. если это плохо, как это исправить?

Спасибо

1 ответ

Решение
  1. Процессор планируется запустить контроллером NiFi, когда в очереди есть FlowFile (FF), подпитывающий процесс без проверки оштрафованного состояния FF. В onTrigger процессора он попытается получить FF из входных очередей (session.get()). это session.get() не получит штрафных FF, так что в итоге он вернется к нулю. Вот почему проверка на нулевой FF нужна, а не плохо. Я предполагаю, что вы не изменили расписание запуска, а это означает, что контроллер попытается запустить этот процессор как можно быстрее. Это приводит к завышенному количеству задач.
  2. Он пытается проверить ввод для обработки, поэтому он использует процессор. Зависит ли эта нагрузка от количества доступных задач и процессоров, работающих в системе.
  3. Не плохо по своей сути, но его можно сократить, установив расписание запуска!= 0.
Другие вопросы по тегам