import asyncio
from functools import partial
import json
import os
import weakref
import aiohttp
from aiohttp import MultipartWriter, WSCloseCode, web
import numpy as np
from fury.decorators import warn_on_args_to_kwargs
try:
from aiortc import RTCPeerConnection, RTCSessionDescription
from aiortc.contrib.media import MediaRelay
WEBRTC_AVAILABLE = True
except ImportError:
WEBRTC_AVAILABLE = False
print("webrtc not available")
import logging
import time
from fury.stream.constants import _CQUEUE_EVENT_IDs as EVENT_IDs
logging.basicConfig(level=logging.ERROR)
pcs = set()
async def index(request, **kwargs):
folder = kwargs["folder"]
just_mjpeg = kwargs["just_mjpeg"]
index_file = "index.html"
if just_mjpeg:
index_file = "index_mjpeg.html"
content = open(os.path.join(folder, index_file), "r").read()
return web.Response(content_type="text/html", text=content)
async def javascript(request, **kwargs):
folder = kwargs["folder"]
js = kwargs["js"]
content = open(os.path.join(folder, "js/%s" % js), "r").read()
return web.Response(content_type="application/javascript", text=content)
async def mjpeg_handler(request):
"""This async function it's responsible
to create the MJPEG streaming.
Notes
-----
endpoint : /video/mjpeg
"""
my_boundary = "image-boundary"
response = web.StreamResponse(
status=200,
reason="OK",
headers={
"Content-Type": "multipart/x-mixed-replace;boundary={}".format(my_boundary)
},
)
await response.prepare(request)
image_buffer_manager = request.app["image_buffer_manager"]
while True:
jpeg_bytes = await image_buffer_manager.async_get_jpeg()
with MultipartWriter("image/jpeg", boundary=my_boundary) as mpwriter:
mpwriter.append(jpeg_bytes, {"Content-Type": "image/jpeg"})
try:
await mpwriter.write(response, close_boundary=False)
except ConnectionResetError:
logging.info("Client connection closed")
break
await response.write(b"\r\n")
async def offer(request, **kwargs):
video = kwargs["video"]
if "broadcast" in kwargs and kwargs["broadcast"]:
video = MediaRelay().subscribe(video)
params = await request.json()
offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"])
pc = RTCPeerConnection()
pcs.add(pc)
@pc.on("connectionstatechange")
async def on_connectionstatechange():
print("Connection state is %s" % pc.connectionState)
if pc.connectionState == "failed":
await pc.close()
pcs.discard(pc)
# open media source
audio = None
await pc.setRemoteDescription(offer)
for t in pc.getTransceivers():
if t.kind == "audio" and audio:
pc.addTrack(audio)
elif t.kind == "video" and video:
pc.addTrack(video)
answer = await pc.createAnswer()
await pc.setLocalDescription(answer)
return web.Response(
content_type="application/json",
text=json.dumps(
{"sdp": pc.localDescription.sdp, "type": pc.localDescription.type}
),
)
[docs]
def set_weel(data, circular_queue):
deltaY = float(data["deltaY"])
user_envent_ms = float(data["timestampInMs"])
ok = circular_queue.enqueue(
np.array(
[EVENT_IDs.mouse_weel, deltaY, 0, 0, 0, 0, user_envent_ms, 0],
dtype="float64",
)
)
ts = time.time() * 1000
logging.info(f"WEEL Time until enqueue {ts-user_envent_ms:.2f} ms")
return ok
[docs]
def set_mouse(data, circular_queue):
x = float(data["x"])
y = float(data["y"])
ctrl_key = int(data["ctrlKey"])
shift_key = int(data["shiftKey"])
user_envent_ms = float(data["timestampInMs"])
circular_queue = circular_queue
ok = circular_queue.enqueue(
np.array(
[EVENT_IDs.mouse_move, 0, x, y, ctrl_key, shift_key, user_envent_ms, 0],
dtype="float64",
)
)
return ok
[docs]
def set_mouse_click(data, circular_queue):
"""3 | LeftButtonPressEvent
4 | LeftButtonReleaseEvent
5 | MiddleButtonPressEvent
6 | MiddleButtonReleaseEvent
7 | RightButtonPressEvent
8 | RightButtonReleaseEvent
"""
on = 0 if data["on"] == 1 else 1
ctrl = int(data["ctrlKey"])
shift = int(data["shiftKey"])
user_envent_ms = float(data["timestampInMs"])
x = float(data["x"])
y = float(data["y"])
mouse_button = int(data["mouseButton"])
if mouse_button not in [0, 1, 2]:
return False
if ctrl not in [0, 1] or shift not in [0, 1]:
return False
event_id = (mouse_button + 1) * 2 + on + 1
ok = circular_queue.enqueue(
np.array([event_id, 0, x, y, ctrl, shift, user_envent_ms, 0], dtype="float64")
)
return ok
async def on_shutdown(app):
# close peer connections
coros = [pc.close() for pc in pcs]
await asyncio.gather(*coros)
pcs.clear()
for ws in set(app["websockets"]):
await ws.close(code=WSCloseCode.GOING_AWAY, message="Server shutdown")
async def websocket_handler(request, **kwargs):
circular_queue = kwargs["circular_queue"]
ws = web.WebSocketResponse()
await ws.prepare(request)
request.app["websockets"].add(ws)
try:
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
if msg.data == "close":
await ws.close()
else:
data = json.loads(msg.data)
logging.info(f'\nuser event time {data["timestampInMs"]}')
if data["type"] == "weel":
ts = time.time() * 1000
interval = ts - data["timestampInMs"]
logging.info("WEEL request time approx " + f"{interval:.2f} ms")
set_weel(data, circular_queue)
elif data["type"] == "mouseMove":
set_mouse(data, circular_queue)
elif data["type"] == "mouseLeftClick":
set_mouse_click(data, circular_queue)
# await ws.send_str(msg.data + '/answer')
elif msg.type == aiohttp.WSMsgType.ERROR:
print("ws connection closed with exception {}".format(ws.exception()))
finally:
request.app["websockets"].discard(ws)
return ws
[docs]
@warn_on_args_to_kwargs()
def get_app(
*,
rtc_server=None,
folder=None,
circular_queue=None,
image_buffer_manager=None,
provides_mjpeg=False,
broadcast=True,
):
if folder is None:
folder = f"{os.path.dirname(__file__)}/www/"
app = web.Application()
app["websockets"] = weakref.WeakSet()
app.on_shutdown.append(on_shutdown)
# app.router.add_get("/shutdown",
# partial(
# shutdown,
# app=app,
# rtc_server=rtc_server,
# )
# )
app.router.add_get(
"/", partial(index, folder=folder, just_mjpeg=rtc_server is None)
)
js_files = [
"main.js",
"main_just_mjpeg.js",
"webrtc.js",
"constants.js",
"interaction.js",
]
for js in js_files:
app.router.add_get("/js/%s" % js, partial(javascript, folder=folder, js=js))
app["image_buffer_manager"] = image_buffer_manager
if provides_mjpeg:
app.router.add_get("/video/mjpeg", mjpeg_handler)
if rtc_server is not None:
app.router.add_post(
"/offer", partial(offer, video=rtc_server, broadcast=broadcast)
)
if circular_queue is not None:
app.add_routes(
[web.get("/ws", partial(websocket_handler, circular_queue=circular_queue))]
)
return app