Присоединение подписчика / слушателя в модуле EPL к заявлению с разделами контекста
У меня есть следующий модуль EPL, который успешно развертывает:
module context;
import events.*;
import configDemo.*;
import annotations.*;
import main.*;
import subscribers.*;
import listeners.*;
@Name('schemaCreator')
create schema InitEvent(firstStock String, secondStock String, bias double);
@Name('createSchemaEvent')
create schema TickEvent as TickEvent;
@Name('contextCreator')
create context TwoStocksContext
initiated by InitEvent as initEvent;
@Name('compareStocks')
@Description('Compare the difference between two different stocks and make a decision')
@Subscriber('subscribers.MySubscriber')
context TwoStocksContext
select * from TickEvent
match_recognize (
measures A.currentPrice as a_currentPrice, B.currentPrice as b_currentPrice,
A.stockCode as a_stockCode, B.stockCode as b_stockCode
pattern (A C* B)
define
A as A.stockCode = context.initEvent.firstStock,
B as A.currentPrice - B.currentPrice >= context.initEvent.bias and
B.stockCode = context.initEvent.secondStock
);
У меня проблема со слушателями / подписчиками. Согласно моим проверкам и отладке, у классов нет проблем, аннотации работают, они прикрепляются к заявлению при развертывании, и тем не менее ни один из них не получает никаких обновлений от событий.
Это мой подписчик, я просто хочу напечатать, что он был получен:
package subscribers;
import java.util.Map;
public class MySubscriber {
public void update(Map row) {
System.out.println("got it");
}
}
Раньше у меня был тот же модуль без каких-либо контекстных разделов, а затем подписчики работали без проблем. После того как я добавил контекст, он остановился.
Пока что я попробовал:
- Проверка, есть ли в заявлении подписчик / прослушиватель (это так)
- Проверка их имен
- Удалите аннотации и установите их вручную в коде Java после развертывания (то же самое - они прикрепляются, я могу получить их имя, но все равно не получаю обновления)
- Отладка абонентского класса. Программа либо вообще не идет туда, чтобы остановиться в точке останова, либо я получаю сообщение об ошибке (ошибка атрибута отсутствующего номера строки - ("невозможно установить точку останова", которую я пытался исправить безрезультатно)
Любая идея, что может вызвать это или как лучше всего настроить подписчика на оператор с контекстными разделами?
Это продолжение предыдущей проблемы, которая была решена здесь - Создание экземпляров Эспера
РЕДАКТИРОВАТЬ: События отправляются в формате, который я использую, и в формате онлайн-инструмента EPL:
Сначала я получаю пару, от которой будет следовать пользователь:
System.out.println("First stock:");
String first = scanner.nextLine();
System.out.println("Second stock:");
String second = scanner.nextLine();
System.out.println("Difference:");
double diff= scanner.nextDouble();
InitEvent init = new InitEvent(first, second, diff);
После этого у меня есть движок, который непрерывно отправляет события, но перед его запуском InitEvents отправляется так:
@Override
public void run() {
runtime.sendEvent(initEvent);
while (contSimulation) {
TickEvent tick1 = new TickEvent(Math.random() * 100, "YAH");
runtime.sendEvent(tick1);
TickEvent tick2 = new TickEvent(Math.random() * 100, "GOO");
runtime.sendEvent(tick2);
TickEvent tick3 = new TickEvent(Math.random() * 100, "IBM");
runtime.sendEvent(tick3);
TickEvent tick4 = new TickEvent(Math.random() * 100, "MIC");
runtime.sendEvent(tick4);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
latch.countDown();
}
}
Раньше я не пользовался онлайн-инструментом, но думаю, что он работает. Это текст модуля:
module context;
create schema InitEvent(firstStock String, secondStock String, bias double);
create schema TickEvent(currentPrice double, stockCode String);
create context TwoStocksContext
initiated by InitEvent as initEvent;
context TwoStocksContext
select * from TickEvent
match_recognize (
measures A.currentPrice as a_currentPrice, B.currentPrice as b_currentPrice,
A.stockCode as a_stockCode, B.stockCode as b_stockCode
pattern (A C* B)
define
A as A.stockCode = context.initEvent.firstStock,
B as A.currentPrice - B.currentPrice >= context.initEvent.bias and
B.stockCode = context.initEvent.secondStock
);
И последовательность событий:
InitEvent={firstStock='YAH', secondStock = 'GOO', bias=5}
TickEvent={currentPrice=55.6, stockCode='YAH'}
TickEvent={currentPrice=50.4, stockCode='GOO'}
TickEvent={currentPrice=30.8, stockCode='MIC'}
TickEvent={currentPrice=24.9, stockCode='APP'}
TickEvent={currentPrice=51.6, stockCode='YAH'}
TickEvent={currentPrice=45.8, stockCode='GOO'}
TickEvent={currentPrice=32.8, stockCode='MIC'}
TickEvent={currentPrice=28.9, stockCode='APP'}
Результат, который я получаю, используя их:
At: 2001-01-01 08:00:00.000
Statement: Stmt-4
Insert
Stmt-4-output={a_currentPrice=55.6, b_currentPrice=50.4, a_stockCode='YAH',
b_stockCode='GOO'}
At: 2001-01-01 08:00:00.000
Statement: Stmt-4
Insert
Stmt-4-output={a_currentPrice=51.6, b_currentPrice=45.8, a_stockCode='YAH',
b_stockCode='GOO'}
Если я создаю второй набор событий с разницей менее 5 между YAH/GOO, я получаю вывод только из первой пары, что имеет смысл. Это, я думаю, что он должен делать.
В случае необходимости эти два метода читают и обрабатывают аннотации модуля EPL (я сам не писал их, они взяты из класса contextTrader, которую можно найти здесь - https://github.com/timolson/cointrader/blob/master/src/main/java/org/cryptocoinpartners/module/Context.java):
private static Object getSubscriber(String className) throws Exception {
Class<?> cl = Class.forName(className);
return cl.newInstance();
}
private static void processAnnotations(EPStatement statement) throws Exception {
Annotation[] annotations = statement.getAnnotations();
for (Annotation annotation : annotations) {
if (annotation instanceof Subscriber) {
Subscriber subscriber = (Subscriber) annotation;
Object obj = getSubscriber(subscriber.className());
System.out.println(subscriber.className());
statement.setSubscriber(obj);
} else if (annotation instanceof Listeners) {
Listeners listeners = (Listeners) annotation;
for (String className : listeners.classNames()) {
Class<?> cl = Class.forName(className);
Object obj = cl.newInstance();
if (obj instanceof StatementAwareUpdateListener) {
statement.addListener((StatementAwareUpdateListener) obj);
} else {
statement.addListener((UpdateListener) obj);
}
}
}
}
}
1 ответ
Что ж, после месяца борьбы я наконец решил это. В случае, если у кого-то возникнут подобные проблемы в будущем, вот где проблема была. Эпл отлично работал в онлайн-инструменте, но не в моем коде. В конце концов я выяснил, что начальные события не запускаются, поэтому контекстные разделы не создаются, и в результате подписчики и слушатели не получают никаких обновлений. Моя ошибка состояла в том, что у меня было запущено POJO InitEvent, но событие, которое использовал контекст, было создано в модуле EPL через схему создания. Я не знаю, о чем я думал, теперь имеет смысл, что это не сработало. В результате события, которые я запускаю в Java, не являются событиями, которые использует контекст. Мое решение было только в рамках EPL. Поскольку я не мог понять, могу ли я запускать события в Java, которые создаются внутри модуля, я создал схему, которая заполняется моим POJO, и поток затем используется контекстом как таковой:
@Name('schemaCreator')
create schema StartEvent(firstStock string, secondStock string, difference
double);
@Name('insertInitEvent')
insert into StartEvent
select * from InitEvent;
Все остальное остается таким же, как и код Java.