Flask读取摄像头视频流,为API使用一个观察者模式

问题:

服务器启动后,只有首次请求web服务正常,再次请求就会出现卡死的现象。

原因

web服务会注册从摄像头捕获视频帧的线程,并且线程不会自动注销

首次请求web服务时,从摄像头捕获视频帧的后台线程就开始了,但之后它永远不会停止,即使你离开了这个web服务

当用户再次请求web服务时,后台会再次创建一个线程,尝试从摄像头捕获视频帧。

此时摄像头被多个线程同时访问,摄像头无法处理,就出现卡死情况。

基于这个原因:

只要让上次web服务遗留的线程自动注销,下次访问web服务时,就可正常工作。(具体实现见我上篇博客

观察者模式

考虑到web服务不是只面向一个人、一台主机,如果多台主机同时访问web服务,还是会出现多线程同时访问摄像头的场景。

最后用设计模式里的观察者模式,解决此问题!

观察者模式避免了很多个对象同时去访问同一片数据,这是一种推送的方式。
在这里插入图片描述

主体对象:从摄像头获取视频帧的对象

观察者对象:web服务对象

每次请求web服务时,会创建一个观察者对象,同时将这个对象添加到主体对象的观察者队列,每当主体对象获取到新的视频帧时,它会依次去通知在它的队列中注册过的观察者。从而使所有的web服务都能同步工作。

框架:


''' 

The second Design Pattern:

    Observer Pattern

KeyNote:

    

'''

class Observer:

    def update(self, temp, humidity, pressure):

        return

    def display(self):

        return

 

class Subject:

    def registerObserver(self, observer):

        return

    def removeObserver(self, observer):

        return

    def notifyObservers(self):

        return

 

class WeatherData(Subject):

    def __init__(self):

        self.observers = []

        self.temperature = 0.0

        self.humidity = 0.0

        self.pressure = 0.0

        return

    def registerObserver(self, observer):

        self.observers.append(observer)

        return

    def removeObserver(self, observer):

        self.observers.remove(observer)

        return

    def getTemperature(self):

        return self.temperature

    def getHumidity(self):

        return self.humidity

    def getPressure(self):

        return self.pressure

    def measurementsChanged(self):

        self.notifyObservers()

        return

    def setMesurement(self, temp, humidity, pressure):

        self.temperature = temp

        self.humidity = humidity

        self.pressure = pressure

        self.measurementsChanged()

        return

    def notifyObservers(self):

        for item in self.observers:

            item.update(self.temperature, self.humidity, self.pressure)

        return

 

class CurrentConditionDisplay(Observer):

    def __init__(self, weatherData):

        self.weatherData = weatherData

        self.temperature = 0.0

        self.humidity = 0.0

        self.pressure = 0.0

        weatherData.registerObserver(self)

        return

    def update(self, temp, humidity, pressure):

        self.temperature = temp

        self.humidity = humidity

        self.pressure = pressure

        self.display()

        return

    def display(self):

        print( 'temprature = %f, humidity = %f.' % \

                (self.temperature, self.humidity))

        return

 

class StatiticDisplay(Observer):

    def __init__(self, weatherData):

        self.weatherData = weatherData

        self.temperature = 0.0

        self.humidity = 0.0

        self.pressure = 0.0

        weatherData.registerObserver(self)

        return

    def update(self, temp, humidity, pressure):

        self.temperature = temp

        self.humidity = humidity

        self.pressure = pressure

        self.display()

        return

    def display(self):

        print('Statictic: t = %f, h = %f, pressure = %f.' % \

                (self.temperature, self.humidity, self.pressure))

        return

 

    

weather = WeatherData()

display1 = CurrentConditionDisplay(weather)

weather.setMesurement(2.0, 3.0, 4.0)

display2 = StatiticDisplay(weather)

weather.setMesurement(3.0, 4.0, 5.0)

具体实现:

# !/usr/local/bin/python3
# encodin: utf-8
# author: cx
"""经过测试 cv2.VideoCapture 的 read 函数并不能获取实时流的最新帧
而是按照内部缓冲区中顺序逐帧的读取,opencv会每过一段时间清空一次缓冲区
但是清空的时机并不是我们能够控制的,因此如果对视频帧的处理速度如果跟不上接受速度
那么每过一段时间,在播放时(imshow)时会看到画面突然花屏,甚至程序直接崩溃

在网上查了很多资料,处理方式基本是一个思想
使用一个临时缓存,可以是一个变量保存最新一帧,也可以是一个队列保存一些帧
然后开启一个线程读取最新帧保存到缓存里,用户读取的时候只返回最新的一帧
这里我是使用了一个变量保存最新帧

注意:这个处理方式只是防止处理(解码、计算或播放)速度跟不上输入速度
而导致程序崩溃或者后续视频画面花屏,在读取时还是丢弃一些视频帧

这个在高性能机器上也没啥必要 [/doge]
"""
import gc
import os
import time

import cupy
import numpy as np
from matplotlib import pyplot
from tensorflow.python.keras.backend import set_session

from apps.models.model import detect_save, get_variable_from_model

from apps import STATIC_DIR, app

import threading
import cv2


class Observer:

    def update(self,ok,frame):
        return

    def display(self):
        return


class Subject:

    def registerObserver(self, observer):
        return

    def removeObserver(self, observer):
        return

    def notifyObservers(self):
        return

class RTSCapture(cv2.VideoCapture,Subject,object):
    """Real Time Streaming Capture.
    这个类必须使用 RTSCapture.create 方法创建,请不要直接实例化
    """

    _cur_frame = None
    _reading = False
    schemes = ["rtsp://", "rtmp://"]  # 用于识别实时流

    @staticmethod
    def create(url, *schemes):
        """实例化&初始化
        rtscap = RTSCapture.create("rtsp://example.com/live/1")
        or
        rtscap = RTSCapture.create("http://example.com/live/1.m3u8", "http://")
        """

        rtscap = RTSCapture(url)
        rtscap.observers = []
        rtscap.frame_receiver = threading.Thread(target=rtscap.notifyObservers, daemon=True)
        rtscap.schemes.extend(schemes)

        if isinstance(url, str) and url.startswith(tuple(rtscap.schemes)):
            rtscap._reading = True
        elif isinstance(url, int):
            # 这里可能是本机设备
            pass

        return rtscap

    def registerObserver(self, observer):
        self.observers.append(observer)
        # print('observers:', self.observers)

        return

    def removeObserver(self, observer):
        self.observers.remove(observer)

        return

    def isStarted(self):
        """替代 VideoCapture.isOpened() """
        ok = self.isOpened()
        if ok and self._reading:
            ok = self.frame_receiver.is_alive()
        return ok



    def notifyObservers(self):
        """子线程读取最新视频帧方法"""
        while self._reading and self.isOpened():
            ok, frame = self.read()
            # print(type(frame))
            # print(self.observers)
            for item in self.observers:
                # print("update")
                if item.last_access != 0 and time.time() - item.last_access > 30:
                    self.removeObserver(item)
                    del item
                    # gc.collect() #释放循环引用
                    continue
                item.update(ok, frame)
                # item.display()
            if not ok: break
            self._cur_frame = frame
        self._reading = False

    def read2(self):
        """读取最新视频帧
        返回结果格式与 VideoCapture.read() 一样
        """
        frame = self._cur_frame
        self._cur_frame = None

        return frame is not None, frame



    def start_read(self):
        """启动子线程读取视频帧"""
        self.frame_receiver.start()
        self.read_latest_frame = self.read2 if self._reading else self.read

    def stop_read(self):
        """退出子线程方法"""
        self._reading = False
        if self.frame_receiver.is_alive(): self.frame_receiver.join()


class Flow_Response (Observer,object):



    ok = False
    frame = None
    last_access = 0

    @staticmethod
    def create(weatherData):
        """实例化&初始化
        rtscap = RTSCapture.create("rtsp://example.com/live/1")
        or
        rtscap = RTSCapture.create("http://example.com/live/1.m3u8", "http://")
        """

        rtscap = Flow_Response()
        weatherData.registerObserver(rtscap)

        return rtscap

    def update(self,ok,frame):

        self.ok = ok
        self.frame = frame

        return
    def display(self):

        print(self.ok,self.frame)

        return

    def get_frame(self):

        return self.frame

    def get_bytes(self):
        # print(self.ok,self.frame)
        self.last_access = time.time()
        ret, jpeg = cv2.imencode('.jpg', self.frame)

        return jpeg.tobytes()

    def detect(self,detect_relative,response_name):
        self.last_access = time.time()
        filename = os.path.join(STATIC_DIR, detect_relative, response_name)
        graph = get_variable_from_model('graph')
        sess = get_variable_from_model('sess')
        model = get_variable_from_model('model')
        with graph.as_default():
            set_session(sess)
            detect_save(self.frame, model, filename)
        result_frame = pyplot.imread(filename)
        ret, result_jpeg = cv2.imencode('.jpg', result_frame)

        return result_jpeg.tobytes()


video_subject = RTSCapture.create(app.config['video'],"http://")
video_subject.start_read()

# while True:
#     url_observer.video()
def feed_gen():
    global video_subject
    url_observer = Flow_Response.create(video_subject)
    while True:

        # 帧处理代码写这里
        frame = url_observer.get_frame()
        # url_observer.display()
        try:
            if frame == None:
                continue
        except:
            bytes = url_observer.get_bytes()

            # 使用generator函数输出视频流, 每次请求输出的content类型是image/jpeg
            yield (b'--frame\r\n'
                   b'Content-Type: image/jpeg\r\n\r\n' + bytes + b'\r\n\r\n')



def out_gen(detect_relative,response_name):
    global video_subject
    url_observer = Flow_Response.create(video_subject)
    while True:

        # 帧处理代码写这里
        frame = url_observer.get_frame()
        # url_observer.display()
        try:
            if frame == None:
                continue
        except:
            bytes = url_observer.detect(detect_relative,response_name)

            # 使用generator函数输出视频流, 每次请求输出的content类型是image/jpeg
            yield (b'--frame\r\n'
                   b'Content-Type: image/jpeg\r\n\r\n' + bytes + b'\r\n\r\n')


参考文章:
用OpenCV(Python)从Flask服务器读取MJPEG流
设计模式之观察者模式(Python实现)
Python实现观察者模式

更多相关推荐

python观察者模式 多线程_Python...

标签:python设计模式观察者模式引子这两天读了HeadFirst设计模式和Python编程实战两本书,对...

继续阅读

python观察者模式 多线程_Python...

例子1:classService:def__init__(self,service_name,process_name,port,enable_monitor=None)...

继续阅读

python观察者模式 多线程_【pyth...

观察者模式:观察者模式定义了对象之间的一对多依赖,这样当一个对象改变状态时,它的所有对象...

继续阅读

python观察者模式 多线程_Python...

#-*-coding:utf-8-*-classObserver(object):#此处初始化一个列表,用来收集“观察者”def__init_...

继续阅读

python观察者模式_python 观察者...

python观察者模式前言emm写的仓促就不截UML类图了,书本Chapter10,P313能看到图代码一旦观...

继续阅读

python观察者模式_设计模式(Pyth...

本系列文章是希望将软件项目中最常见的设计模式用通俗易懂的语言来讲解清楚,并通过Python来实...

继续阅读

python观察者模式_Python 设计模...

行为型模式中的观察者模式创建型模式(比如单例模式)是基于对象的创建机制的,这些模式隔离了对...

继续阅读

python观察者模式 多线程_《 Hea...

病殃殃的还是来记录一下学习笔记,不然的话过几天可能就忘了,白学了。observermode在java用得...

继续阅读

python观察者模式 多线程_10分钟...

原标题:10分钟学Python设计模式|观察者模式这是一篇以前的设计模式的文章,零零散散,最近的...

继续阅读

python观察者模式 多线程_一起学...

在前面的文章中介绍过观察者模式及并发编程的基础知识,为了让大家更好的了解观察者模式故而特...

继续阅读