# LR1 RTSP Server Script
import gi
import subprocess
import time
import sys
import os
import hashlib
# Stream geometry and frame rate. These are strings because they are spliced
# directly into the v4l2-ctl arguments and the GStreamer launch string below.
STREAM_RES_WIDTH="1280"
STREAM_RES_HEIGHT="720"
# Used verbatim as the "framerate=" value in the pipeline caps.
STREAM_RES_FPS="20/1"
# Capture device handed to both v4l2-ctl and the v4l2src pipeline element.
CAMERA_DEVICE="/dev/v4l/by-id/usb-MACROSILICON_USB_Video-video-index0"
# The introspection versions must be pinned before importing from
# gi.repository, so these calls have to stay ahead of the import.
gi.require_version('Gst', '1.0')
gi.require_version('GstRtspServer', '1.0')
from gi.repository import Gst, GstRtspServer, GLib
# Initialise GStreamer once at import time, before any pipeline is parsed.
Gst.init(None)
# Files consulted by the self-update check:
# done_file     - status marker; a content of "0" is treated as "the previous
#                 update failed" (presumably written by the install script;
#                 nothing in this file writes it - TODO confirm).
# update_file   - update archive that gets extracted into /home/ift/.
# checksum_file - SHA-256 of the update archive recorded on the last check.
done_file = '/home/ift/done.txt'
update_file = '/home/ift/rtsp_update.tar.xz'
checksum_file = '/home/ift/checksum.txt'
def set_camera_resolution():
    """Configure the capture device for MJPG at the configured stream size.

    Runs ``v4l2-ctl`` to set width, height and pixel format on
    ``CAMERA_DEVICE``, waits one second, then queries the device and prints
    the video format it reports.

    Exits the process with status 1 when the format cannot be applied, either
    because ``v4l2-ctl`` returned a non-zero status or because it could not be
    started at all (for example, the binary is not installed).
    """
    format_arg = (
        f"--set-fmt-video=width={STREAM_RES_WIDTH},"
        f"height={STREAM_RES_HEIGHT},pixelformat=MJPG"
    )
    try:
        print(
            f"[DEBUG] Setting camera resolution to "
            f"{STREAM_RES_WIDTH}x{STREAM_RES_HEIGHT} with MJPG format."
        )
        subprocess.run(['v4l2-ctl', '-d', CAMERA_DEVICE, format_arg], check=True)
        print("[DEBUG] Camera resolution set successfully.")
        # Brief pause before reading the format back (presumably to let the
        # device settle - TODO confirm whether it is still required).
        time.sleep(1)
        # Read back and print the current format. This is diagnostic only, so
        # the exit status of the query is deliberately not checked.
        result = subprocess.run(
            ['v4l2-ctl', '-d', CAMERA_DEVICE, '--get-fmt-video'],
            capture_output=True,
            text=True,
        )
        print("[DEBUG] Current video format:")
        print(result.stdout)
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers the case where v4l2-ctl cannot be executed at all;
        # without it the script would die with an unhandled traceback.
        print(f"[ERROR] Failed to set camera resolution: {e}")
        sys.exit(1)
class MyFactory(GstRtspServer.RTSPMediaFactory):
    """RTSP media factory that serves the camera as an H.264 RTP stream.

    The pipeline reads MJPEG frames from ``CAMERA_DEVICE``, decodes them,
    converts to I420, encodes with x264 (ultrafast / zerolatency) and payloads
    the result as RTP H.264 on the element named ``pay0``.
    """

    def __init__(self, **properties):
        """Build the launch string once; it is reused for every pipeline."""
        super().__init__(**properties)
        # Width/height/framerate are repeated on both the MJPEG and raw caps.
        frame_caps = (
            f"width={STREAM_RES_WIDTH},height={STREAM_RES_HEIGHT},"
            f"framerate={STREAM_RES_FPS}"
        )
        stages = [
            f"v4l2src device={CAMERA_DEVICE}",
            f"image/jpeg,{frame_caps}",
            "jpegdec",
            "videoconvert",
            f"video/x-raw,format=I420,{frame_caps}",
            # Single-slot leaky queue, configured in the launch string itself.
            "queue max-size-buffers=1 leaky=downstream",
            "x264enc speed-preset=ultrafast tune=zerolatency bitrate=2000",
            "video/x-h264,stream-format=byte-stream",
            "h264parse",
            "rtph264pay name=pay0 pt=96 config-interval=1",
        ]
        self.launch_string = " ! ".join(stages)
        print(f"[DEBUG] GStreamer pipeline launch string: {self.launch_string}")

    def do_create_element(self, url):
        """Parse the launch string into a pipeline for *url*.

        Returns the parsed pipeline, or ``None`` if parsing raised.
        """
        print(f"[DEBUG] Creating pipeline element for URL: {url}")
        try:
            element = Gst.parse_launch(self.launch_string)
            print("[DEBUG] Pipeline created successfully.")
        except Exception as err:
            print(f"[ERROR] Failed to create pipeline: {err}")
            element = None
        return element
def calculate_checksum(file_path, algorithm='sha256'):
    """Return the hexadecimal digest of the file at *file_path*.

    Args:
        file_path: Path of the file to hash; opened in binary mode.
        algorithm: Name of the ``hashlib`` constructor to use
            (default ``'sha256'``).

    Returns:
        The digest as a hex string.
    """
    digest = getattr(hashlib, algorithm)()
    with open(file_path, 'rb') as stream:
        # Read in 8192-byte blocks until read() returns an empty bytes object.
        for block in iter(lambda: stream.read(8192), b''):
            digest.update(block)
    return digest.hexdigest()
def isUpdateNeeded():
    """Decide whether the update archive should be (re)applied.

    Returns True when:
      * the archive exists but no checksum has been recorded yet (the
        checksum is recorded as a side effect),
      * ``done_file`` exists and contains ``0`` (previous update failed), or
      * the archive's checksum differs from the recorded one (the recorded
        checksum is overwritten as a side effect).

    Returns False when there is no archive, or the checksum is unchanged.

    NOTE(review): the checksum is stored before the update is applied, so a
    later failure is only retried through the ``done_file`` marker.
    """
    if not os.path.exists(update_file):
        print("[DEBUG] No update file found.")
        return False

    if not os.path.exists(checksum_file):
        # First sighting of an archive: record its checksum and update.
        print("[DEBUG] No checksum file found. Creating one.")
        initial_digest = calculate_checksum(update_file)
        with open(checksum_file, 'w') as out:
            out.write(initial_digest)
        return True

    if os.path.exists(done_file):
        # A marker containing "0" means the last attempt did not complete.
        with open(done_file, 'r') as marker:
            previous_failed = marker.read().strip() == "0"
        if previous_failed:
            print("[DEBUG] Update previously failed (done.txt contains 0). Will attempt update again.")
            return True

    with open(checksum_file, 'r') as stored:
        recorded_digest = stored.read().strip()
    current_digest = calculate_checksum(update_file)

    if recorded_digest == current_digest:
        print("Old checksum: ", recorded_digest)
        print("New checksum: ", current_digest)
        return False

    print("[DEBUG] Update needed. Checksum has changed.")
    with open(checksum_file, 'w') as out:
        out.write(current_digest)
    return True
def extract_update_file():
    """Unpack the update archive into ``/home/ift/``.

    Returns:
        True if ``tar`` extracted the archive successfully. False if the
        archive does not exist, ``tar`` exited with a non-zero status, or
        ``tar`` could not be started at all.
    """
    if not os.path.exists(update_file):
        print("[DEBUG] No update file found.")
        return False
    try:
        print("[DEBUG] Extracting update file...")
        subprocess.run(['tar', '-xvf', update_file, '-C', '/home/ift/'], check=True)
    except (subprocess.CalledProcessError, OSError) as e:
        # Report failure explicitly instead of falling off the end and
        # returning None. OSError covers a missing or unexecutable tar binary,
        # so a broken update cannot stop the RTSP server from starting.
        print(f"[ERROR] Failed to extract update file: {e}")
        return False
    print("[DEBUG] Update extracted successfully.")
    return True
def _apply_pending_update():
    """Extract a pending update archive and run its install script, if any."""
    if not isUpdateNeeded():
        return
    if not extract_update_file():
        return
    install_script = '/home/ift/install/install.sh'
    if not os.path.exists(install_script):
        print("[DEBUG] No install script found in the update.")
        return
    # Run install.sh from the extracted folder.
    print("[DEBUG] Running install script...")
    try:
        subprocess.run(['bash', install_script], check=True)
    except subprocess.CalledProcessError as e:
        # A failed install is logged but must not stop the stream from starting.
        print(f"[ERROR] Failed to run install script: {e}")


def _serve_rtsp():
    """Mount the camera factory at ``/stream`` and run the GLib main loop."""
    try:
        print("[DEBUG] Initializing RTSP server.")
        server = GstRtspServer.RTSPServer()
        factory = MyFactory()
        factory.set_shared(True)
        print("[DEBUG] Adding media factory to mount points.")
        server.get_mount_points().add_factory("/stream", factory)
        print("[DEBUG] Attaching RTSP server to main context.")
        # attach() returns the GLib source id, which is 0 when the server
        # could not be attached. Without this check the main loop would run
        # forever while serving nothing.
        if server.attach(None) == 0:
            print("[ERROR] Failed to attach RTSP server to main context.")
            sys.exit(1)
        print("[DEBUG] Starting GLib MainLoop.")
        loop = GLib.MainLoop()
        loop.run()
    except Exception as e:
        print(f"[ERROR] Exception in main loop: {e}")


def main():
    """Apply any pending update, configure the camera, then serve RTSP.

    Extraction and install-script failures are logged and do not prevent the
    server from starting. A camera-configuration failure or a failed server
    attach terminates the process with exit status 1.
    """
    print("Checking if update is necessary...")
    _apply_pending_update()
    set_camera_resolution()  # Set the resolution before starting the server
    _serve_rtsp()


# Only start the updater and server when executed as a script.
if __name__ == "__main__":
    main()
# Last updated