Использование тиковых кортежей с трезубцем в шторме

Я могу использовать стандартный носик, комбинацию болтов для потоковой агрегации и очень хорошо работает в удачном случае, когда использую тиковые кортежи для сохранения данных через некоторый интервал для использования пакетной обработки. Прямо сейчас я делаю некоторое управление отказами (отслеживание не сохраненных кортежей и т. Д.) Самостоятельно (т.е. не ootb от шторма)

Но я читал, что трезубец дает вам более высокую абстракцию и лучшее управление отказами. Что я не понимаю, так это то, есть ли поддержка тикетного тикета в трезубце. По сути, я хотел бы создать пакетную память для текущей минуты или около того и сохранить любые агрегированные данные за предыдущие минуты, используя трезубец.

Любые указатели здесь или предложения по дизайну будут полезны.

Спасибо

1 ответ

На самом деле, микропакетирование - это встроенная функция Trident. Вам не нужны никакие клещи для этого. Когда у вас есть что-то подобное в вашем коде:

topology
    .newStream("myStream", spout)
    .partitionPersist(
        ElasticSearchEventState.getFactoryFor(connectionProvider),
        new Fields("field1", "field2"),
        new ElasticSearchEventUpdater()
    )

(Я использую здесь свой собственный ElasticSearch State / Updater, вы можете использовать что-то еще)

Поэтому, когда у вас есть что-то подобное, под капотом Trident группируйте ваш поток по пакетам и выполняйте операцию partitionPersist не для отдельных кортежей, а для этих пакетов.

Если по каким-либо причинам вам все еще нужны тиковые кортежи, просто создайте свой тиковый носик, что-то вроде этого мне подходит

public class TickSpout implements IBatchSpout {

    public static final String TIMESTAMP_FIELD = "timestamp";
    private final long delay;

    public TickSpout(long delay) {
        this.delay = delay;
    }

    @Override
    public void open(Map conf, TopologyContext context) {
    }

    @Override
    public void emitBatch(long batchId, TridentCollector collector) {
        Utils.sleep(delay);
        collector.emit(new Values(System.currentTimeMillis()));
    }

    @Override
    public void ack(long batchId) {
    }

    @Override
    public void close() {
    }

    @Override
    public Map getComponentConfiguration() {
        return null;
    }

    @Override
    public Fields getOutputFields() {
        return new Fields(TIMESTAMP_FIELD);
    }
}
Другие вопросы по тегам