Как запрограммировать источник реактивного потока, который получает данные асинхронно

Я просматривал документацию на hazelcast-jet, чтобы найти ссылки на источники, которые асинхронно заполняются каким-то внешним процессом - в моем случае это были бы сообщения http.

Я посмотрел на код Кафки, так как он кажется самым близким, но не могу понять, как только что появившееся событие вызовет что-либо. Я предполагаю, что здесь не будет никакого блокирующего потока.

Я был бы признателен за любые указатели, чтобы лучше понять, как я могу использовать газель-реактивный двигатель в среде, где элементы "потока" питаются капельно.

1 ответ

В следующей версии 0.7 Hazelcast Jet представлен объект Source Builder, который упрощает создание собственного источника. Вы можете использовать его для написания кода, подобного следующему:

public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();
    StreamSource<String> source = SourceBuilder
            .timestampedStream("http-trickle", x -> new HttpSource())
            .fillBufferFn(HttpSource::addToBuffer)
            .destroyFn(HttpSource::destroy)
            .build();
    StreamStage<String> srcStage = pipeline.drawFrom(source);
}

private static class HttpSource {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>(10000);
    private final ArrayList<String> buffer = new ArrayList<>();
    private final AsyncClient<String> client = 
        new AsyncClient<String>().addReceiveListener(queue::add);

    void addToBuffer(TimestampedSourceBuffer<String> sourceBuffer) {
        queue.drainTo(buffer);
        for (String line : buffer) {
            sourceBuffer.add(line, extractTimestamp(line));
        }
        buffer.clear();
    }

    void destroy() {
        client.close();
    }
}

Здесь я использовал макет AsyncClient это должно представлять ваш фактический асинхронный HTTP-клиент. Он ожидает, что вы предоставите обратный вызов, который будет обрабатывать входящие данные, когда они поступят. Разработчик исходного кода Jet запрашивает еще один обратный вызов, fillBufferFn, который отправляет данные в конвейер обработки.

Ваш обратный звонок в AsyncClient следует отправить данные в параллельную очередь и ваш fillBufferFn следует слить очередь в исходный буфер Jet.

У вас может возникнуть желание упростить код, который я дал этому:

void addToBufferDirect(TimestampedSourceBuffer<String> sourceBuffer) {
    for (String line; (line = queue.poll()) != null;) {
        sourceBuffer.add(line, extractTimestamp(line));
    }
}

Это позволяет избежать промежуточного буфера, стоящего между параллельной очередью и исходным буфером Jet. Это на самом деле будет работать большую часть времени, но если вы когда-либо испытываете сильный пик трафика, addToBufferDirect может никогда не завершиться. Это нарушило бы контракт с Jet, который требует от вас fillBufferFn в течение секунды или около того.

Мы уже осознали, что этот шаблон использования компоновщика исходного кода с асинхронными клиентскими API очень распространен, и мы планируем обеспечить больше удобства для его обработки.

Другие вопросы по тегам