Python: автоматическое переподключение IP-камеры

Моя IP-камера кажется немного нестабильной и отключается случайным образом. Мне бы хотелось, чтобы мой сценарий мог определить, когда он отключен, и попытаться повторно подключиться несколько раз, вероятно, ожидая 5-10 секунд между попытками. Я попробовал несколько вещей, но ничего не работает.

Это мой основной скрипт, когда ret равен false, скрипт заканчивается:

#!/usr/local/bin/python3

import cv2
import time
import datetime

print("start time: " + datetime.datetime.now().strftime("%A %d %B %Y %I:%M:%S%p"))

cap = cv2.VideoCapture('rtsp://<ip><port>/live0.264')

while(True):
    # Capture frame-by-frame
    ret, frame = cap.read()

    # Confirm we have a valid image returned
    if not ret:
        print("disconnected!")
        break

    # Our operations on the frame come here
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2BGRA)

    # Display the resulting frame
    cv2.imshow('frame', gray)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

print("end time: " + time.strftime("%X"))
# When everything is done, release the capture
cap.release()
cv2.destroyAllWindows()

Изменить: Я также хотел бы, чтобы скрипт попытался повторно подключиться к камере в случае, если моя сеть временно отключается или что-то в этом роде.

1 ответ

Решение

Я наконец смог решить это сам. Надеюсь, это полезно для всех, кто хочет сделать то же самое.

На самом деле это оболочка более сложного сценария, который имеет логику для обнаружения движения и записи видео при обнаружении движения. Все работает очень хорошо с этой базовой логикой (и моей дрянной IP-камерой), хотя я все еще делаю тестирование.

#!/usr/local/bin/python3

import cv2
import datetime
import time


def reset_attempts():
    return 50


def process_video(attempts):

    while(True):
        (grabbed, frame) = camera.read()

        if not grabbed:
            print("disconnected!")
            camera.release()

            if attempts > 0:
                time.sleep(5)
                return True
            else:
                return False


recall = True
attempts = reset_attempts()

while(recall):
    camera = cv2.VideoCapture("rtsp://<ip><port>/live0.264")

    if camera.isOpened():
        print("[INFO] Camera connected at " +
              datetime.datetime.now().strftime("%m-%d-%Y %I:%M:%S%p"))
        attempts = reset_attempts()
        recall = process_video(attempts)
    else:
        print("Camera not opened " +
              datetime.datetime.now().strftime("%m-%d-%Y %I:%M:%S%p"))
        camera.release()
        attempts -= 1
        print("attempts: " + str(attempts))

        # give the camera some time to recover
        time.sleep(5)
        continue

Более подробное описание:

https://github.com/Combinacijus/various-code-samples/tree/master/Python/OpenCV/ip_cam_reconnecting

Написал класс для работы с случайным отключением IP-камеры. Основная идея состоит в том, чтобы проверить, возвращает ли cap.read() кадр, и если нет, он пытается повторно подключиться к камере.

import cv2
import requests  # NOTE: Only used for forceful reconnection
import time  # NOTE: Only used for throttling down printing when connection is lost


class IPVideoCapture:
    def __init__(self, cam_address, cam_force_address=None, blocking=False):
        """
        :param cam_address: ip address of the camera feed
        :param cam_force_address: ip address to disconnect other clients (forcefully take over)
        :param blocking: if true read() and reconnect_camera() methods blocks until ip camera is reconnected
        """

        self.cam_address = cam_address
        self.cam_force_address = cam_force_address
        self.blocking = blocking
        self.capture = None

        self.RECONNECTION_PERIOD = 0.5  # NOTE: Can be changed. Used to throttle down printing

        self.reconnect_camera()

    def reconnect_camera(self):
        while True:
            try:
                if self.cam_force_address is not None:
                    requests.get(self.cam_force_address)

                self.capture = cv2.VideoCapture(self.cam_address)

                if not self.capture.isOpened():
                    raise Exception("Could not connect to a camera: {0}".format(self.cam_address))

                print("Connected to a camera: {}".format(self.cam_address))

                break
            except Exception as e:
                print(e)

                if self.blocking is False:
                    break

                time.sleep(self.RECONNECTION_PERIOD)

    def read(self):
        """
        Reads frame and if frame is not received tries to reconnect the camera

        :return: ret - bool witch specifies if frame was read successfully
                 frame - opencv image from the camera
        """

        ret, frame = self.capture.read()

        if ret is False:
            self.reconnect_camera()

        return ret, frame


if __name__ == "__main__":
    CAM_ADDRESS = "http://192.168.8.102:4747/video"  # NOTE: Change
    CAM_FORCE_ADDRESS = "http://192.168.8.102:4747/override"  # NOTE: Change or omit
    cap = IPVideoCapture(CAM_ADDRESS, CAM_FORCE_ADDRESS, blocking=True)
    # cap = IPVideoCapture(CAM_ADDRESS)  # Minimal init example

    while True:
        ret, frame = cap.read()

        if ret is True:
            cv2.imshow(CAM_ADDRESS, frame)

        if cv2.waitKey(1) == ord("q"):
            break
Другие вопросы по тегам