Межпроцессное взаимодействие для обмена обновлениями данных в Java

Я создаю жетон с датчиками. У меня есть клиентские процессы, определенные в SensorClient учебный класс. Эти клиентские объекты получают информацию о списке других клиентов от процесса сервера, createSensor учебный класс.

Проблема в том, что я хотел бы, чтобы клиенты обновляли информацию, которую они имеют, когда она изменяется на сервере.

Класс сервера:

    public class createSensor {

                private static createSensor instance = null;
                private ArrayList<Sensor> sensor = new ArrayList<>();
               public int position, prevPosition, nextPosition, prevPort, nextPort;

                    private createSensor() {
                    } 
 public synchronized ArrayList insertSensor(String type, String identificator, int port, String id, String gatwayAddr, long timestamp) throws IOException {

            sensor.add(new Sensor(type, identificator, port, id, gatwayAddr, timestamp));
            return new ArrayList<>(sensor); // 
        }
    }
    public synchronized boolean hasMeasurements() {
        while (InnerBuffer.getInstance().emptyInnerBuffer())
            return false;
        return true;
    }


    public synchronized void setPrevNextWhenDelete(int position,ArrayList<Sensor> sensorList) throws IOException {
//code

    }

    public synchronized ArrayList<Sensor> getSensorList() {
        return new ArrayList<>(sensor);
    }

    public synchronized int size() {
        return sensor.size();
    }

    public synchronized String returnRecentMeasurement (String id){
        String recentMeasurement=null;
        for (Sensor sensori : sensor) {
            if (sensori.getIdentificator().equalsIgnoreCase(id))
               recentMeasurement= InnerBuffer.getInstance().returnRecentMeasurements(id);
            else
                recentMeasurement = null;}
        return recentMeasurement;
    }
                public synchronized void  setPrevNextWhenAdd() throws IOException {  //some other code where int position, prevPosition, nextPosition, prevPort, nextPort get their values.


              }}

Класс клиента:

public class SensorClient {
                public static void main(String[] args) throws Exception {
                    //Starting a new sensor
         Sensor sensor = new Sensor(type,identificator,portnumber,ipnumber,gatewayAddr,timestamp);
            Gson gson = new Gson();
            String message = gson.toJson(sensor);
            Client c = Client.create();
            WebResource r = c.resource("http://localhost:9999/gateway/");
            ClientResponse response = r.path("sensors/add").type(MediaType.APPLICATION_JSON).accept(MediaType.APPLICATION_JSON).post(ClientResponse.class, message);
            if (response.getStatus() == 200) {
                repeat = false;
                Type collectionType = new TypeToken<ArrayList<Sensor>>(){}.getType();
                ArrayList<Sensor> sensorList = gson.fromJson(response.getEntity(String.class), collectionType);
                System.out.println("Starting the sensor ...");
                System.out.println("Push exit when you want to delete the sensor!");
                int position = 0;
                for(int i = 0; i< sensorList.size();i++){
                  if(sensorList.get(i).getIdentificator().equalsIgnoreCase(sensor.getIdentificator()) )
                        position = i;
                }

                sensors.Sensor.simulation(type, identificator);// special thread for sensors simulations
                 createSensor.getInstance().setPrevNextWhenAdd(position,sensorList);
                serverSocket serverSocket = new serverSocket(portnumber,sensorList,position,sensorList.get(position).getNext());
                serverSocket.start();

                StopSensor stopSensor = new StopSensor(identificator,portnumber,position,sensorList);
                stopSensor.start();

                   oneSensor s = new oneSensor(portnumber,sensorList);
                    s.start();
              } else {
                repeat = true;
                count +=1;
                System.out.println("Error. Wrong data! ");
            }
          }
        while (repeat );
    }
}
                        }

ServerSocket поток

public class serverSocket extends Thread {
    public int port,nextPort;
    ArrayList<gateway.Sensor> sensorList;
    public static int position;
    public serverSocket(int port, ArrayList<gateway.Sensor> sensorList,int position,int nextPort) {
        this.port = port;
        this.nextPort=nextPort;
        this.sensorList= sensorList;
        this.position=position;}
    public void run() {
            ServerSocket welcomeSocket;
            Socket connectionSocket;
            try {
                welcomeSocket = new ServerSocket(port);
                while (true) {
                    connectionSocket = welcomeSocket.accept();

                    receivedMessages thread = new receivedMessages(connectionSocket,sensorList,position,nextPort);
                    thread.start();
                }
            } catch (IOException e) {
                e.printStackTrace();
                System.err.println("Error!!!!!!!!!");
            }
        }
}

Поток полученных сообщений

public class receivedMessages extends Thread {

        private BufferedReader inFromClient;
        private Socket connectionSocket;
    ArrayList<gateway.Sensor> sensorList;
    int position,nextPort;
        public receivedMessages(Socket socket, ArrayList<gateway.Sensor> sensorList,int position,int nextPort){
            connectionSocket = socket;
            this.sensorList=sensorList;
            this.position=position;
            this.nextPort=nextPort;
           try {
                inFromClient = new BufferedReader( new InputStreamReader(connectionSocket.getInputStream()));
               } catch (IOException e) { e.printStackTrace(); }
            }
            @Override
        public void run() {
                //  while(true) {
                try {

                    String message = (inFromClient.readLine().toString());

                    System.out.println(message);
                    if (message.startsWith("Next") || message.startsWith("Previous")) {
                        System.out.println(message);
                    } else if (message.startsWith("The")) {

                        System.out.println(message);                        createSensor.getInstance().setPrevNextWhenDelete(position, sensorList);
                    } else  {// i receive the message that the list has changed
                    System.out.println(message);
                    sensorList = createSensor.getInstance().getSensorList();
                    System.out.println("Updated " + sensorList);}

1 ответ

Решение

Прислушиваться к изменениям

У ваших клиентских процессов есть 2 возможности быть в курсе изменений данных:

  • Ваши клиентские объекты могут регулярно проверять, был ли изменен список:

    • запрашивая текущую версию списка
    • или запросив временную метку изменения списка (класс вашего сервера, createSensor, тогда придется вести учет этого)
  • ваши клиентские объекты могут открывать порт для прослушивания уведомлений, а ваш серверный класс может уведомлять их об изменениях, выдвигая новый список, когда список изменяется.

Обмен обновлениями между потоком слушателя и рабочим потоком

В обоих случаях на стороне клиента у вас будет один поток, выполняющий работу, а другой поток ожидает изменений. Этот второй поток будет каждый раз восстанавливать новый список. Чтобы поделиться этим с рабочим потоком, вот что вы можете сделать:

  • ваш поток слушателя будет хранить ссылку на sensorList что он был передан рабочим потоком при создании
  • при восстановлении нового списка ваш поток слушателя сохранит этот новый список в другой ссылке, newList
  • ваш список слушателей может затем изменить "старый" список так, чтобы он соответствовал новому списку, используя oldList.clear() затем oldList.addAll(newList),

Это будет работать, поскольку, хотя рабочий поток может иметь свою собственную ссылку на список, обе ссылки указывают на один и тот же объект, поэтому он будет видеть изменения.

Код для запуска потока слушателя

Например, если ваш рабочий поток является основным потоком, и он создает поток слушателя, ваш код может выглядеть так внутри кода рабочего потока:

class Listener implements Runnable {

   List<Sensor> sensorList;
   Listener(List<Sensor> list ){
      sensorList = list;
   }

   public void run(){
      // code to listen to changes on the server
      // When server sends new information, thread can update the list directly:
      // sensorList.clear();
      // sensorList.addAll(newList);
   }
}

Listener l = new Listener(sensorList);
Thread listenerThread = new Thread(l);
listenerThread.start();

Вы можете прочитать о потоках и о том, как они разделяют память, в Учебниках Java по параллелизму.

Будьте осторожны с синхронизацией: когда вы обновляете старый список, вы должны убедиться, что другой поток не выполняет итерации по нему. Вы, вероятно, должны использовать Collections.synchronizedList(sensorList) чтобы убедиться, что нет никаких потоковых помех.

Другие вопросы по тегам