Есть ли способ перехватить кортеж / сек через оператора в IBM Streams (не через консоль Streams)
Я хочу захватить количество кортежей в секунду через оператора и записать его в файл. Я не могу использовать "Оператор газа", чтобы установить частоту кортежей самостоятельно. Кроме того, чтобы добавить еще раз, я не говорю о захвате информации через консоль, но через приложение SPL.
1 ответ
Прямой метрики "дай мне пропускную способность для этого оператора" не существует. Вы могли бы реализовать примитивный оператор, который обращается к nTuplesProcessed
метрика с течением времени и рассчитывает пропускную способность из этого. ( Список доступных метрик.) Но на самом деле мне гораздо проще использовать следующий составной оператор:
public composite PeriodicThroughputSink(input In) {
param expression<float64> $period;
expression<rstring> $file;
graph
stream<boolean b> Period = Beacon() {
param period: $period;
}
stream<float64 throughput> Throughput = Custom(In; Period) {
logic state: {
mutable uint64 _count = 0;
float64 _period = $period;
}
onTuple In: {
++_count;
}
onTuple Period: {
if (_count > 0ul) {
submit({throughput=((float64)_count / _period)}, Throughput);
_count = 0ul;
}
}
config threadedPort: queue(Period, Sys.Wait); // ensures that the throughput calculation and file
// writing is on a different thread from the rest
// of the application
}
() as Sink = FileSink(Throughput) {
param file: $file;
format: txt;
flush: 1u;
}
}
Затем вы можете использовать составной оператор в качестве "отвода пропускной способности", когда он потребляет поток от любого оператора, пропускную способность которого вы хотите записать. Например, вы можете использовать его так:
stream<Data> Result = OperatorYouCareAbout(In) {}
() as ResultThroughput = PeriodicThroughputSink(Result) {
param period: 5.0;
file: "ResultThroughput.txt";
}
Конечно, вы можете использовать Result
поток в другом месте в вашем приложении. Имейте в виду, что этот метод может оказать некоторое влияние на производительность приложения: мы нажимаем на путь данных. Но воздействие не должно быть значительным, особенно если вы убедитесь, что операторы в PeriodicThroughputSink
слиты с тем же PE, что и оператор, к которому вы подключаетесь. Кроме того, чем короче период, тем больше вероятность того, что это повлияет на производительность приложений.
Опять же, мы могли бы сделать нечто подобное в примитивном операторе C++ или Java, открыв nTuplesProcessed
метрика, но я считаю, что вышеупомянутый подход намного проще. Вы также можете получить системные показатели за пределами вашего приложения; скажем, вы могли бы иметь скрипт, который периодически использует streamtool capturestate
или REST API, а затем проанализируйте вывод, найдите nTuplesProcessed
показатель для оператора, который вам нужен, и используйте его для расчета пропускной способности. Но я нахожу технику в этом составном операторе намного проще.