Есть ли способ перехватить кортеж / сек через оператора в IBM Streams (не через консоль Streams)

Я хочу захватить количество кортежей в секунду через оператора и записать его в файл. Я не могу использовать "Оператор газа", чтобы установить частоту кортежей самостоятельно. Кроме того, чтобы добавить еще раз, я не говорю о захвате информации через консоль, но через приложение SPL.

1 ответ

Решение

Прямой метрики "дай мне пропускную способность для этого оператора" не существует. Вы могли бы реализовать примитивный оператор, который обращается к nTuplesProcessed метрика с течением времени и рассчитывает пропускную способность из этого. ( Список доступных метрик.) Но на самом деле мне гораздо проще использовать следующий составной оператор:

public composite PeriodicThroughputSink(input In) {
param expression<float64> $period;
      expression<rstring> $file;
graph
    stream<boolean b> Period = Beacon() {
        param period: $period;
    }

    stream<float64 throughput> Throughput = Custom(In; Period) {
        logic state: {
            mutable uint64 _count = 0;
            float64 _period = $period;
        }

        onTuple In: {
            ++_count;
        }

        onTuple Period: {
            if (_count > 0ul) {
                submit({throughput=((float64)_count / _period)}, Throughput);
                _count = 0ul;
            }
        }

        config threadedPort: queue(Period, Sys.Wait); // ensures that the throughput calculation and file
                                                      // writing is on a different thread from the rest 
                                                      // of the application
    }

    () as Sink = FileSink(Throughput) {
        param file: $file;
              format: txt;
              flush: 1u;
    }
}

Затем вы можете использовать составной оператор в качестве "отвода пропускной способности", когда он потребляет поток от любого оператора, пропускную способность которого вы хотите записать. Например, вы можете использовать его так:

stream<Data> Result = OperatorYouCareAbout(In) {}

() as ResultThroughput = PeriodicThroughputSink(Result) {
    param period: 5.0;
          file: "ResultThroughput.txt";
} 

Конечно, вы можете использовать Result поток в другом месте в вашем приложении. Имейте в виду, что этот метод может оказать некоторое влияние на производительность приложения: мы нажимаем на путь данных. Но воздействие не должно быть значительным, особенно если вы убедитесь, что операторы в PeriodicThroughputSink слиты с тем же PE, что и оператор, к которому вы подключаетесь. Кроме того, чем короче период, тем больше вероятность того, что это повлияет на производительность приложений.

Опять же, мы могли бы сделать нечто подобное в примитивном операторе C++ или Java, открыв nTuplesProcessed метрика, но я считаю, что вышеупомянутый подход намного проще. Вы также можете получить системные показатели за пределами вашего приложения; скажем, вы могли бы иметь скрипт, который периодически использует streamtool capturestate или REST API, а затем проанализируйте вывод, найдите nTuplesProcessed показатель для оператора, который вам нужен, и используйте его для расчета пропускной способности. Но я нахожу технику в этом составном операторе намного проще.

Другие вопросы по тегам