【相机+处理】多线程读取实时帧进行图像处理

多线程编写相机与后期的图像处理 目的是读取实时帧进行处理 分为两个线程 子线程进行图像的刷新 主进程抓取图像进行处理

目录:

【相机+处理】多线程读取实时帧进行图像处理VideoCapture版--rtsp流--+锁--继承后重写run方法VideoCapture版--RTSP流不加锁版(因为没有锁,所以会有问题--教训版--(继承线程并重写run方法)直接拉流版 --最直接的 CV 拉流版直接拉流存图版--适用于一切USB相机直接拉流多线程存图版VideoCapture版--无锁--没继承--初始化对象即启动线程realsense版--无锁--没继承--初始化对象即启动线程realsense版--有锁--没继承--初始化对象即启动线程

VideoCapture版–rtsp流–+锁–继承后重写run方法

import threading

import time

import cv2

# 创建一个线程类

class ImageThread(threading.Thread):

def __init__(self):

super(ImageThread, self).__init__()

self.latest_image = None # 存储最新的图像

self.lock = threading.Lock() # 创建一个锁对象

def run(self):

cap = cv2.VideoCapture("rtsp://admin:cetc2121@192.168.1.64:554/h264/ch33/main/av_stream") # 打开相机

while True:

ret, frame = cap.read() # 读取相机图像

if not ret:

continue

with self.lock: # 获取锁

self.latest_image = frame # 更新最新的图像

time.sleep(0.01) # 线程等待一小段时间

def get_latest_image(self):

with self.lock: # 获取锁

return self.latest_image # 返回最新的图像

# 创建一个ImageThread实例,并启动线程

image_thread = ImageThread()

image_thread.start()

# 主进程调用最新的图像

while True:

latest_image = image_thread.get_latest_image()

if latest_image is not None:

cv2.imshow('Latest Image', latest_image) # 显示最新的图像

if cv2.waitKey(1) == ord('q'): # 按下'q'键退出

break

# 释放资源

cv2.destroyAllWindows()

备注: time.sleep(0.01) # 线程等待一小段时间,这句代码的目的是->"在这一步释放GIL锁",使得其他线程进行执行,直到每一个线程都释放GIL锁后,在逐次往下执行。

VideoCapture版–RTSP流不加锁版(因为没有锁,所以会有问题–教训版–(继承线程并重写run方法)

# -*- coding: utf-8 -*-

import threading

import time

import cv2

flag = True

# 创建一个线程类

class ImageThread(threading.Thread):

def __init__(self):

super(ImageThread, self).__init__()

self.latest_image = None # 存储最新的图像

def run(self):

cap = cv2.VideoCapture("rtsp://admin:cetc2121@192.168.1.64:554/h264/ch33/main/av_stream") # 打开相机

# cap = cv2.VideoCapture(4) # 打开相机

while True:

ret, frame = cap.read() # 读取相机图像

if not ret:

continue

self.latest_image = frame # 更新最新的图像

time.sleep(0.01) # 线程等待一小段时间

def get_latest_image(self):

return self.latest_image # 返回最新的图像

# 创建一个ImageThread实例,并启动线程

image_thread = ImageThread()

image_thread.start()

# 主进程调用最新的图像

while True:

# flag = False

# time.sleep(2)

latest_image = image_thread.get_latest_image()

if latest_image is not None:

cv2.imshow('Latest Image', latest_image) # 显示最新的图像

if cv2.waitKey(1) == ord('q'): # 按下'q'键退出

break

# 释放资源

cv2.destroyAllWindows()

直接拉流版 --最直接的 CV 拉流版

import cv2 # 引入CV库

# 创建窗口

cv2.namedWindow('video', cv2.WINDOW_NORMAL)

cv2.resizeWindow('video', 300, 300) # 设置窗口大小

# rtsp://admin:cetc2121@192.168.1.64:554/h264/ch33/main/av_stream

# 获取视频设备

cap = cv2.VideoCapture("rtsp://admin:cetc2121@192.168.1.64:554/h264/ch33/main/av_stream")

while True:

# 从摄像头读视频帧

ret, frame = cap.read()

# 将视频帧在窗口中显示

cv2.imshow('video', frame)

key = cv2.waitKey(27) # 不能为0,0为等待中断,只能读取到一帧的数据

if (key & 0xFF == ord('q')):

break

# 释放资源

cap.release() # 释放视频资源

cv2.destroyAllWindows() # 释放窗口资源

直接拉流存图版–适用于一切USB相机

import cv2 # 引入CV库

import os

# 创建窗口

cv2.namedWindow('video', cv2.WINDOW_NORMAL)

cv2.resizeWindow('video', 300, 300) # 设置窗口大小

# 获取视频设备

cap = cv2.VideoCapture(4)

i = 0

while True:

i+=1

# 从摄像头读视频帧

ret, frame = cap.read()

# 将视频帧在窗口中显示

frame = cv2.resize(frame,(640,480))

cv2.imshow('video', frame)

key = cv2.waitKey(27) # 不能为0,0为等待中断,只能读取到一帧的数据

if key & 0xFF == ord('q'):

break

elif key & 0xFF == ord('c'):

while f"data_{i}.png" in os.listdir("./"):

i += 1

cv2.imwrite(f"./data_{i}.png", frame)

# 释放资源

cap.release() # 释放视频资源

cv2.destroyAllWindows() # 释放窗口资源

直接拉流多线程存图版

import threading

import time

import cv2

# 创建一个线程类

class ImageThread(threading.Thread):

def __init__(self):

super(ImageThread, self).__init__()

self.latest_image = None # 存储最新的图像

self.lock = threading.Lock() # 创建一个锁对象

def run(self):

cap = cv2.VideoCapture(4) # 打开相机

if cap.isOpened():

size = (cap.get(cv2.CAP_PROP_FRAME_WIDTH), cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) # 读取帧的宽、高

speed = cap.get(cv2.CAP_PROP_FPS) # 获得帧速

else:

raise ZeroDivisionError("相机无法打开")

while True:

ret, frame = cap.read() # 读取相机图像

if not ret:

print("当前无法获取到图像")

continue

with self.lock: # 获取锁

self.latest_image = frame # 更新最新的图像

time.sleep(0.01) # 线程等待一小段时间

def get_latest_image(self):

with self.lock: # 获取锁

return self.latest_image # 返回最新的图像

# 创建一个ImageThread实例,并启动线程

image_thread = ImageThread()

image_thread.start()

# 主进程调用最新的图像

while True:

latest_image = image_thread.get_latest_image()

if latest_image is not None:

cv2.imshow('Latest Image', latest_image) # 显示最新的图像

if cv2.waitKey(1) == ord('q'): # 按下'q'键退出

break

# 释放资源

cv2.destroyAllWindows()

VideoCapture版–无锁–没继承–初始化对象即启动线程

import os

import time

from threading import Thread

import cv2

import numpy as np

class RealsenseCamera(object):

def __init__(self):

self.image = None

# 获取视频设备

self.capture = cv2.VideoCapture("")

self.get_img_thread = Thread(target=self.read_camera_img, args=())

def get_img_thread_start(self):

self.get_img_thread.daemon = True

self.get_img_thread.start()

time.sleep(2) # 主线程比子线程执行的要快,所以这里要多给子线程点时间

# self.get_img_thread.join()

def read_camera_img(self):

if self.capture.isOpened():

size = (self.capture.get(cv2.CAP_PROP_FRAME_WIDTH), self.capture.get(cv2.CAP_PROP_FRAME_HEIGHT)) # 读取帧的宽、高

speed = self.capture.get(cv2.CAP_PROP_FPS) # 获得帧速

try:

while True:

# Wait for a coherent pair of frames: depth and color

ret, color_image = self.capture.read()

if ret is True:

self.image = color_image

else:

print("无法拉取到正确的视频流")

break

finally:

# Stop streaming

cv2.destroyAllWindows()

def grab_image(self):

return self.image

if __name__ == '__main__':

test = RealsenseCamera()

test.get_img_thread_start()

while True:

color_image = test.grab_image()

cv2.namedWindow('RealSense', cv2.WINDOW_NORMAL)

cv2.imshow('RealSense', color_image)

key = cv2.waitKey(1)

if key & 0xFF == ord('q'):

break

realsense版–无锁–没继承–初始化对象即启动线程

import os

import time

from threading import Thread

import cv2

import numpy as np

import pyrealsense2 as rs

class RealsenseCamera(object):

def __init__(self):

self.pipeline = rs.pipeline()

self.config = rs.config()

self.image = None

self.config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)

# 这里设置分辨率--档位为官方给定--适用于D435I相机(),所以其他型号的相机可能某些分辨率的档位会不同

# self.config.enable_stream(rs.stream.color, 320, 180, rs.format.bgr8, 60)

# self.config.enable_stream(rs.stream.color, 320, 240, rs.format.bgr8, 60)

# self.config.enable_stream(rs.stream.color, 424, 240, rs.format.bgr8, 60)

# self.config.enable_stream(rs.stream.color, 640, 360, rs.format.bgr8, 60)

# self.config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 60)

# self.config.enable_stream(rs.stream.color, 848, 480, rs.format.bgr8, 60)

# self.config.enable_stream(rs.stream.color, 960, 540, rs.format.bgr8, 60)

# self.config.enable_stream(rs.stream.color, 1280, 720, rs.format.bgr8, 30)

self.config.enable_stream(rs.stream.color, 1920, 1080, rs.format.bgr8, 30)

self.get_img_thread = Thread(target=self.read_camera_img, args=())

def get_img_thread_start(self):

self.get_img_thread.daemon = True

self.get_img_thread.start()

time.sleep(2) # 主线程比子线程执行的要快,所以这里要多给子线程点时间

# self.get_img_thread.join()

def read_camera_img(self):

# Get device product line for setting a supporting resolution

# pipeline_wrapper = rs.pipeline_wrapper(self.pipeline)

# pipeline_profile = self.config.resolve(pipeline_wrapper)

# device = pipeline_profile.get_device()

# device_product_line = str(device.get_info(rs.camera_info.product_line))

# print(device_product_line)

# found_rgb = False

# for s in device.sensors:

# if s.get_info(rs.camera_info.name) == 'RGB Camera':

# found_rgb = True

# break

# if not found_rgb:

# print("The demo requires Depth camera with Color sensor")

# exit(0)

# Start streaming

self.pipeline.start(self.config)

try:

while True:

# Wait for a coherent pair of frames: depth and color

frames = self.pipeline.wait_for_frames()

depth_frame = frames.get_depth_frame()

color_frame = frames.get_color_frame()

depth_image = np.asanyarray(depth_frame.get_data())

color_image = np.asanyarray(color_frame.get_data())

if color_image is not None:

self.image = color_image

else:

print("color_image is None")

continue

finally:

# Stop streaming

self.pipeline.stop()

def grab_image(self):

return self.image

if __name__ == '__main__':

test = RealsenseCamera()

test.get_img_thread_start()

f_count = 0

i = 0

t1 = time.time()

while True:

if f_count > 300:

t1 = time.time()

f_count = 0

color_image = test.grab_image()

f_count += 1

fps = int(f_count / (time.time() - t1))

cv2.putText(color_image, "FPS: %.1f" % (fps), (int(20), int(40)), 0, 5e-3 * 200, (0, 255, 0), 3)

cv2.namedWindow('RealSense', cv2.WINDOW_NORMAL)

cv2.imshow('RealSense', color_image)

key = cv2.waitKey(1)

if key & 0xFF == ord('q'):

break

elif key & 0xFF == ord('c'):

while f"data_{i}.png" in os.listdir("./"):

i += 1

cv2.imwrite(f"./data_{i}.png", color_image)

realsense版–有锁–没继承–初始化对象即启动线程

加了读写锁

若有多个相机同时插入,可以选择使用哪个相机

"""

import threading

import time

import numpy as np

import cv2

import pyrealsense2 as rs

# 打印机身注册编码

ctx = rs.context()

if len(ctx.devices) > 0:

for d in ctx.devices:

print('Found device: ', \

d.get_info(rs.camera_info.name), ' ', \

d.get_info(rs.camera_info.serial_number))

class RealsenseCamera(object):

def __init__(self):

self.pipeline = rs.pipeline()

self.config = rs.config()

self.image = None

self.lock = threading.Lock() # 创建一个锁对象

# 这里指定播放的相机注册码(可直接输入机身自带的码)

self.config.enable_device(ctx.devices[0].get_info(rs.camera_info.serial_number))

self.config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, 30)

# 这里设置分辨率--档位为官方给定--适用于D435I相机(),所以其他型号的相机可能某些分辨率的档位会不同

# self.config.enable_stream(rs.stream.color, 320, 180, rs.format.bgr8, 60)

# self.config.enable_stream(rs.stream.color, 320, 240, rs.format.bgr8, 60)

# self.config.enable_stream(rs.stream.color, 424, 240, rs.format.bgr8, 60)

# self.config.enable_stream(rs.stream.color, 640, 360, rs.format.bgr8, 60)

# self.config.enable_stream(rs.stream.color, 640, 480, rs.format.bgr8, 60)

# self.config.enable_stream(rs.stream.color, 848, 480, rs.format.bgr8, 60)

# self.config.enable_stream(rs.stream.color, 960, 540, rs.format.bgr8, 60)

# self.config.enable_stream(rs.stream.color, 1280, 720, rs.format.bgr8, 30)

self.config.enable_stream(rs.stream.color, 1920, 1080, rs.format.bgr8, 30)

self.get_img_thread = threading.Thread(target=self.read_camera_img, args=())

def get_img_thread_start(self):

self.get_img_thread.daemon = True

self.get_img_thread.start()

time.sleep(2) # 主线程比子线程执行的要快,所以这里要多给子线程点时间

# self.get_img_thread.join()

def read_camera_img(self):

# Get device product line for setting a supporting resolution

# pipeline_wrapper = rs.pipeline_wrapper(self.pipeline)

# pipeline_profile = self.config.resolve(pipeline_wrapper)

# device = pipeline_profile.get_device()

# device_product_line = str(device.get_info(rs.camera_info.product_line))

# print(device_product_line)

# found_rgb = False

# for s in device.sensors:

# if s.get_info(rs.camera_info.name) == 'RGB Camera':

# found_rgb = True

# break

# if not found_rgb:

# print("The demo requires Depth camera with Color sensor")

# exit(0)

# Start streaming

self.pipeline.start(self.config)

try:

while True:

# Wait for a coherent pair of frames: depth and color

frames = self.pipeline.wait_for_frames()

depth_frame = frames.get_depth_frame()

color_frame = frames.get_color_frame()

depth_image = np.asanyarray(depth_frame.get_data())

color_image = np.asanyarray(color_frame.get_data())

with self.lock:

if color_image is not None:

self.image = color_image

else:

print("color_image is None")

continue

finally:

# Stop streaming

self.pipeline.stop()

def grab_image(self):

with self.lock:

return self.image

if __name__ == '__main__':

test = RealsenseCamera()

test.get_img_thread_start()

f_count = 0

i = 0

t1 = time.time()

while True:

if f_count > 3000:

t1 = time.time()

f_count = 0

color_image = test.grab_image()

f_count += 1

fps = int(f_count / (time.time() - t1))

cv2.putText(color_image, "FPS: %.1f" % (fps), (int(20), int(40)), 0, 5e-3 * 200, (0, 255, 0), 3)

cv2.namedWindow('RealSense', cv2.WINDOW_NORMAL)

cv2.imshow('RealSense', color_image)

key = cv2.waitKey(1)

if key & 0xFF == ord('q'):

break

elif key & 0xFF == ord('c'):

while f"data_{i}.png" in os.listdir("./"):

i += 1

cv2.imwrite(f"./data_{i}.png", color_image)

相关阅读

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: