Получение и отправка сообщений mavlink с использованием библиотеки pymavlink

Я создал прокси между QGC(наземной станцией управления) и транспортным средством на Python. Вот код:

gcs_conn = mavutil.mavlink_connection('tcpin:localhost:15795')
gcs_conn.wait_heartbeat()
print("Heartbeat from system (system %u component %u)" %(gcs_conn.target_system, gcs_conn.target_system))
vehicle = mavutil.mavlink_connection('tcp:localhost:5760')
vehicle.wait_heartbeat() # recieving heartbeat from the vehicle
print("Heartbeat from system (system %u component %u)" %(vehicle.target_system, vehicle.target_system))
while True:
     gcs_msg = gcs_conn.recv_match()
     if gcs_msg == None:
         pass
     else:
         vehicle.mav.send(gcs_msg)
         print(gcs_msg)

     vcl_msg = vehicle.recv_match()
     if vcl_msg == None:
         pass
     else:
         gcs_conn.mav.send(vcl_msg)
         print(vcl_msg)

Мне нужно получить сообщения от QGC, а затем переслать их в автомобиль, а также получить сообщения от автомобиля и переслать их в QGC. Когда я запускаю код, я получаю эту ошибку.

Есть ли кто-нибудь, кто может мне помочь?

2 ответа

Для успешной пересылки MAVLink должно произойти несколько вещей. Я предполагаю, что вам нужно удобное соединение с GCS, например QGroundControl или MissionPlanner. Я использую QGC, и мой дизайн проходит базовое тестирование.

Обратите внимание, что это написано на Python3. Этот фрагмент не протестирован, но у меня есть (гораздо более сложная) версия, протестированная и работающая.

      
from pymavlink import mavutil
import time


# PyMAVLink has an issue that received messages which contain strings
# cannot be resent, because they become Python strings (not bytestrings)
# This converts those messages so your code doesn't crash when
# you try to send the message again.
def fixMAVLinkMessageForForward(msg):
    msg_type = msg.get_type()
    if msg_type in ('PARAM_VALUE', 'PARAM_REQUEST_READ', 'PARAM_SET'):
        if type(msg.param_id) == str:
            msg.param_id = msg.param_id.encode()
    elif msg_type == 'STATUSTEXT':
        if type(msg.text) == str:
            msg.text = msg.text.encode()
    return msg


# Modified from the snippet in your question
# UDP will work just as well or better
gcs_conn = mavutil.mavlink_connection('tcp:localhost:15795', input=False)
gcs_conn.wait_heartbeat()
print(f'Heartbeat from system (system {gcs_conn.target_system} component {gcs_conn.target_system})')

vehicle = mavutil.mavlink_connection('tcp:localhost:5760')
vehicle.wait_heartbeat()
print(f'Heartbeat from system (system {vehicle.target_system} component {vehicle.target_system})')

while True:
    # Don't block for a GCS message - we have messages
    # from the vehicle to get too
    gcs_msg = gcs_conn.recv_match(blocking=False)
    if gcs_msg is None:
        pass
    elif gcs_msg.get_type() != 'BAD_DATA':
        # We now have a message we want to forward. Now we need to
        # make it safe to send
        gcs_msg = fixMAVLinkMessageForForward(gcs_msg)

        # Finally, in order to forward this, we actually need to
        # hack PyMAVLink so the message has the right source
        # information attached.
        vehicle.mav.srcSystem = gcs_msg.get_srcSystem()
        vehicle.mav.srcComponent = gcs_msg.get_srcComponent()

        # Only now is it safe to send the message
        vehicle.mav.send(gcs_msg)
        print(gcs_msg)

    vcl_msg = vehicle.recv_match(blocking=False)
    if vcl_msg is None:
        pass
    elif vcl_msg.get_type() != 'BAD_DATA':
        # We now have a message we want to forward. Now we need to
        # make it safe to send
        vcl_msg = fixMAVLinkMessageForForward(vcl_msg)

        # Finally, in order to forward this, we actually need to
        # hack PyMAVLink so the message has the right source
        # information attached.
        gcs_conn.mav.srcSystem = vcl_msg.get_srcSystem()
        gcs_conn.mav.srcComponent = vcl_msg.get_srcComponent()

        gcs_conn.mav.send(vcl_msg)
        print(vcl_msg)

    # Don't abuse the CPU by running the loop at maximum speed
    time.sleep(0.001)

Примечания

Убедитесь, что ваш цикл не заблокирован

Цикл должен быстро проверять, доступно ли сообщение из того или иного соединения, вместо того, чтобы ждать, пока сообщение станет доступным из одного соединения. В противном случае сообщение о другом соединении не будет передано, пока блокирующее соединение не получит сообщение.

Проверить действительность сообщения

Убедитесь, что вы действительно получили правильное сообщение, а не сообщение BAD_DATA. Попытка отправить BAD_DATA приведет к сбою

Убедитесь, что получатель получает правильную информацию об отправителе.

По умолчанию PyMAVLink при отправке сообщения будет кодировать идентификаторы ВАШЕЙ системы и компонентов (обычно оставленные равными нулю) вместо идентификаторов из сообщения. GCS, получающий это, может быть сбит с толку (например, QGC) и неправильно подключиться к транспортному средству (несмотря на отображение сообщений в инспекторе MAVLink).

Это исправлено путем взлома PyMAVLink таким образом, чтобы идентификаторы вашей системы и компонентов совпадали с перенаправленным сообщением. При необходимости это можно сделать после отправки сообщения. Посмотрите пример, чтобы увидеть, как я это сделал.

Если вы распечатаете свое сообщение перед отправкой, вы заметите, что оно всегда терпит неудачу при попытке отправить BAD_DATA тип сообщения.

Так что это должно исправить (то же самое для vcl_msg):

if gcs_msg and gcs_msg.get_type() != 'BAD_DATA':
    vehicle.mav.send(gcs_msg)

PD: я заметил, что вы не указываете tcp в качестве входа или выхода по умолчанию input, Это означает, что оба соединения являются входами. Я рекомендую установить соединение GCS как выход:

gcs_conn = mavutil.mavlink_connection('tcp:localhost:15795', input=False)

https://mavlink.io/en/mavgen_python/

Другие вопросы по тегам