Присоединение подписчика / слушателя в модуле EPL к заявлению с разделами контекста

У меня есть следующий модуль EPL, который успешно развертывает:

module context;

import events.*;
import configDemo.*;
import annotations.*;
import main.*;
import subscribers.*;
import listeners.*;

@Name('schemaCreator')
create schema InitEvent(firstStock String, secondStock String, bias double);

@Name('createSchemaEvent')
create schema TickEvent as TickEvent; 

@Name('contextCreator')
create context TwoStocksContext
initiated by InitEvent as initEvent;


@Name('compareStocks') 
@Description('Compare the difference between two different stocks and make a decision')
@Subscriber('subscribers.MySubscriber')
context TwoStocksContext 
select * from TickEvent
match_recognize (
measures A.currentPrice as a_currentPrice, B.currentPrice as b_currentPrice,     
A.stockCode as a_stockCode, B.stockCode as b_stockCode
pattern (A C* B)
define
A as A.stockCode =  context.initEvent.firstStock,
B as A.currentPrice - B.currentPrice >=  context.initEvent.bias and         
B.stockCode =  context.initEvent.secondStock
);

У меня проблема со слушателями / подписчиками. Согласно моим проверкам и отладке, у классов нет проблем, аннотации работают, они прикрепляются к заявлению при развертывании, и тем не менее ни один из них не получает никаких обновлений от событий.

Это мой подписчик, я просто хочу напечатать, что он был получен:

package subscribers;
import java.util.Map;

public class MySubscriber {

public void update(Map row) {
    System.out.println("got it");
    }
}

Раньше у меня был тот же модуль без каких-либо контекстных разделов, а затем подписчики работали без проблем. После того как я добавил контекст, он остановился.

Пока что я попробовал:

  1. Проверка, есть ли в заявлении подписчик / прослушиватель (это так)
  2. Проверка их имен
  3. Удалите аннотации и установите их вручную в коде Java после развертывания (то же самое - они прикрепляются, я могу получить их имя, но все равно не получаю обновления)
  4. Отладка абонентского класса. Программа либо вообще не идет туда, чтобы остановиться в точке останова, либо я получаю сообщение об ошибке (ошибка атрибута отсутствующего номера строки - ("невозможно установить точку останова", которую я пытался исправить безрезультатно)

Любая идея, что может вызвать это или как лучше всего настроить подписчика на оператор с контекстными разделами?

Это продолжение предыдущей проблемы, которая была решена здесь - Создание экземпляров Эспера

РЕДАКТИРОВАТЬ: События отправляются в формате, который я использую, и в формате онлайн-инструмента EPL:

Сначала я получаю пару, от которой будет следовать пользователь:

    System.out.println("First stock:"); 
    String first = scanner.nextLine();
    System.out.println("Second stock:"); 
    String second = scanner.nextLine();
    System.out.println("Difference:"); 
    double diff= scanner.nextDouble();
    InitEvent init = new InitEvent(first, second, diff);

После этого у меня есть движок, который непрерывно отправляет события, но перед его запуском InitEvents отправляется так:

@Override
public void run() {

    runtime.sendEvent(initEvent);   

    while (contSimulation) {

        TickEvent tick1 = new TickEvent(Math.random() * 100, "YAH");
        runtime.sendEvent(tick1);

        TickEvent tick2 = new TickEvent(Math.random() * 100, "GOO");
        runtime.sendEvent(tick2);

        TickEvent tick3 = new TickEvent(Math.random() * 100, "IBM");
        runtime.sendEvent(tick3);

        TickEvent tick4 = new TickEvent(Math.random() * 100, "MIC");
        runtime.sendEvent(tick4);

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        latch.countDown();

    }

} 

Раньше я не пользовался онлайн-инструментом, но думаю, что он работает. Это текст модуля:

module context; 

create schema InitEvent(firstStock String, secondStock String, bias double);
create schema TickEvent(currentPrice double, stockCode String);

create context TwoStocksContext
initiated by InitEvent as initEvent;

context TwoStocksContext 
select * from TickEvent
match_recognize (
measures A.currentPrice as a_currentPrice, B.currentPrice as b_currentPrice, 
A.stockCode as a_stockCode, B.stockCode as b_stockCode
pattern (A C* B)
define
A as A.stockCode =  context.initEvent.firstStock,
B as A.currentPrice - B.currentPrice >=  context.initEvent.bias and 
B.stockCode =  context.initEvent.secondStock
);

И последовательность событий:

InitEvent={firstStock='YAH', secondStock = 'GOO', bias=5}
TickEvent={currentPrice=55.6, stockCode='YAH'}
TickEvent={currentPrice=50.4, stockCode='GOO'}
TickEvent={currentPrice=30.8, stockCode='MIC'}
TickEvent={currentPrice=24.9, stockCode='APP'}

TickEvent={currentPrice=51.6, stockCode='YAH'}
TickEvent={currentPrice=45.8, stockCode='GOO'}
TickEvent={currentPrice=32.8, stockCode='MIC'}
TickEvent={currentPrice=28.9, stockCode='APP'}

Результат, который я получаю, используя их:

At: 2001-01-01 08:00:00.000
Statement: Stmt-4
Insert
Stmt-4-output={a_currentPrice=55.6, b_currentPrice=50.4, a_stockCode='YAH', 
b_stockCode='GOO'}
At: 2001-01-01 08:00:00.000
Statement: Stmt-4
Insert
Stmt-4-output={a_currentPrice=51.6, b_currentPrice=45.8, a_stockCode='YAH', 
b_stockCode='GOO'}

Если я создаю второй набор событий с разницей менее 5 между YAH/GOO, я получаю вывод только из первой пары, что имеет смысл. Это, я думаю, что он должен делать.

В случае необходимости эти два метода читают и обрабатывают аннотации модуля EPL (я сам не писал их, они взяты из класса contextTrader, которую можно найти здесь - https://github.com/timolson/cointrader/blob/master/src/main/java/org/cryptocoinpartners/module/Context.java):

private static Object getSubscriber(String className) throws Exception {

    Class<?> cl = Class.forName(className);
    return cl.newInstance();
}

private static void processAnnotations(EPStatement statement) throws Exception {

    Annotation[] annotations = statement.getAnnotations();
    for (Annotation annotation : annotations) {
        if (annotation instanceof Subscriber) {

            Subscriber subscriber = (Subscriber) annotation;
            Object obj = getSubscriber(subscriber.className());
            System.out.println(subscriber.className());
            statement.setSubscriber(obj);

        } else if (annotation instanceof Listeners) {

            Listeners listeners = (Listeners) annotation;
            for (String className : listeners.classNames()) {
                Class<?> cl = Class.forName(className);
                Object obj = cl.newInstance();
                if (obj instanceof StatementAwareUpdateListener) {
                    statement.addListener((StatementAwareUpdateListener) obj);
                } else {
                    statement.addListener((UpdateListener) obj);
                }
            }


        }
    }
}

1 ответ

Решение

Что ж, после месяца борьбы я наконец решил это. В случае, если у кого-то возникнут подобные проблемы в будущем, вот где проблема была. Эпл отлично работал в онлайн-инструменте, но не в моем коде. В конце концов я выяснил, что начальные события не запускаются, поэтому контекстные разделы не создаются, и в результате подписчики и слушатели не получают никаких обновлений. Моя ошибка состояла в том, что у меня было запущено POJO InitEvent, но событие, которое использовал контекст, было создано в модуле EPL через схему создания. Я не знаю, о чем я думал, теперь имеет смысл, что это не сработало. В результате события, которые я запускаю в Java, не являются событиями, которые использует контекст. Мое решение было только в рамках EPL. Поскольку я не мог понять, могу ли я запускать события в Java, которые создаются внутри модуля, я создал схему, которая заполняется моим POJO, и поток затем используется контекстом как таковой:

@Name('schemaCreator')
create schema StartEvent(firstStock string, secondStock string, difference 
double);

@Name('insertInitEvent')
insert into StartEvent 
select * from InitEvent; 

Все остальное остается таким же, как и код Java.

Другие вопросы по тегам