Administrador de Esp32-Cam con MicroPython
Presentación
El presente proyecto se intenta desarrollar un sistema parecido al ejemplo del IDE de Arduino ubicado en “Archivo/Ejemplos/ESP32/Camera/Camera Web Server”.
En este proyecto utilizamos la Camara Ov2640 que viene integrada a la placa Esp32-Cam para capturar video que puede verse online en el servidor instalado en la misma placa .
Una vez arrancada la placa nos muestra en la consola el IP asignado y al ingresar a traves del navegador se ve la pantalla con el video que se obtiene a traves de la camara. La imagen de la pantalla es la siguiente:
Materiales
Placa Esp32-Cam
Programador Esp32-Cam
Si bien por ser mas cómodo y práctico se ha usado el adaptador el mismo puede ser remplazado por una placa FTDI y el cableado pertinente.
Montaje del proyecto en la placa Esp8266
Para utilizar la placa Esp32-Cam con MicroPython es necesario flashear primero la misma con el firmware correspondiente. En el caso de este proyecto se utilizo el provisto por la pagina de https://github.com/lemariva/micropython-camera-driver/tree/master/firmware
Cabe destacar que en el link indicado hay dos firmware micropython_camera_feeeb5ea3_esp32_idf4_4.bin y micropython_cmake_9fef1c0bd_esp32_idf4.x_ble_camera.bin y usaremos el primero de ellos
Una vez instalado el firmware en la placa utilizando la utilidad que viene provista por el IDE Thonny, o por aquella que ustedes utilicen ademas del firmware quedara grabado en la placa un archivo boot.py que debe ser reemplazo por el que se incluye en el codigo desarrollado a continuacion.
En este proyecyo he utilizado la librería uasyncio a efectos de manejar en forma simultanea los controles de la camara y la transmision de video, para ello se deben copiar en la placa la carpeta de nombre uasyncio que contiene los archivos de la libreria la cual se puede descargar del siguiente link
Codigo
El codigo se ha escrito en dos archivos Conexion.py y main.py.
El archivo Conexion.py contiene los datos necesarios para la conexión a la red wifi a la que se va a conectar los cuales deben ser agregados en el lugar indicado.
El archivo main.py es el nucleo del servidor, el nombre elegido para el mismo es a efectos que el programa se inicie automaticamente.
Conexion.py
#Configuracion de conexion WIFI:
from time import sleep
import network
class Conexion:
red = " " #completar con nombre de la red
clave = " " #completar con clave de la red
def __init__(mi, red='', clave=''):
network.WLAN(network.AP_IF).active(False) # disable access point
mi.wlan = network.WLAN(network.STA_IF)
mi.wlan.active(True)
if red == '':
mi.red = Conexion.red
mi.clave = Conexion.clave
else:
mi.red = red
mi.clave = clave
def conectar(mi, red='', clave=''):
if red != '':
mi.red = red
mi.clave = clave
if not mi.wlan.isconnected():
mi.wlan.connect(mi.red, mi.clave)
def status(mi):
if mi.wlan.isconnected():
return mi.wlan.ifconfig()
else:
return ()
def esperar(mi):
cnt = 30
while cnt > 0:
print("Esperando ..." )
# con(mi.red, mi.clave) # Connect to an red
if mi.wlan.isconnected():
print('Se conecto a: %s' % mi.red)
print('IP: %s\nSUBNET: %s\nGATEWAY: %s\nDNS: %s' % mi.wlan.ifconfig()[0:4])
cnt = 0
else:
sleep(5)
cnt -= 5
return
def scan(mi):
return mi.wlan.scan() # Scan for available access points
main.py
import usocket as soc
import uasyncio as sy
import camera
import time
import esp
import machine
import gc
from Conexion import Conexion
#esp.osdebug(False)
esp.osdebug(True)
# PAGINAS *****************************************************
pag_inicio = ""
archivo=open('pagina.html', 'r')
for línea in archivo:
pag_inicio = pag_inicio + línea
archivo.close
# página de transmisión
# URL: /pag_camara
pag_camara = """HTTP/1.1 200 OK
Content-Type: text/html; charset=utf-8
<html>
<head>
<title>Transmisión de video</title>
</head>
<body>
<center>
<h1>Transmisión de video</h1>
<img src="http://192.168.0.157:81/"/>
<br>
<a class="botones" href="http://192.168.0.157">Inicio</a>
</center>
</body>
</html>
"""
# transmisión en vivo
# URL: /emision
pag_emision = """HTTP/1.1 200 OK
Content-Type: multipart/x-mixed-replace; boundary=frame
Connection: keep-alive
Cache-Control: no-cache, no-store, max-age=0, must-revalidate
Expires: Thu, Jan 01 1970 00:00:00 GMT
Pragma: no-cache
"""
# transmisión en vivo
# URL:
pag_marco = """--frame
Content-Type: image/jpeg
"""
# error de solicitud
# URL: /favicon.ico
pag_favicon = """HTTP/1.1 404
"""
# URL: all the rest
pag_error = """HTTP/1.1 400 Bad Request
Content-Type: text/plain; charset=utf-8
No se puede transmitir
"""
# FUNCIONES *****************************************************
def iniciarcamara():
wc = 0
while True:
global camara
camara = camera.init(0, format=camera.JPEG)
camera.framesize(7)
camera.quality(15)
print("¿Cámara lista?: ", camara)
if camara:
break
time.sleep(2)
wc += 1
if wc >= 5:
break
def borrar(cs):
cs.close() # flash buffer y cierre zocalo
del cs
gc.collect()
def crear_cuadro():
while True:
buf = camera.capture()
yield buf
del buf
gc.collect()
async def enviar_marco(pp):
cs, h = pp
while True:
ee = ''
try:
cs.send(b'%s' % h)
cs.send(next(pic))
cs.send(b'\r\n') # enviar y vaciar el búfer de envío
except Exception as e:
ee = str(e)
if ee == '':
await sy.sleep_ms(5) # intentar lo más rápido posible
else:
break
borrar(cs)
return
def servidores():
zocalos = []
# puerto 80 - servidor de comandos
s = soc.socket(soc.AF_INET, soc.SOCK_STREAM)
s.setsockopt(soc.SOL_SOCKET, soc.SO_REUSEADDR, 1)
a = ('0.0.0.0', 80)
s.bind(a)
s.listen(3) # at most 3 clients
zocalos.append(s)
# puerto 81 - servidor de transmisión
s = soc.socket(soc.AF_INET, soc.SOCK_STREAM)
s.setsockopt(soc.SOL_SOCKET, soc.SO_REUSEADDR, 1)
a = ('0.0.0.0', 81)
s.bind(a)
s.listen(3)
zocalos.append(s)
return zocalos
async def puerto1(cs, rq):
lug = rq[1].find('?')
if lug > 0:
valor = rq[1][lug + 7:len(rq[1])]
rq[1] = rq[1][:lug]
valor = int(valor)
if rq[1] == '/':
cs.send(b'%s' % pag_inicio)
elif rq[1] == '/pag_camara':
cs.send(b'%s' % pag_inicio)
elif rq[1] == '/rot_camara':
global rot
if rot == 0:
rot = 1
else:
rot = 0
camera.flip(rot)
cs.send(b'%s' % pag_inicio)
elif rq[1] == '/inv_camara':
global inv
if inv == 0:
inv = 1
else:
inv = 0
camera.mirror(inv)
cs.send(b'%s' % pag_inicio)
elif rq[1] == '/cuadro':
camera.framesize(valor)
cs.send(b'%s' % pag_inicio)
elif rq[1] == '/calidad':
camera.quality(valor)
cs.send(b'%s' % pag_inicio)
elif rq[1] == '/contraste':
camera.contrast(valor)
cs.send(b'%s' % pag_inicio)
elif rq[1] == '/saturacion':
camera.saturation(valor)
cs.send(b'%s' % pag_inicio)
elif rq[1] == '/brillo':
camera.brightness(valor)
cs.send(b'%s' % pag_inicio)
elif rq[1] == '/efectos':
camera.speffect(valor)
cs.send(b'%s' % pag_inicio)
elif rq[1] == '/balance':
camera.whitebalance(valor)
cs.send(b'%s' % pag_inicio)
elif rq[1] == '/prender_led':
led_camara.on()
cs.send(b'%s' % pag_inicio)
elif rq[1] == '/apagar_led':
led_camara.off()
cs.send(b'%s' % pag_inicio)
else:
cs.send(b'%s' % pag_error)
print("error")
print(rq[1])
borrar(cs)
async def puerto2(cs, rq):
if rq[1] == '/':
cs.send(b'%s' % pag_emision)
# programar marco de envío
await enviar_marco([cs, pag_marco])
else:
cs.send(b'%s' % pag_error)
borrar(cs)
async def srv(p):
sa = zocalos[p] # servidor programado
while True:
ok = True
ee = ''
yield
try:
sa.settimeout(0.05) # en segundos ¡NOTA! tiempo de espera predeterminado del navegador (5-15 min)
cs, ca = sa.accept()
cs.settimeout(0.5) # en segundos
except Exception as e:
ee = str(e)
if ee != '':
pass
ok = False
yield
if ok:
ee = ''
try:
# cliente aceptado
r = cs.recv(1024)
except Exception as e:
ee = str(e)
if ee != '':
print(ee)
ok = False
else:
ms = r.decode('utf-8')
if ms.find('favicon.ico') < 0:
rq = ms.split(' ')
try:
print(rq[0], rq[1], ca)
except:
ok = False
else:
cs.send(b'%s' % pag_favicon)
borrar(cs)
ok = False
yield
if ok:
await puertos[p](cs, rq)
# PROGRAMA *****************************************************
iniciarcamara()
rot = 0
inv = 0
conec = Conexion()
conec.conectar()
conec.esperar()
wc = 0
while not conec.wlan.isconnected():
print("Wi-Fi no está listo. Esperar...")
time.sleep(2)
wc += 1
if wc >= 5:
break
if wc >= 5:
print("WIFI no está listo. ¡No puede continuar!")
else:
pic = crear_cuadro()
led_camara = machine.Pin(04,machine.Pin.OUT)
zocalos = servidores()
puertos = [puerto1, puerto2] # 80, 81
loop = sy.get_event_loop()
i = 0
for i in range(len(zocalos)):
if i == 0:
loop.create_task(srv(i))
loop.create_task(srv(i))
else:
loop.create_task(srv(i))
print("Servidor y camara funcionando!")
p = pag_inicio.find('ipcamara')
pag_inicio = pag_inicio[:p] + conec.status()[0] + pag_inicio[p+8:]
loop.run_forever()