Python: автоматическое переподключение IP-камеры
Моя IP-камера кажется немного нестабильной и отключается случайным образом. Мне бы хотелось, чтобы мой сценарий мог определить, когда он отключен, и попытаться повторно подключиться несколько раз, вероятно, ожидая 5-10 секунд между попытками. Я попробовал несколько вещей, но ничего не работает.
Это мой основной скрипт, когда ret равен false, скрипт заканчивается:
#!/usr/local/bin/python3
import cv2
import time
import datetime
print("start time: " + datetime.datetime.now().strftime("%A %d %B %Y %I:%M:%S%p"))
cap = cv2.VideoCapture('rtsp://<ip><port>/live0.264')
while(True):
# Capture frame-by-frame
ret, frame = cap.read()
# Confirm we have a valid image returned
if not ret:
print("disconnected!")
break
# Our operations on the frame come here
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2BGRA)
# Display the resulting frame
cv2.imshow('frame', gray)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
print("end time: " + time.strftime("%X"))
# When everything is done, release the capture
cap.release()
cv2.destroyAllWindows()
Изменить: Я также хотел бы, чтобы скрипт попытался повторно подключиться к камере в случае, если моя сеть временно отключается или что-то в этом роде.
1 ответ
Я наконец смог решить это сам. Надеюсь, это полезно для всех, кто хочет сделать то же самое.
На самом деле это оболочка более сложного сценария, который имеет логику для обнаружения движения и записи видео при обнаружении движения. Все работает очень хорошо с этой базовой логикой (и моей дрянной IP-камерой), хотя я все еще делаю тестирование.
#!/usr/local/bin/python3
import cv2
import datetime
import time
def reset_attempts():
return 50
def process_video(attempts):
while(True):
(grabbed, frame) = camera.read()
if not grabbed:
print("disconnected!")
camera.release()
if attempts > 0:
time.sleep(5)
return True
else:
return False
recall = True
attempts = reset_attempts()
while(recall):
camera = cv2.VideoCapture("rtsp://<ip><port>/live0.264")
if camera.isOpened():
print("[INFO] Camera connected at " +
datetime.datetime.now().strftime("%m-%d-%Y %I:%M:%S%p"))
attempts = reset_attempts()
recall = process_video(attempts)
else:
print("Camera not opened " +
datetime.datetime.now().strftime("%m-%d-%Y %I:%M:%S%p"))
camera.release()
attempts -= 1
print("attempts: " + str(attempts))
# give the camera some time to recover
time.sleep(5)
continue
Более подробное описание:
https://github.com/Combinacijus/various-code-samples/tree/master/Python/OpenCV/ip_cam_reconnecting
Написал класс для работы с случайным отключением IP-камеры. Основная идея состоит в том, чтобы проверить, возвращает ли cap.read() кадр, и если нет, он пытается повторно подключиться к камере.
import cv2
import requests # NOTE: Only used for forceful reconnection
import time # NOTE: Only used for throttling down printing when connection is lost
class IPVideoCapture:
def __init__(self, cam_address, cam_force_address=None, blocking=False):
"""
:param cam_address: ip address of the camera feed
:param cam_force_address: ip address to disconnect other clients (forcefully take over)
:param blocking: if true read() and reconnect_camera() methods blocks until ip camera is reconnected
"""
self.cam_address = cam_address
self.cam_force_address = cam_force_address
self.blocking = blocking
self.capture = None
self.RECONNECTION_PERIOD = 0.5 # NOTE: Can be changed. Used to throttle down printing
self.reconnect_camera()
def reconnect_camera(self):
while True:
try:
if self.cam_force_address is not None:
requests.get(self.cam_force_address)
self.capture = cv2.VideoCapture(self.cam_address)
if not self.capture.isOpened():
raise Exception("Could not connect to a camera: {0}".format(self.cam_address))
print("Connected to a camera: {}".format(self.cam_address))
break
except Exception as e:
print(e)
if self.blocking is False:
break
time.sleep(self.RECONNECTION_PERIOD)
def read(self):
"""
Reads frame and if frame is not received tries to reconnect the camera
:return: ret - bool witch specifies if frame was read successfully
frame - opencv image from the camera
"""
ret, frame = self.capture.read()
if ret is False:
self.reconnect_camera()
return ret, frame
if __name__ == "__main__":
CAM_ADDRESS = "http://192.168.8.102:4747/video" # NOTE: Change
CAM_FORCE_ADDRESS = "http://192.168.8.102:4747/override" # NOTE: Change or omit
cap = IPVideoCapture(CAM_ADDRESS, CAM_FORCE_ADDRESS, blocking=True)
# cap = IPVideoCapture(CAM_ADDRESS) # Minimal init example
while True:
ret, frame = cap.read()
if ret is True:
cv2.imshow(CAM_ADDRESS, frame)
if cv2.waitKey(1) == ord("q"):
break