Передача данных из другого потока, медленная последовательная связь с библиотекой Java и RXTX
Хорошо, я постараюсь быть максимально ясным с моей проблемой. Я передаю последовательные данные по очень медленной радиосвязи (используя UART-контроллер на Raspberry Pi и домашнее радио). Это для очень специфического проекта, где требование прописано на большом расстоянии, а скорость менее важна. Программа (Radio.java) работает в двух потоках. Один поток (Receiver) получает данные телеметрии от другой программы, используя TCP-сокет (который очень высок, фактически 100 Мбит). Этот поток непрерывно сохраняет данные, которые он получает на TCP-сокете, в ArrayBlockingQueue (с размером = 1), чтобы другой поток (Transmitter) мог получить эти данные. Скорость, с которой поток приемника получает данные, довольно высока. Теперь я хочу, чтобы поток-передатчик передавал данные, а когда он закончится, я хочу, чтобы он снова получал последние данные из потока-приемника и снова передавал их по медленной радиосвязи. Так что в потоке передатчика я хочу, чтобы это работало так:
Получить последние данные из Receiver-thread
Передача данных по радиоканалу (с использованием последовательного порта)
Не делайте НИЧЕГО, пока данные фактически не передаются.
повторение.
Теперь, когда я запускаю программу, все, что касается потока Receiver, работает просто отлично. Но внутри потока передатчика строка "this.out.write(output.getBytes());" просто помещает все в OutputStream за пару миллисекунд, а затем снова делает то же самое. Данные не могут быть переданы!
Я попробовал этот пример (только используя "SerialWriter") здесь: http://rxtx.qbang.org/wiki/index.php/Two_way_communcation_with_the_serial_port
И используя длинный текст "Lirum Ipsum", все работало просто отлично, передавая со скоростью 50 бод. В общем, я хочу, чтобы в моей программе было то же поведение, что и при использовании System.in.read> -1... (что, по-моему, является блокировкой, причина, по которой это работает???).
Что я должен делать?
2015-01-01 изменить НАЧАТЬ
Я нашел проблему! SRobertz ведет меня в правильном направлении! Проблема на самом деле не в скорости записи в UART-буфер. Разница между запуском примера "TwoWayComm" и моим собственным кодом заключается в том, что я использую GPS, подключенный к UART-RX-порту Raspberry Pi. Для считывания данных с GPS используется "GPSD"-программа (которая выводит данные в формате JSON). Программное обеспечение GPSD подключается к GPS с 9600 бод (специально для этого GPS-устройства), а я переключаюсь на 50 бод на том же порту (не закрывая открытое соединение, в котором работает GPSD)! Попытка открыть UART с двумя разными скоростями передачи - вот что все портит. Я переписал код, чтобы я:
- Откройте UART на 9600 бод
- Читать данные GPS
- Закройте UART
- Откройте UART на 50 бод
- Передать данные телеметрии в UART
- Закрыть UART
- Повторение
И теперь все работает как шарм...
2015-01-01 edit END
Итак... вот код:
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ArrayBlockingQueue;
import gnu.io.CommPort;
import gnu.io.CommPortIdentifier;
import gnu.io.SerialPort;
public class RADIO {
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(1);
void connect(String portName) throws Exception {
CommPortIdentifier portIdentifier = CommPortIdentifier
.getPortIdentifier(portName);
if (portIdentifier.isCurrentlyOwned()) {
System.out.println("Error: Port is currently in use");
} else {
int timeout = 2000;
CommPort commPort = portIdentifier.open(this.getClass().getName(),
timeout);
if (commPort instanceof SerialPort) {
SerialPort serialPort = (SerialPort) commPort;
serialPort.setSerialPortParams(50, SerialPort.DATABITS_7,
SerialPort.STOPBITS_2, SerialPort.PARITY_NONE);
// Open outputstream to write to the serial port
OutputStream out = serialPort.getOutputStream();
(new Thread(new Receiver(queue))).start();
(new Thread(new Transmitter(out, queue))).start();
} else {
System.err.println("Error: Not serial port.");
}
}
}
public static class Receiver implements Runnable {
OutputStream out;
protected ArrayBlockingQueue<String> queue = null;
public Receiver(ArrayBlockingQueue<String> queue) {
this.queue = queue;
}
public void run() {
// Open TCP-connection
try {
ServerSocket serverSocket = new ServerSocket(1002);
Socket clientSocket = serverSocket.accept(); // Wait for the client to start up
BufferedReader in = new BufferedReader(new InputStreamReader(
clientSocket.getInputStream()));
String inputLine, outputLine;
while ((inputLine = in.readLine()) != null) {
queue.clear();
queue.put(inputLine);
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public static class Transmitter implements Runnable {
OutputStream out;
protected ArrayBlockingQueue<String> queue = null;
String output = "";
public Transmitter(OutputStream out, ArrayBlockingQueue<String> queue) {
this.out = out;
this.queue = queue;
}
public void run() {
try {
while (true) {
output = queue.take();
this.out.write(output.getBytes());
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public static void main(String[] args) {
try {
(new RADIO()).connect("/dev/ttyAMA0");
} catch (Exception e) {
e.printStackTrace();
}
}
}
1 ответ
(небольшое предупреждение: я не использовал gnu.io на малиновом пи)
Во-первых, чтобы выделить проблему, я бы поставил довольно долго sleep
в потоке передатчика, после this.out.write...
чтобы убедиться, что проблема не ожидает завершения передачи последовательным портом.
Если это работает, то вы можете попробовать ждать OUTPUT_BUFFER_EMPTY
, добавив SerialPortEventListener
и настройка notifyOnOutputEmpty(true)
, делая вашSerialPortEventListener
монитор вдоль линий
class ExampleMonitor implements SerialPortEventListener {
boolean condition;
public synchronized serialEvent(SerialPortEvent ev) {
condition = true;
notifyAll();
}
public synchronized void awaitCondition() throws InterruptedException {
while(!condition) wait();
condition = false;
}
а затем сделать
myExampleMonitor.awaitCondition()
вместо sleep
в потоке передачи.
См. http://rxtx.qbang.org/wiki/index.php/Event_based_two_way_Communication для обратного использования событий (обратите внимание, что там нет монитора и нет ожидания; вместо этого работа выполняется в слушателе / обратном вызове.)