Как запрограммировать источник реактивного потока, который получает данные асинхронно
Я просматривал документацию на hazelcast-jet, чтобы найти ссылки на источники, которые асинхронно заполняются каким-то внешним процессом - в моем случае это были бы сообщения http.
Я посмотрел на код Кафки, так как он кажется самым близким, но не могу понять, как только что появившееся событие вызовет что-либо. Я предполагаю, что здесь не будет никакого блокирующего потока.
Я был бы признателен за любые указатели, чтобы лучше понять, как я могу использовать газель-реактивный двигатель в среде, где элементы "потока" питаются капельно.
1 ответ
В следующей версии 0.7 Hazelcast Jet представлен объект Source Builder, который упрощает создание собственного источника. Вы можете использовать его для написания кода, подобного следующему:
public static void main(String[] args) {
Pipeline pipeline = Pipeline.create();
StreamSource<String> source = SourceBuilder
.timestampedStream("http-trickle", x -> new HttpSource())
.fillBufferFn(HttpSource::addToBuffer)
.destroyFn(HttpSource::destroy)
.build();
StreamStage<String> srcStage = pipeline.drawFrom(source);
}
private static class HttpSource {
private final BlockingQueue<String> queue = new LinkedBlockingQueue<>(10000);
private final ArrayList<String> buffer = new ArrayList<>();
private final AsyncClient<String> client =
new AsyncClient<String>().addReceiveListener(queue::add);
void addToBuffer(TimestampedSourceBuffer<String> sourceBuffer) {
queue.drainTo(buffer);
for (String line : buffer) {
sourceBuffer.add(line, extractTimestamp(line));
}
buffer.clear();
}
void destroy() {
client.close();
}
}
Здесь я использовал макет AsyncClient
это должно представлять ваш фактический асинхронный HTTP-клиент. Он ожидает, что вы предоставите обратный вызов, который будет обрабатывать входящие данные, когда они поступят. Разработчик исходного кода Jet запрашивает еще один обратный вызов, fillBufferFn
, который отправляет данные в конвейер обработки.
Ваш обратный звонок в AsyncClient
следует отправить данные в параллельную очередь и ваш fillBufferFn
следует слить очередь в исходный буфер Jet.
У вас может возникнуть желание упростить код, который я дал этому:
void addToBufferDirect(TimestampedSourceBuffer<String> sourceBuffer) {
for (String line; (line = queue.poll()) != null;) {
sourceBuffer.add(line, extractTimestamp(line));
}
}
Это позволяет избежать промежуточного буфера, стоящего между параллельной очередью и исходным буфером Jet. Это на самом деле будет работать большую часть времени, но если вы когда-либо испытываете сильный пик трафика, addToBufferDirect
может никогда не завершиться. Это нарушило бы контракт с Jet, который требует от вас fillBufferFn
в течение секунды или около того.
Мы уже осознали, что этот шаблон использования компоновщика исходного кода с асинхронными клиентскими API очень распространен, и мы планируем обеспечить больше удобства для его обработки.