Межпроцессное взаимодействие для обмена обновлениями данных в Java
Я создаю жетон с датчиками. У меня есть клиентские процессы, определенные в SensorClient
учебный класс. Эти клиентские объекты получают информацию о списке других клиентов от процесса сервера, createSensor
учебный класс.
Проблема в том, что я хотел бы, чтобы клиенты обновляли информацию, которую они имеют, когда она изменяется на сервере.
Класс сервера:
public class createSensor {
private static createSensor instance = null;
private ArrayList<Sensor> sensor = new ArrayList<>();
public int position, prevPosition, nextPosition, prevPort, nextPort;
private createSensor() {
}
public synchronized ArrayList insertSensor(String type, String identificator, int port, String id, String gatwayAddr, long timestamp) throws IOException {
sensor.add(new Sensor(type, identificator, port, id, gatwayAddr, timestamp));
return new ArrayList<>(sensor); //
}
}
public synchronized boolean hasMeasurements() {
while (InnerBuffer.getInstance().emptyInnerBuffer())
return false;
return true;
}
public synchronized void setPrevNextWhenDelete(int position,ArrayList<Sensor> sensorList) throws IOException {
//code
}
public synchronized ArrayList<Sensor> getSensorList() {
return new ArrayList<>(sensor);
}
public synchronized int size() {
return sensor.size();
}
public synchronized String returnRecentMeasurement (String id){
String recentMeasurement=null;
for (Sensor sensori : sensor) {
if (sensori.getIdentificator().equalsIgnoreCase(id))
recentMeasurement= InnerBuffer.getInstance().returnRecentMeasurements(id);
else
recentMeasurement = null;}
return recentMeasurement;
}
public synchronized void setPrevNextWhenAdd() throws IOException { //some other code where int position, prevPosition, nextPosition, prevPort, nextPort get their values.
}}
Класс клиента:
public class SensorClient {
public static void main(String[] args) throws Exception {
//Starting a new sensor
Sensor sensor = new Sensor(type,identificator,portnumber,ipnumber,gatewayAddr,timestamp);
Gson gson = new Gson();
String message = gson.toJson(sensor);
Client c = Client.create();
WebResource r = c.resource("http://localhost:9999/gateway/");
ClientResponse response = r.path("sensors/add").type(MediaType.APPLICATION_JSON).accept(MediaType.APPLICATION_JSON).post(ClientResponse.class, message);
if (response.getStatus() == 200) {
repeat = false;
Type collectionType = new TypeToken<ArrayList<Sensor>>(){}.getType();
ArrayList<Sensor> sensorList = gson.fromJson(response.getEntity(String.class), collectionType);
System.out.println("Starting the sensor ...");
System.out.println("Push exit when you want to delete the sensor!");
int position = 0;
for(int i = 0; i< sensorList.size();i++){
if(sensorList.get(i).getIdentificator().equalsIgnoreCase(sensor.getIdentificator()) )
position = i;
}
sensors.Sensor.simulation(type, identificator);// special thread for sensors simulations
createSensor.getInstance().setPrevNextWhenAdd(position,sensorList);
serverSocket serverSocket = new serverSocket(portnumber,sensorList,position,sensorList.get(position).getNext());
serverSocket.start();
StopSensor stopSensor = new StopSensor(identificator,portnumber,position,sensorList);
stopSensor.start();
oneSensor s = new oneSensor(portnumber,sensorList);
s.start();
} else {
repeat = true;
count +=1;
System.out.println("Error. Wrong data! ");
}
}
while (repeat );
}
}
}
ServerSocket поток
public class serverSocket extends Thread {
public int port,nextPort;
ArrayList<gateway.Sensor> sensorList;
public static int position;
public serverSocket(int port, ArrayList<gateway.Sensor> sensorList,int position,int nextPort) {
this.port = port;
this.nextPort=nextPort;
this.sensorList= sensorList;
this.position=position;}
public void run() {
ServerSocket welcomeSocket;
Socket connectionSocket;
try {
welcomeSocket = new ServerSocket(port);
while (true) {
connectionSocket = welcomeSocket.accept();
receivedMessages thread = new receivedMessages(connectionSocket,sensorList,position,nextPort);
thread.start();
}
} catch (IOException e) {
e.printStackTrace();
System.err.println("Error!!!!!!!!!");
}
}
}
Поток полученных сообщений
public class receivedMessages extends Thread {
private BufferedReader inFromClient;
private Socket connectionSocket;
ArrayList<gateway.Sensor> sensorList;
int position,nextPort;
public receivedMessages(Socket socket, ArrayList<gateway.Sensor> sensorList,int position,int nextPort){
connectionSocket = socket;
this.sensorList=sensorList;
this.position=position;
this.nextPort=nextPort;
try {
inFromClient = new BufferedReader( new InputStreamReader(connectionSocket.getInputStream()));
} catch (IOException e) { e.printStackTrace(); }
}
@Override
public void run() {
// while(true) {
try {
String message = (inFromClient.readLine().toString());
System.out.println(message);
if (message.startsWith("Next") || message.startsWith("Previous")) {
System.out.println(message);
} else if (message.startsWith("The")) {
System.out.println(message); createSensor.getInstance().setPrevNextWhenDelete(position, sensorList);
} else {// i receive the message that the list has changed
System.out.println(message);
sensorList = createSensor.getInstance().getSensorList();
System.out.println("Updated " + sensorList);}
1 ответ
Прислушиваться к изменениям
У ваших клиентских процессов есть 2 возможности быть в курсе изменений данных:
Ваши клиентские объекты могут регулярно проверять, был ли изменен список:
- запрашивая текущую версию списка
- или запросив временную метку изменения списка (класс вашего сервера,
createSensor
, тогда придется вести учет этого)
ваши клиентские объекты могут открывать порт для прослушивания уведомлений, а ваш серверный класс может уведомлять их об изменениях, выдвигая новый список, когда список изменяется.
Обмен обновлениями между потоком слушателя и рабочим потоком
В обоих случаях на стороне клиента у вас будет один поток, выполняющий работу, а другой поток ожидает изменений. Этот второй поток будет каждый раз восстанавливать новый список. Чтобы поделиться этим с рабочим потоком, вот что вы можете сделать:
- ваш поток слушателя будет хранить ссылку на
sensorList
что он был передан рабочим потоком при создании - при восстановлении нового списка ваш поток слушателя сохранит этот новый список в другой ссылке,
newList
- ваш список слушателей может затем изменить "старый" список так, чтобы он соответствовал новому списку, используя
oldList.clear()
затемoldList.addAll(newList)
,
Это будет работать, поскольку, хотя рабочий поток может иметь свою собственную ссылку на список, обе ссылки указывают на один и тот же объект, поэтому он будет видеть изменения.
Код для запуска потока слушателя
Например, если ваш рабочий поток является основным потоком, и он создает поток слушателя, ваш код может выглядеть так внутри кода рабочего потока:
class Listener implements Runnable {
List<Sensor> sensorList;
Listener(List<Sensor> list ){
sensorList = list;
}
public void run(){
// code to listen to changes on the server
// When server sends new information, thread can update the list directly:
// sensorList.clear();
// sensorList.addAll(newList);
}
}
Listener l = new Listener(sensorList);
Thread listenerThread = new Thread(l);
listenerThread.start();
Вы можете прочитать о потоках и о том, как они разделяют память, в Учебниках Java по параллелизму.
Будьте осторожны с синхронизацией: когда вы обновляете старый список, вы должны убедиться, что другой поток не выполняет итерации по нему. Вы, вероятно, должны использовать Collections.synchronizedList(sensorList)
чтобы убедиться, что нет никаких потоковых помех.