Отправка внешних событий в рабочий процесс
В нашем рабочем процессе каденции нам часто нужно некоторое время ждать внешних событий, прежде чем продолжить (например, чтение электронной почты, нажатие ссылки и т. Д.).
Мне было интересно, как лучше сообщить нашим рабочим процессам об этих событиях. Являются ли сигналы правильным способом, или мы должны создать действие, которое будет ждать события?
Из того, что я видел, нам нужно создать канал сигнала ch := workflow.GetSignalChannel(ctx, SignalName)
Однако контекст недоступен в действиях.
1 ответ
Сигнализация является рекомендуемым способом отправки событий в рабочие процессы.
Часто используемый шаблон для рабочих процессов Go заключается в использовании селектора для ожидания нескольких каналов сигнала, а также для определения таймера будущего.
Перейти образец:
sig1Ch := workflow.GetSignalChannel(ctx, "signal1")
sig2Ch := workflow.GetSignalChannel(ctx, "signal2")
timeout := workflow.NewTimer(ctx, time.Minute * 30)
s := workflow.NewSelector(ctx)
var signal1 *Signal1Struct
var signal2 *Signal2Struct
s.AddFuture(timeout, func(f Future) {
})
s.AddReceive(sig1Ch, func(c Channel, more bool) {
c.Receive(ctx, signal1)
})
s.AddReceive(sig2Ch, func(c Channel, more bool) {
c.Receive(ctx, signal2)
})
s.Select(ctx)
if signal1 == nil && signal2 == nil {
// handle timeout
} else {
// process signals
}
Пример Java:
public interface MyWorkflow {
@WorkflowMethod
void main();
@SignalMethod
void signal1(Signal1Struct signal);
@SignalMethod
void signal2(Signal2Struct signal);
}
public class MyWorkflowImpl implements MyWorkflow {
private Signal1Struct signal1;
private Signal2Struct signal2;
@Override
public void main() {
Workflow.await(Duration.ofMinutes(30),
() -> signal1 != null || signal2 != null);
if (signal1 == null && signal2 == null) {
// handle timeout
}
// process signals
}
@Override
public void signal1(Signal1Struct signal) {
signal1 = signal;
}
@Override
public void signal2(Signal2Struct signal) {
signal2 = signal;
}
}
Обратите внимание, что это хорошая идея для учета простоев рабочего процесса. Например, давайте представим, что вышеуказанный рабочий процесс запущен и сигнал получен через 40 минут после запуска, когда все рабочие процессы не работают. В этом случае, когда рабочие возвращаются как timeout
будущее и signCh
будет не пустым. Поскольку Selector не гарантирует порядок, возможно, что сигнал доставляется до таймера, даже если он был получен после. Так что ваша логика кода должна учитывать это. Например, существует жесткое требование, чтобы сигнал, полученный через 30 минут после начала рабочего процесса, игнорировался. Затем приведенный выше пример должен быть изменен на:
Перейти образец:
...
start := workflow.Now(ctx); // must use workflow clock
s.Select(ctx)
duration := workflow.Now(ctx).Sub(start)
if duration.Minutes() >= 30 || (signal1 == nil && signal2 == nil) {
// handle timeout
} else {
// process signals
}
Пример Java:
public void main() {
long start = Workflow.currentTimeMillis(); // must use workflow clock
Duration timeout = Duration.ofMinutes(30);
Workflow.await(timeout, () -> signal1 != null || signal2 != null);
long duration = Workflow.currentTimeMillis() - start;
if (timeout.toMillis() <= duration || (signal1 == null && signal2 == null)) {
// handle timeout
}
// process signals
}
Обновленный код работает правильно, даже если выполнение рабочего процесса было отложено на час.