Использование тиковых кортежей с трезубцем в шторме
Я могу использовать стандартный носик, комбинацию болтов для потоковой агрегации и очень хорошо работает в удачном случае, когда использую тиковые кортежи для сохранения данных через некоторый интервал для использования пакетной обработки. Прямо сейчас я делаю некоторое управление отказами (отслеживание не сохраненных кортежей и т. Д.) Самостоятельно (т.е. не ootb от шторма)
Но я читал, что трезубец дает вам более высокую абстракцию и лучшее управление отказами. Что я не понимаю, так это то, есть ли поддержка тикетного тикета в трезубце. По сути, я хотел бы создать пакетную память для текущей минуты или около того и сохранить любые агрегированные данные за предыдущие минуты, используя трезубец.
Любые указатели здесь или предложения по дизайну будут полезны.
Спасибо
1 ответ
На самом деле, микропакетирование - это встроенная функция Trident. Вам не нужны никакие клещи для этого. Когда у вас есть что-то подобное в вашем коде:
topology
.newStream("myStream", spout)
.partitionPersist(
ElasticSearchEventState.getFactoryFor(connectionProvider),
new Fields("field1", "field2"),
new ElasticSearchEventUpdater()
)
(Я использую здесь свой собственный ElasticSearch State / Updater, вы можете использовать что-то еще)
Поэтому, когда у вас есть что-то подобное, под капотом Trident группируйте ваш поток по пакетам и выполняйте операцию partitionPersist не для отдельных кортежей, а для этих пакетов.
Если по каким-либо причинам вам все еще нужны тиковые кортежи, просто создайте свой тиковый носик, что-то вроде этого мне подходит
public class TickSpout implements IBatchSpout {
public static final String TIMESTAMP_FIELD = "timestamp";
private final long delay;
public TickSpout(long delay) {
this.delay = delay;
}
@Override
public void open(Map conf, TopologyContext context) {
}
@Override
public void emitBatch(long batchId, TridentCollector collector) {
Utils.sleep(delay);
collector.emit(new Values(System.currentTimeMillis()));
}
@Override
public void ack(long batchId) {
}
@Override
public void close() {
}
@Override
public Map getComponentConfiguration() {
return null;
}
@Override
public Fields getOutputFields() {
return new Fields(TIMESTAMP_FIELD);
}
}