importasynciofromfunctoolsimportpartialimportjsonimportosimportweakrefimportaiohttpfromaiohttpimportMultipartWriter,WSCloseCode,webimportnumpyasnpfromfury.decoratorsimportwarn_on_args_to_kwargstry:fromaiortcimportRTCPeerConnection,RTCSessionDescriptionfromaiortc.contrib.mediaimportMediaRelayWEBRTC_AVAILABLE=TrueexceptImportError:WEBRTC_AVAILABLE=Falseprint("webrtc not available")importloggingimporttimefromfury.stream.constantsimport_CQUEUE_EVENT_IDsasEVENT_IDslogging.basicConfig(level=logging.ERROR)pcs=set()asyncdefindex(request,**kwargs):folder=kwargs["folder"]just_mjpeg=kwargs["just_mjpeg"]index_file="index.html"ifjust_mjpeg:index_file="index_mjpeg.html"content=open(os.path.join(folder,index_file),"r").read()returnweb.Response(content_type="text/html",text=content)asyncdefjavascript(request,**kwargs):folder=kwargs["folder"]js=kwargs["js"]content=open(os.path.join(folder,"js/%s"%js),"r").read()returnweb.Response(content_type="application/javascript",text=content)asyncdefmjpeg_handler(request):"""This async function it's responsible to create the MJPEG streaming. Notes ----- endpoint : /video/mjpeg """my_boundary="image-boundary"response=web.StreamResponse(status=200,reason="OK",headers={"Content-Type":"multipart/x-mixed-replace;boundary={}".format(my_boundary)},)awaitresponse.prepare(request)image_buffer_manager=request.app["image_buffer_manager"]whileTrue:jpeg_bytes=awaitimage_buffer_manager.async_get_jpeg()withMultipartWriter("image/jpeg",boundary=my_boundary)asmpwriter:mpwriter.append(jpeg_bytes,{"Content-Type":"image/jpeg"})try:awaitmpwriter.write(response,close_boundary=False)exceptConnectionResetError:logging.info("Client connection closed")breakawaitresponse.write(b"\r\n")asyncdefoffer(request,**kwargs):video=kwargs["video"]if"broadcast"inkwargsandkwargs["broadcast"]:video=MediaRelay().subscribe(video)params=awaitrequest.json()offer=RTCSessionDescription(sdp=params["sdp"],type=params["type"])pc=RTCPeerConnection()pcs.add(pc)@pc.on("connectionstatechange")asyncdefon_connectionstatechange():print("Connection state is %s"%pc.connectionState)ifpc.connectionState=="failed":awaitpc.close()pcs.discard(pc)# open media sourceaudio=Noneawaitpc.setRemoteDescription(offer)fortinpc.getTransceivers():ift.kind=="audio"andaudio:pc.addTrack(audio)elift.kind=="video"andvideo:pc.addTrack(video)answer=awaitpc.createAnswer()awaitpc.setLocalDescription(answer)returnweb.Response(content_type="application/json",text=json.dumps({"sdp":pc.localDescription.sdp,"type":pc.localDescription.type}),)
[docs]defset_weel(data,circular_queue):deltaY=float(data["deltaY"])user_envent_ms=float(data["timestampInMs"])ok=circular_queue.enqueue(np.array([EVENT_IDs.mouse_weel,deltaY,0,0,0,0,user_envent_ms,0],dtype="float64",))ts=time.time()*1000logging.info(f"WEEL Time until enqueue {ts-user_envent_ms:.2f} ms")returnok