Количество задач процессора NiFi очень велико, в чем причина?
Я написал базовый пользовательский процессор, который отправляет поток в отношение "Retry", а также вызывает штраф.
package nlsn.processors.core.main;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
@Tags({ "wait", "wait on time"})
@CapabilityDescription("Wait on time")
@SeeAlso({})
@ReadsAttributes({ @ReadsAttribute(attribute = "", description = "") })
@WritesAttributes({ @WritesAttribute(attribute = "", description = "") })
public class CustomWait extends AbstractProcessor {
public static final Relationship SUCCESS_RELATIONSHIP = new Relationship.Builder()
.name("SUCCESS").description("well done, carry on").build();
public static final Relationship FAILURE_RELATIONSHIP = new Relationship.Builder()
.name("FAILURE.").description("fail").build();
public static final Relationship POINT_TO_SELF_RELATIONSHIP = new Relationship.Builder()
.name("RETRY").description("point it back to processor").build();
private List<PropertyDescriptor> descriptors;
private Set<Relationship> relationships;
@Override
protected void init(final ProcessorInitializationContext context) {
final Set<Relationship> relationships = new HashSet<Relationship>();
relationships.add(SUCCESS_RELATIONSHIP);
relationships.add(FAILURE_RELATIONSHIP);
relationships.add(POINT_TO_SELF_RELATIONSHIP);
this.relationships = Collections.unmodifiableSet(relationships);
}
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final ComponentLog logger = getLogger();
FlowFile flowFile = session.get();
if (flowFile != null) {
logger.info("flow file is not null.");
String state = flowFile.getAttribute("_wait_state");
if (state == null || state.isEmpty()) {
logger.info("\"_wait_state\" attribute is missing, going into WAIT.");
flowFile = session.putAttribute( flowFile, "_wait_state", "1");
flowFile = session.penalize(flowFile);
session.transfer( flowFile, POINT_TO_SELF_RELATIONSHIP );
} else {
logger.info("\"_wait_state\" attribute is available, breaking WAIT.");
flowFile = session.removeAttribute( flowFile, "_wait_state" );
session.transfer( flowFile, SUCCESS_RELATIONSHIP);
}
} else {
//logger.info("flow file is null (bad)!!!.");
}
}
}
код работает как положено. Но мне интересно, почему количество задач (192 569) так велико. Как и ожидалось, процесс завершился за 30 секунд?
(см. количество задач процессора CustomWait)
- что работает нифи в фоновом режиме?
- это большое количество фактически загружает процессор?
- если это плохо, как это исправить?
Спасибо
1 ответ
Решение
- Процессор планируется запустить контроллером NiFi, когда в очереди есть FlowFile (FF), подпитывающий процесс без проверки оштрафованного состояния FF. В onTrigger процессора он попытается получить FF из входных очередей (
session.get()
). этоsession.get()
не получит штрафных FF, так что в итоге он вернется к нулю. Вот почему проверка на нулевой FF нужна, а не плохо. Я предполагаю, что вы не изменили расписание запуска, а это означает, что контроллер попытается запустить этот процессор как можно быстрее. Это приводит к завышенному количеству задач. - Он пытается проверить ввод для обработки, поэтому он использует процессор. Зависит ли эта нагрузка от количества доступных задач и процессоров, работающих в системе.
- Не плохо по своей сути, но его можно сократить, установив расписание запуска!= 0.