Один поток останавливается слишком рано независимо от CyclicBarrier
Я осознаю тот факт, что следующий код может показаться вульгарным, но я новичок в этих вещах и просто перепробовал все, чтобы заставить его работать...
Проблема: Даже если я использую (возможно, неправильно) CyclicBarrier, один - и кажется, всегда один и тот же - поток останавливается слишком рано и печатает свой вектор, оставляя 1 из 11 сообщений "Входящее соединение" отсутствующим, Возможно, что-то ужасно не так с последней итерацией моего цикла, но я не могу найти, что именно.. Теперь программа просто зацикливается, ожидая обработки последнего соединения.
public class VectorClockClient implements Runnable {
/*
* Attributes
*/
/*
* The client number is store to provide fast
* array access when, for example, a thread's own
* clock simply needs to be incremented.
*/
private int clientNumber;
private File configFile, inputFile;
int[] vectorClock;
/*
* Constructor
* @param
* - File config
* - int line
* - File input
* - int clients
*/
public VectorClockClient(File config, int line, File input, int clients) {
/*
* Make sure that File handles aren't null and that
* the line number is valid.
*/
if (config != null && line >= 0 && input != null) {
configFile = config;
inputFile = input;
clientNumber = line;
/*
* Set the array size to the number of lines found in the
* config file and initialize with zero values.
*/
vectorClock = new int[clients];
for (int i = 0; i < vectorClock.length; i++) {
vectorClock[i] = 0;
}
}
}
private int parsePort() {
int returnable = 0;
try {
FileInputStream fstream = new FileInputStream(configFile.getName());
DataInputStream in = new DataInputStream(fstream);
BufferedReader br = new BufferedReader(new InputStreamReader(in));
String strLine = "";
for (int i = 0; i < clientNumber + 1; i++) {
strLine = br.readLine();
}
String[] tokens = strLine.split(" ");
returnable = Integer.parseInt(tokens[1]);
}
catch (Exception e) {
e.printStackTrace();
}
System.out.println("[" + clientNumber + "] returned with " + returnable + ".");
return returnable;
}
private int parsePort(int client) {
int returnable = 0;
try {
FileInputStream fstream = new FileInputStream(configFile.getName());
DataInputStream in = new DataInputStream(fstream);
BufferedReader br = new BufferedReader(new InputStreamReader(in));
String strLine = "";
for (int i = 0; i < client; i++) {
strLine = br.readLine();
}
String[] tokens = strLine.split(" ");
returnable = Integer.parseInt(tokens[1]);
}
catch (Exception e) {
e.printStackTrace();
}
return returnable;
}
private int parseAction(String s) {
int returnable = -1;
try {
FileInputStream fstream = new FileInputStream(configFile.getName());
DataInputStream in = new DataInputStream(fstream);
BufferedReader br = new BufferedReader(new InputStreamReader(in));
String[] tokens = s.split(" ");
if (!(Integer.parseInt(tokens[0]) == this.clientNumber + 1)) {
return -1;
}
else {
if (tokens[1].equals("L")) {
vectorClock[clientNumber] += Integer.parseInt(tokens[2]);
}
else {
returnable = Integer.parseInt(tokens[2]);
}
}
}
catch (Exception e) {
e.printStackTrace();
}
return returnable;
}
/*
* Do the actual work.
*/
public void run() {
try {
InitClients.barrier.await();
}
catch (Exception e) {
System.out.println(e);
}
int port = parsePort();
String hostname = "localhost";
String strLine;
ServerSocketChannel ssc;
SocketChannel sc;
FileInputStream fstream;
DataInputStream in;
BufferedReader br;
boolean eof = false;
try {
ssc = ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress(hostname, port));
ssc.configureBlocking(false);
fstream = new FileInputStream("input_vector.txt");
in = new DataInputStream(fstream);
br = new BufferedReader(new InputStreamReader(in));
try {
InitClients.barrier.await();
}
catch (Exception e) {
System.out.println(e);
}
while (true && (eof == false)) {
sc = ssc.accept();
if (sc == null) {
if ((strLine = br.readLine()) != null) {
int result = parseAction(strLine);
if (result >= 0) {
//System.out.println("[" + (clientNumber + 1)
//+ "] Send a message to " + result + ".");
try {
SocketChannel client = SocketChannel.open();
client.configureBlocking(true);
client.connect(
new InetSocketAddress("localhost",
parsePort(result)));
//ByteBuffer buf = ByteBuffer.allocateDirect(32);
//buf.put((byte)0xFF);
//buf.flip();
//vectorClock[clientNumber] += 1;
//int numBytesWritten = client.write(buf);
String obj = Integer.toString(clientNumber+1);
ObjectOutputStream oos = new
ObjectOutputStream(
client.socket().getOutputStream());
oos.writeObject(obj);
oos.close();
}
catch (Exception e) {
e.printStackTrace();
}
}
}
else {
eof = true;
}
}
else {
ObjectInputStream ois = new
ObjectInputStream(sc.socket().getInputStream());
String clientNumberString = (String)ois.readObject();
System.out.println("At {Client[" + (clientNumber + 1)
+ "]}Incoming connection from: "
+ sc.socket().getRemoteSocketAddress()
+ " from {Client[" + clientNumberString + "]}");
sc.close();
}
try {
InitClients.barrier.await();
}
catch (Exception e) {
e.printStackTrace();
}
}
}
catch (Exception e) {
e.printStackTrace();
}
printVector();
}
private void printVector() {
System.out.print("{Client[" + (clientNumber + 1) + "]}{");
for (int i = 0; i < vectorClock.length; i++) {
System.out.print(vectorClock[i] + "\t");
}
System.out.println("}");
}
}
Для пояснения, вот форматы используемых файлов. Config содержит имена хостов и порты, используемые клиентами, которые являются потоками, и строки входного файла означают либо "этот клиент отправляет сообщение этому клиенту", либо "этот клиент увеличивает свои логические часы на некоторое постоянное значение".
1 M 2 (M означает отправку сообщения)
2 м 3
3 М 4
2 L 7 (L означает увеличение тактовой частоты)
2 М 1
...
127.0.0.1 9000
127.0.0.1 9001
127.0.0.1 9002
127.0.0.1 9003
...
1 ответ
Я хотел бы взглянуть на логику, связанную с тем, когда вы ожидаете входящего соединения сокета. От вашего вопроса похоже, что вы ожидаете определенного количества входящих соединений сокетов (возможно, входящее соединение после каждого исходящего сообщения?). Поскольку вы используете неблокирующий ввод / вывод на входящем сокете, всегда возможно, что ваш оператор while будет зациклен до того, как будет установлен входящий сокет. В результате поток сможет продолжить и читать следующую строку из файла без получения соединения. Поскольку ваше конечное состояние достигается после достижения конца файла, возможно, вы можете пропустить входящее сокетное соединение.
Я бы добавил несколько простых распечаток, которые отображаются при чтении из файла, при отправке сообщения и при получении входящего соединения. Это должно быстро сообщить вам, отсутствует ли в определенном потоке ожидаемое входящее соединение. Если выясняется, что проблема связана с неблокирующим вводом / выводом, то вам может потребоваться отключить неблокирующий ввод / вывод, когда вы ожидаете входящий сокет, или реализовать элемент управления, который отслеживает ожидаемое количество входящих сокетов. и продолжается до тех пор, пока эта цель не будет достигнута.
Надеюсь это поможет.