Начиная с Esper + сокеты

Я новичок с Эспером, и я хотел бы получить помощь. Мне уже удалось использовать Esper с файлами CSV, но теперь мне нужно использовать объекты Java в качестве событий, отправляемых через сокет, и я не могу найти в Интернете простые примеры, которые можно было бы использовать в качестве руководства.

Есть ли у кого-нибудь простые примеры на их основе?

Во всяком случае, я приведу здесь код, который я пытаюсь заставить работать. Ничего не происходит, когда я его запускаю, похоже, сокетное соединение не работает.

Класс сервера (он также содержит класс событий). Предлагается отправлять события:

import java.io.* ;
import java.net.* ;

class Server {

static final int PORT=5002;

    public Server( ) {
        try {
            ServerSocket skServer = new ServerSocket( PORT );
            System.out.println("Listening at " + PORT );
            Socket skClient = skServer.accept();        
            System.out.println("Serving to Esper");
            OutputStream aux = skClient.getOutputStream();
            ObjectOutputStream flux = new ObjectOutputStream( aux );
            int i = 0;
            while (i<10) {
                flux.writeObject( new MeteoEvent(i,"A") );
                i++;
                }
            flux.flush();
            skClient.close();
            System.out.println("End of transmission");
            } catch( Exception e ) {
            System.out.println( e.getMessage() );
        }
    }

    public static void main( String[] arg ) {
        new Server();
    }

    class MeteoEvent{

        private int sensorId;
        private String GeoArea;

        public MeteoEvent() {
        }

        public MeteoEvent(int sensorid, String geoarea) {
            this.sensorId = sensorid;
            this.GeoArea = geoarea;
        }

        public int getSensorId() {
            return sensorId;
        }

        public void setSensorId(int sensorId) {
            this.sensorId = sensorId;
        }

        public String getGeoArea() {
            return GeoArea;
        }

        public void setGeoArea(String geoArea) {
            GeoArea = geoArea;
        }   
    }
}

И основанный на эспере класс.

import java.io.File;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;


import com.espertech.esper.client.Configuration;
import com.espertech.esper.client.EPAdministrator;
import com.espertech.esper.client.EPRuntime;
import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPServiceProviderManager;
import com.espertech.esper.client.EPStatement;
import com.espertech.esper.client.EventBean;
import com.espertech.esper.client.UpdateListener;
import com.espertech.esper.event.map.MapEventBean;
import com.espertech.esperio.socket.EsperIOSocketAdapter;
import com.espertech.esperio.socket.config.ConfigurationSocketAdapter;
import com.espertech.esperio.socket.config.DataType;
import com.espertech.esperio.socket.config.SocketConfig;

public class Demo {

    public static class CEPListener implements UpdateListener {

        private String tag;
        public CEPListener (String tag)
        {
            this.tag = tag;
        }

public static void main(String[] args) throws IOException, InterruptedException {
        Configuration configuration = new Configuration();

        Map<String, Object> eventProperties = new HashMap<String, Object>();
        eventProperties.put("sensorId", int.class);
        eventProperties.put("GeoArea", String.class);
        configuration.addEventType("MeteoEvent", eventProperties);

        ConfigurationSocketAdapter socketAdapterConfig = new ConfigurationSocketAdapter();

        SocketConfig socketConfig = new SocketConfig();
        socketConfig.setDataType(DataType.OBJECT);
        socketConfig.setPort(5002);
        socketAdapterConfig.getSockets().put("MeteoSocket", socketConfig);

        EPServiceProvider cepService = EPServiceProviderManager.getProvider("MeteoSocket",configuration);

        EPRuntime cepServiceRT = cepService.getEPRuntime();

        EPAdministrator cepAdmin = cepService.getEPAdministrator();

        EsperIOSocketAdapter socketAdapter = new EsperIOSocketAdapter (socketAdapterConfig, "MeteoSocket");
        socketAdapter.start();

        EPStatement stmt = cepAdmin.createEPL("insert into JoinStream select * from MeteoEvent");

        EPStatement outputStatementX = cepAdmin.createEPL("select * from JoinStream");

        outputStatementX.addListener(new CEPListener("JS"));

        cepService.initialize();

        Object lock = new Object();
        synchronized (lock)
        {
                lock.wait();
         }
}

Заранее большое спасибо, если кто-то потратит время, пытаясь мне помочь.

1 ответ

Задача решена! Список Esper Dev был очень полезен. Я научился использовать сокеты Esper + через тестовые классы, расположенные здесь

С наилучшими пожеланиями!

Другие вопросы по тегам