(Twitter) Штормовое окно об агрегации

Я играю с Storm, и мне интересно, где Storm указывает (если возможно) размер окна (сворачивание / скольжение) при агрегации. Например, если мы хотим найти актуальные темы предыдущего часа в Твиттере. Как мы указываем, что болт должен возвращать результаты за каждый час? Это делается программно внутри каждого болта? Или это какой-то способ указать "окно"?

2 ответа

Решение

Отказ от ответственности: я написал статью "Актуальные темы со штормом", на которую ссылается Гахов в своем ответе выше.

Я бы сказал, что лучше всего использовать так называемые тиковые кортежи в Storm 0.8+. С их помощью вы можете настроить свои собственные носики / болты, чтобы получать уведомления через определенные промежутки времени (скажем, каждые десять секунд или каждую минуту).

Вот простой пример, который настраивает рассматриваемый компонент для получения тиковых кортежей каждые десять секунд:

// in your spout/bolt
@Override
public Map<String, Object> getComponentConfiguration() {
    Config conf = new Config();
    int tickFrequencyInSeconds = 10;
    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, tickFrequencyInSeconds);
    return conf;
}

Затем вы можете использовать условный переключатель в своем изливе / болте execute() метод, позволяющий отличить "нормальные" входящие кортежи от специальных тиковых кортежей. Например:

// in your spout/bolt
@Override
public void execute(Tuple tuple) {
    if (isTickTuple(tuple)) {
        // now you can trigger e.g. a periodic activity
    }
    else {
        // do something with the normal tuple
    }
}

private static boolean isTickTuple(Tuple tuple) {
    return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
        && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
}

Опять же, я написал довольно подробный пост в блоге о том, как делать это в Storm, несколько дней назад, как указал Гахов (бесстыдная вилка!).

Добавьте новый носик со степенью параллелизма 1, и он будет выдавать пустой сигнал, а затем Utils.sleep до следующего раза (все будет сделано в nextTuple). Затем свяжите все соответствующие болты с этим носиком, используя всю группировку, чтобы все их экземпляры получали один и тот же сигнал.

Другие вопросы по тегам