Administrador de Esp32-Cam con MicroPython




Presentación


El presente proyecto se intenta desarrollar un sistema parecido al ejemplo del IDE de Arduino ubicado en “Archivo/Ejemplos/ESP32/Camera/Camera Web Server”.

En este proyecto utilizamos la Camara Ov2640 que viene integrada a la placa Esp32-Cam para capturar video que puede verse online en el servidor instalado en la misma placa .

Una vez arrancada la placa nos muestra en la consola el IP asignado y al ingresar a traves del navegador se ve la pantalla con el video que se obtiene a traves de la camara. La imagen de la pantalla es la siguiente:



Materiales


Placa Esp32-Cam

Programador Esp32-Cam


Si bien por ser mas cómodo y práctico se ha usado el adaptador el mismo puede ser remplazado por una placa FTDI y el cableado pertinente.


Montaje del proyecto en la placa Esp8266


Para utilizar la placa Esp32-Cam con MicroPython es necesario flashear primero la misma con el firmware correspondiente. En el caso de este proyecto se utilizo el provisto por la pagina de https://github.com/lemariva/micropython-camera-driver/tree/master/firmware

Cabe destacar que en el link indicado hay dos firmware micropython_camera_feeeb5ea3_esp32_idf4_4.bin y micropython_cmake_9fef1c0bd_esp32_idf4.x_ble_camera.bin y usaremos el primero de ellos

Una vez instalado el firmware en la placa utilizando la utilidad que viene provista por el IDE Thonny, o por aquella que ustedes utilicen ademas del firmware quedara grabado en la placa un archivo boot.py que debe ser reemplazo por el que se incluye en el codigo desarrollado a continuacion.

En este proyecyo he utilizado la librería uasyncio a efectos de manejar en forma simultanea los controles de la camara y la transmision de video, para ello se deben copiar en la placa la carpeta de nombre uasyncio que contiene los archivos de la libreria la cual se puede descargar del siguiente link

Libreria uasyncio


Codigo


El codigo se ha escrito en dos archivos Conexion.py y main.py.

El archivo Conexion.py contiene los datos necesarios para la conexión a la red wifi a la que se va a conectar los cuales deben ser agregados en el lugar indicado.

El archivo main.py es el nucleo del servidor, el nombre elegido para el mismo es a efectos que el programa se inicie automaticamente.


Conexion.py



        #Configuracion de conexion WIFI:
        from time import sleep
        import network
        class Conexion:
            red = " " #completar con nombre de la red
            clave = " " #completar con clave de la red

            def __init__(mi, red='', clave=''):
                network.WLAN(network.AP_IF).active(False) # disable access point
                mi.wlan = network.WLAN(network.STA_IF)
                mi.wlan.active(True)
                if red == '':
                    mi.red = Conexion.red
                    mi.clave = Conexion.clave
                else:
                    mi.red = red
                    mi.clave = clave

            def conectar(mi, red='', clave=''):
                if red != '':
                    mi.red = red
                    mi.clave = clave
                if not mi.wlan.isconnected():
                    mi.wlan.connect(mi.red, mi.clave)

            def status(mi):
                if mi.wlan.isconnected():
                    return mi.wlan.ifconfig()
                else:
                    return ()

            def esperar(mi):
                cnt = 30
                while cnt > 0:
                    print("Esperando ..." )
                    # con(mi.red, mi.clave) # Connect to an red
                    if mi.wlan.isconnected():
                        print('Se conecto a: %s' % mi.red)
                        print('IP: %s\nSUBNET: %s\nGATEWAY: %s\nDNS: %s' % mi.wlan.ifconfig()[0:4])
                        cnt = 0
                    else:
                        sleep(5)
                        cnt -= 5
                return

            def scan(mi):
                return mi.wlan.scan()  # Scan for available access points
            
    

main.py



        import usocket as soc
        import uasyncio as sy
        import camera
        import time
        import esp
        import machine
        import gc
        from Conexion import Conexion
        
        #esp.osdebug(False)
        esp.osdebug(True)
        
        
        # PAGINAS *****************************************************
        pag_inicio = ""
        archivo=open('pagina.html', 'r')
        for línea in archivo:
             pag_inicio = pag_inicio + línea
        archivo.close
        
        
        
          # página de transmisión
          # URL: /pag_camara
        pag_camara = """HTTP/1.1 200 OK
        Content-Type: text/html; charset=utf-8
        
        <html>
        <head>
        <title>Transmisión de video</title>
        </head>
        <body>
          <center>
            <h1>Transmisión de video</h1>
            <img src="http://192.168.0.157:81/"/>
            <br>
            <a class="botones" href="http://192.168.0.157">Inicio</a>
          </center>
        </body>
        </html>
        
        """
          # transmisión en vivo
          # URL: /emision
        pag_emision = """HTTP/1.1 200 OK
        Content-Type: multipart/x-mixed-replace; boundary=frame
        Connection: keep-alive
        Cache-Control: no-cache, no-store, max-age=0, must-revalidate
        Expires: Thu, Jan 01 1970 00:00:00 GMT
        Pragma: no-cache
        
        """
          # transmisión en vivo
          # URL: 
        pag_marco = """--frame
        Content-Type: image/jpeg
        
        """
        
          # error de solicitud
          # URL: /favicon.ico
        pag_favicon = """HTTP/1.1 404 
        
        
        """
          
          # URL: all the rest
        pag_error = """HTTP/1.1 400 Bad Request
        Content-Type: text/plain; charset=utf-8
        
        No se puede transmitir
        
        """
        
        # FUNCIONES *****************************************************
        
        
        def iniciarcamara():
            wc = 0
            while True:
              global camara  
              camara = camera.init(0, format=camera.JPEG)
              camera.framesize(7)
              camera.quality(15)
              print("¿Cámara lista?: ", camara)
              if camara:
                break
              time.sleep(2)
              wc += 1
              if wc >= 5:
                break
         
        def borrar(cs):
           
           cs.close() # flash buffer y cierre zocalo
           del cs
           gc.collect()
        
        def crear_cuadro():
           
           while True:
             buf = camera.capture()
             yield buf
             del buf
             gc.collect()
        
        async def enviar_marco(pp):
           
           cs, h = pp
           while True:
              ee = ''
              try:
                 cs.send(b'%s' % h)
                 
                 cs.send(next(pic))
                 cs.send(b'\r\n')  # enviar y vaciar el búfer de envío
              except Exception as e:
                ee = str(e)
              if ee == '': 
                await sy.sleep_ms(5)  # intentar lo más rápido posible
              else:
                break
        
           borrar(cs)
           return   
        
        def servidores():
           zocalos = []
           # puerto 80  - servidor de comandos
           s = soc.socket(soc.AF_INET, soc.SOCK_STREAM)
           s.setsockopt(soc.SOL_SOCKET, soc.SO_REUSEADDR, 1)
           a = ('0.0.0.0', 80)
           s.bind(a)
           s.listen(3)  # at most 3 clients
           zocalos.append(s)
        
           # puerto 81 - servidor de transmisión
           s = soc.socket(soc.AF_INET, soc.SOCK_STREAM)
           s.setsockopt(soc.SOL_SOCKET, soc.SO_REUSEADDR, 1)
           a = ('0.0.0.0', 81)
           s.bind(a)
           s.listen(3)
           zocalos.append(s)
        
           return zocalos
        
        async def puerto1(cs, rq):
           lug = rq[1].find('?')
           if lug > 0:
               valor = rq[1][lug + 7:len(rq[1])]
               rq[1] = rq[1][:lug]
               valor = int(valor)
               
               
           if rq[1] == '/': 
                 cs.send(b'%s' % pag_inicio)
                     
           elif rq[1] == '/pag_camara': 
                 cs.send(b'%s' % pag_inicio)
        
           elif rq[1] == '/rot_camara':
                 global rot  
                 if rot == 0:
                     rot = 1
                 else:
                     rot = 0
                 camera.flip(rot)
                 cs.send(b'%s' % pag_inicio)
                 
           elif rq[1] == '/inv_camara':
                 global inv
                 if inv == 0:
                     inv = 1
                 else:
                     inv = 0
                 camera.mirror(inv)
                 cs.send(b'%s' % pag_inicio)
           
           elif rq[1] == '/cuadro':
                 camera.framesize(valor)
                 cs.send(b'%s' % pag_inicio)
           
           elif rq[1] == '/calidad':
                 camera.quality(valor)
                 cs.send(b'%s' % pag_inicio)
        
           elif rq[1] == '/contraste':
                 camera.contrast(valor)
                 cs.send(b'%s' % pag_inicio)
        
           elif rq[1] == '/saturacion':
                 camera.saturation(valor)
                 cs.send(b'%s' % pag_inicio)
        
           elif rq[1] == '/brillo':
                 camera.brightness(valor)
                 cs.send(b'%s' % pag_inicio)         
        
           elif rq[1] == '/efectos':
                 camera.speffect(valor)
                 cs.send(b'%s' % pag_inicio)
        
           elif rq[1] == '/balance':
                 camera.whitebalance(valor)
                 cs.send(b'%s' % pag_inicio)
        
           elif rq[1] == '/prender_led':
                 led_camara.on() 
                 cs.send(b'%s' % pag_inicio)
           
           elif rq[1] == '/apagar_led':
                 led_camara.off()  
                 cs.send(b'%s' % pag_inicio)
                 
                 
           else:
                 cs.send(b'%s' % pag_error)
                 print("error")
                 print(rq[1])
        
                 borrar(cs)
                 
        
        
        async def puerto2(cs, rq):
           
           if rq[1] == '/': 
                 cs.send(b'%s' % pag_emision)
                 # programar marco de envío
                 await enviar_marco([cs, pag_marco])
                 
           else:
                 cs.send(b'%s' % pag_error)
                 borrar(cs)
        
        
        async def srv(p):
          sa = zocalos[p] # servidor programado
          while True:
             ok = True
             ee = ''
             yield
             try:
                sa.settimeout(0.05) # en segundos ¡NOTA! tiempo de espera predeterminado del navegador (5-15 min)
                cs, ca = sa.accept()
                cs.settimeout(0.5) # en segundos
             except Exception as e:
                ee = str(e)
             if ee != '':
                
                pass
                ok = False
             yield
             if ok: 
                ee = ''
                try:
                   # cliente aceptado 
                   r = cs.recv(1024)
                   
                except Exception as e:
                   ee = str(e)
                if ee != '':
                   print(ee)
                   ok = False
                else:
                   ms = r.decode('utf-8')
                   if ms.find('favicon.ico') < 0:
                      rq = ms.split(' ')
                      try:
                         print(rq[0], rq[1], ca)
                      except:
                         ok = False
                   else:
                      cs.send(b'%s' % pag_favicon) 
                      borrar(cs)
                      ok = False
             yield
             if ok: 
                await puertos[p](cs, rq)
        
        
        # PROGRAMA *****************************************************
        
        iniciarcamara()
        rot = 0
        inv = 0
        
        conec = Conexion()
        conec.conectar()
        conec.esperar()
        wc = 0
        while not conec.wlan.isconnected():
             print("Wi-Fi no está listo. Esperar...")
             time.sleep(2)
             wc += 1
             if wc >= 5:
               break
        if wc >= 5:
             print("WIFI no está listo. ¡No puede continuar!")
        else:
             pic = crear_cuadro()
             led_camara = machine.Pin(04,machine.Pin.OUT)
             zocalos = servidores()
             puertos = [puerto1, puerto2] # 80, 81
             loop = sy.get_event_loop()
             i = 0
             for i in range(len(zocalos)):
                if i == 0:
                   loop.create_task(srv(i)) 
                   loop.create_task(srv(i)) 
                else:
                   loop.create_task(srv(i)) 
        
             print("Servidor y camara funcionando!")
             p = pag_inicio.find('ipcamara')
             pag_inicio = pag_inicio[:p] + conec.status()[0] + pag_inicio[p+8:] 
             loop.run_forever()
        
      
    

Cerrar