Nesper высокая загрузка ЦП / памяти при использовании окна win:time

Я использую Nesper в качестве механизма CEP для обработки событий в моем приложении.

Я пытаюсь смоделировать следующее заявление EPL:

  • Value Поле событий усредняется за ограниченное время окна.
  • Если какое-либо из этих средних значений совпадает 1 желаемое значение, событие генерируется.

Я смоделировал это как:

SELECT (
(AVG(ParameterEvent1.Value) = 1) OR 
(AVG(ParameterEvent2.Value) = 1)
...
(AVG(ParameterEvent50.Value) = 1)
) AS BooleanValue 
FROM 
ParameterEvent(Id = 1).win:time(3 sec) AS ParameterEvent1, 
ParameterEvent(Id = 2).win:time(3 sec) AS ParameterEvent2 
...
ParameterEvent(Id = 50).win:time(3 sec) AS ParameterEvent50

И в отдельном потоке я подаю двигателю значения параметров с постоянной скоростью 50 событий / сек.

Эта установка вызывает огромное использование процессора и оперативной памяти до com.espertech.esper.client.EPException: ReaderWriterLock timeout expired Исключение случается.

Интересно, что вызывает проблему. Интересно, что когда я меняю окно из win:time(3 sec) в std:lastevent() эта проблема решается. Тем не менее, мне нужно окно, чтобы убедиться, что значение параметров 1 до 3 секунд до генерации события.

Полный демонстрационный код приведен ниже. Вам просто нужно установить Nesper пакет для запуска:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using com.espertech.esper.client;

namespace NesperTester
{
    class Program
    {
        public class ParameterEvent
        {
            public int Id { get; set; }
            public int Value { get; set; }
        }

        static EPServiceProvider engine;

        const int NParameters = 50;

        static void Main(string[] args)
        {
            engine = EPServiceProviderManager.GetDefaultProvider();

            engine.EPAdministrator.Configuration.AddEventType("ParameterEvent", typeof(ParameterEvent));

            CreateEPLStatement();

            Task.Factory.StartNew(() => SimulateParameterChange(), TaskCreationOptions.LongRunning);

            System.Threading.Thread.Sleep(int.MaxValue);
        }

        static void CreateEPLStatement()
        {
            const int windowLength = 3;

            var sb = new StringBuilder();

            for (int i = 1; i <= NParameters; i++)
            {
                sb.AppendFormat("(AVG(ParameterEvent{0}.Value) = 1) OR ", i);
            }

            var rule = sb.ToString();
            rule = rule.Remove(rule.Length - 4, 4);
            sb.Clear();

            for (int paramId = 1; paramId <= NParameters; paramId++)
            {
                sb.AppendFormat("ParameterEvent(Id = {0}).win:time({1} sec) AS ParameterEvent{0}, ", paramId, windowLength);
                //sb.AppendFormat("ParameterEvent(Id = {0}).std:lastevent() AS ParameterEvent{0}, ", paramId, windowLength);
            }

            var selectSources = sb.ToString();
            selectSources = selectSources.Remove(selectSources.Length - 2, 2);

            var eplStr = string.Format("SELECT ({0}) AS BooleanValue FROM {1}", rule, selectSources);

            var statementName = string.Format("statement1");

            var statement = engine.EPAdministrator.CreateEPL(eplStr, statementName, null);
            statement.Start();
            statement.Events += Statement_Events;
        }


        static bool PrevState = false;
        static void Statement_Events(object sender, UpdateEventArgs e)
        {
            var underlying = e.NewEvents[0].Underlying as Dictionary<string, object>;

            if (underlying.Last().Value == null)
            {
                Console.WriteLine("Statement value is unknown");
            }
            else
            {
                var statementValue = (bool)underlying.First().Value;

                if (statementValue != PrevState)
                {
                    PrevState = statementValue;
                    Console.WriteLine("Statement is {0}", statementValue);
                }
            }
        }

        static void SimulateParameterChange()
        {
            int counter = 0;

            while (true)
            {
                for (int i = 1; i <= NParameters; i++)
                {
                    var evt = new ParameterEvent();

                    evt.Id = i;

                    if (i == 1 && (counter/4) % 2 == 0)
                    {
                        evt.Value = 1;
                    }
                    else
                    {
                        evt.Value = 0;
                    }

                    engine.EPRuntime.SendEvent(evt);

                    System.Threading.Thread.Sleep(1000 / NParameters);
                }

                counter++;
            }
        }
    }
}

1 ответ

См. http://espertech.com/esper/solution_patterns.php Предложение "Имея" для запуска.

Другие вопросы по тегам