Передача данных из другого потока, медленная последовательная связь с библиотекой Java и RXTX

Хорошо, я постараюсь быть максимально ясным с моей проблемой. Я передаю последовательные данные по очень медленной радиосвязи (используя UART-контроллер на Raspberry Pi и домашнее радио). Это для очень специфического проекта, где требование прописано на большом расстоянии, а скорость менее важна. Программа (Radio.java) работает в двух потоках. Один поток (Receiver) получает данные телеметрии от другой программы, используя TCP-сокет (который очень высок, фактически 100 Мбит). Этот поток непрерывно сохраняет данные, которые он получает на TCP-сокете, в ArrayBlockingQueue (с размером = 1), чтобы другой поток (Transmitter) мог получить эти данные. Скорость, с которой поток приемника получает данные, довольно высока. Теперь я хочу, чтобы поток-передатчик передавал данные, а когда он закончится, я хочу, чтобы он снова получал последние данные из потока-приемника и снова передавал их по медленной радиосвязи. Так что в потоке передатчика я хочу, чтобы это работало так:

  1. Получить последние данные из Receiver-thread

  2. Передача данных по радиоканалу (с использованием последовательного порта)

  3. Не делайте НИЧЕГО, пока данные фактически не передаются.

  4. повторение.

Теперь, когда я запускаю программу, все, что касается потока Receiver, работает просто отлично. Но внутри потока передатчика строка "this.out.write(output.getBytes());" просто помещает все в OutputStream за пару миллисекунд, а затем снова делает то же самое. Данные не могут быть переданы!

Я попробовал этот пример (только используя "SerialWriter") здесь: http://rxtx.qbang.org/wiki/index.php/Two_way_communcation_with_the_serial_port

И используя длинный текст "Lirum Ipsum", все работало просто отлично, передавая со скоростью 50 бод. В общем, я хочу, чтобы в моей программе было то же поведение, что и при использовании System.in.read> -1... (что, по-моему, является блокировкой, причина, по которой это работает???).

Что я должен делать?

2015-01-01 изменить НАЧАТЬ

Я нашел проблему! SRobertz ведет меня в правильном направлении! Проблема на самом деле не в скорости записи в UART-буфер. Разница между запуском примера "TwoWayComm" и моим собственным кодом заключается в том, что я использую GPS, подключенный к UART-RX-порту Raspberry Pi. Для считывания данных с GPS используется "GPSD"-программа (которая выводит данные в формате JSON). Программное обеспечение GPSD подключается к GPS с 9600 бод (специально для этого GPS-устройства), а я переключаюсь на 50 бод на том же порту (не закрывая открытое соединение, в котором работает GPSD)! Попытка открыть UART с двумя разными скоростями передачи - вот что все портит. Я переписал код, чтобы я:

  1. Откройте UART на 9600 бод
  2. Читать данные GPS
  3. Закройте UART
  4. Откройте UART на 50 бод
  5. Передать данные телеметрии в UART
  6. Закрыть UART
  7. Повторение

И теперь все работает как шарм...

2015-01-01 edit END

Итак... вот код:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ArrayBlockingQueue;

import gnu.io.CommPort;
import gnu.io.CommPortIdentifier;
import gnu.io.SerialPort;

public class RADIO {
    ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(1);

    void connect(String portName) throws Exception {

        CommPortIdentifier portIdentifier = CommPortIdentifier
                .getPortIdentifier(portName);
        if (portIdentifier.isCurrentlyOwned()) {
            System.out.println("Error: Port is currently in use");
        } else {
            int timeout = 2000;
            CommPort commPort = portIdentifier.open(this.getClass().getName(),
                    timeout);

            if (commPort instanceof SerialPort) {
                SerialPort serialPort = (SerialPort) commPort;
                serialPort.setSerialPortParams(50, SerialPort.DATABITS_7,
                        SerialPort.STOPBITS_2, SerialPort.PARITY_NONE);

                // Open outputstream to write to the serial port
                OutputStream out = serialPort.getOutputStream();

                (new Thread(new Receiver(queue))).start();
                (new Thread(new Transmitter(out, queue))).start();

            } else {
                System.err.println("Error: Not serial port.");
            }
        }
    }

    public static class Receiver implements Runnable {
        OutputStream out;
        protected ArrayBlockingQueue<String> queue = null;

        public Receiver(ArrayBlockingQueue<String> queue) {
            this.queue = queue;
        }

        public void run() {
            // Open TCP-connection
            try {
                ServerSocket serverSocket = new ServerSocket(1002);

                Socket clientSocket = serverSocket.accept(); // Wait for the client to start up
                BufferedReader in = new BufferedReader(new InputStreamReader(
                        clientSocket.getInputStream()));
                String inputLine, outputLine;

                while ((inputLine = in.readLine()) != null) {
                    queue.clear();
                    queue.put(inputLine);
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

        }
    }

    public static class Transmitter implements Runnable {
        OutputStream out;
        protected ArrayBlockingQueue<String> queue = null;
        String output = "";

        public Transmitter(OutputStream out, ArrayBlockingQueue<String> queue) {
            this.out = out;
            this.queue = queue;
        }

        public void run() {
            try {
                while (true) {
                    output = queue.take();
                    this.out.write(output.getBytes());
                }

            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

    }

    public static void main(String[] args) {
        try {
            (new RADIO()).connect("/dev/ttyAMA0");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

1 ответ

Решение

(небольшое предупреждение: я не использовал gnu.io на малиновом пи)

Во-первых, чтобы выделить проблему, я бы поставил довольно долго sleep в потоке передатчика, после this.out.write... чтобы убедиться, что проблема не ожидает завершения передачи последовательным портом.

Если это работает, то вы можете попробовать ждать OUTPUT_BUFFER_EMPTY, добавив SerialPortEventListener и настройка notifyOnOutputEmpty(true), делая вашSerialPortEventListener монитор вдоль линий

class ExampleMonitor implements SerialPortEventListener {
  boolean condition;

  public synchronized serialEvent(SerialPortEvent ev) {
    condition = true;
    notifyAll();
  }

  public synchronized void awaitCondition() throws InterruptedException {
    while(!condition) wait();
    condition = false;
  }

а затем сделать

myExampleMonitor.awaitCondition() вместо sleep в потоке передачи.

См. http://rxtx.qbang.org/wiki/index.php/Event_based_two_way_Communication для обратного использования событий (обратите внимание, что там нет монитора и нет ожидания; вместо этого работа выполняется в слушателе / ​​обратном вызове.)

Другие вопросы по тегам