Source code for fury.stream.widget
import errno
import socket
import subprocess
import sys
import time
import numpy as np
try:
from IPython.display import IFrame, display
IPYTHON_AVAILABLE = True
except ImportError:
IPYTHON_AVAILABLE = False
from fury.decorators import warn_on_args_to_kwargs
from fury.stream.client import FuryStreamClient, FuryStreamInteraction
from fury.stream.constants import PY_VERSION_8
[docs]
def check_port_is_available(host, port):
"""Check if a given port it's available
Parameters
----------
host : str
port : int
Returns
-------
available : bool
"""
available = True
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
s.bind((host, port))
except socket.error as error:
if error.errno == errno.EADDRINUSE:
available = False
s.close()
return available
[docs]
class Widget:
"""This Obj it's able execute the fury streaming system
using the SharedMemory object from Python multiprocessing.
"""
@warn_on_args_to_kwargs()
def __init__(
self,
showm,
*,
ms_stream=33,
ms_interaction=33,
host="localhost",
port=None,
encoding="mjpeg",
ms_jpeg=33,
queue_size=20,
):
"""Initialize the widget.
Parameters
----------
showm : ShowmManager
ms_stream : float, optional
time in mileseconds between each frame buffer update.
ms_interaction : float, optional
time in mileseconds between each user interaction update.
host : str, optional
port : int, optional
encoding : str, optional
If should use MJPEG streaming or WebRTC.
ms_jpeg : float, optional
This it's used only if the MJPEG will be used. The
ms_jpeg represents the amount of milliseconds between to
consecutive calls of the jpeg encoding.
queue_size : int, optional
maximum number of user interactions to be stored
"""
if not PY_VERSION_8:
raise ImportError(
"Python 3.8 or greater is required to use the\
widget class"
)
self.showm = showm
self.window_size = self.showm.size
max_window_size = (
int(self.window_size[0] * (1 + 0.1)),
int(self.window_size[1] * (1 + 0.1)),
)
self.max_window_size = max_window_size
self.ms_stream = ms_stream
self.ms_interaction = ms_interaction
self.ms_jpeg = ms_jpeg
self._host = host
if port is None:
port = np.random.randint(7000, 8888)
self._port = port
self.queue_size = queue_size
self._server_started = False
self.pserver = None
self.encoding = encoding
self.showm.window.SetOffScreenRendering(1)
self.showm.iren.EnableRenderOff()
@property
def command_string(self):
"""Return the command string to start the server
Returns
-------
command_string : str
"""
s = "from fury.stream.server import web_server;"
s += "web_server(image_buffer_names="
s += f"{self.stream.img_manager.image_buffer_names}"
s += f",info_buffer_name='{self.stream.img_manager.info_buffer_name}',"
s += "queue_head_tail_buffer_name='"
s += f"{self.stream_interaction.circular_queue.head_tail_buffer_name}'"
s += ",queue_buffer_name='"
s += f"{self.stream_interaction.circular_queue.buffer.buffer_name}'"
if self.encoding == "mjpeg":
s += ",provides_mjpeg=True"
s += f",ms_jpeg={self.ms_jpeg}"
s += ",provides_webrtc=False"
s += f",port={self._port},host='{self._host}',"
s += "avoid_unlink_shared_mem=True"
s += ")"
return s
@warn_on_args_to_kwargs()
def _start_fury_client(self, *, use_asyncio=False):
"""Start the fury image buffer client and the interaction client
Parameters
----------
use_asyncio : bool, optional
If should use asyncio to start the server.
Default is False.
"""
if self._server_started:
self.stop()
self.stream = FuryStreamClient(
self.showm,
max_window_size=self.max_window_size,
use_raw_array=False,
whithout_iren_start=True,
)
self.stream_interaction = FuryStreamInteraction(
self.showm,
max_queue_size=self.queue_size,
whithout_iren_start=True,
use_raw_array=False,
)
self.stream_interaction.start(ms=self.ms_interaction, use_asyncio=use_asyncio)
self.stream.start(self.ms_stream, use_asyncio=use_asyncio)
self._server_started = True
self.pserver = None
[docs]
def run_command(self):
"""Evaluate the command string to start the server"""
if self.pserver is not None:
self._kill_server()
i = 0
available = check_port_is_available(self._host, self._port)
while not available and i < 50:
self._port = np.random.randint(7000, 8888)
available = check_port_is_available(self._host, self._port)
i += 1
if not available:
return False
if self._server_started:
args = [sys.executable, "-c", self.command_string]
self.pserver = subprocess.Popen(
args,
# f'python -c "{self.command_string}"',
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=False,
)
return True
@property
def url(self):
"""Return the url to access the server"""
url = f"http://{self._host}:{self._port}"
url += f"?iframe=1&encoding={self.encoding}"
return url
[docs]
@warn_on_args_to_kwargs()
def return_iframe(self, *, height=200):
"""Return the jupyter div iframe used to show the stream"""
if IPYTHON_AVAILABLE:
display(IFrame(self.url, "100%", f"{int(height)}px"))
[docs]
@warn_on_args_to_kwargs()
def start(self, *, use_asyncio=False):
"""Start the fury client and the interaction client and return the url
Parameters
----------
use_asyncio : bool, optional
If should use the asyncio version of the server.
Default is False.
"""
self._start_fury_client(use_asyncio)
ok = self.run_command()
if not ok:
self.stop()
return False
print(f"url: {self.url}")
[docs]
@warn_on_args_to_kwargs()
def display(self, *, height=150):
"""Start the server and display the url in an iframe"""
self._start_fury_client()
ok = self.run_command()
if not ok:
self.stop()
return False
time.sleep(2)
self.return_iframe(height)
[docs]
def stop(self):
"""Stop the streaming server and release the shared memory"""
if self._server_started:
self.stream.stop()
self.stream_interaction.stop()
if self.pserver is not None:
self._kill_server()
self.cleanup()
self._server_started = False
def _kill_server(self):
"""Kill the server process"""
self.pserver.kill()
self.pserver.wait()
self.pserver = None
[docs]
def cleanup(self):
"""Release the shared memory"""
if self.stream is not None:
self.stream.cleanup()
if self.stream_interaction is not None:
self.stream_interaction.cleanup()
def __del__(self):
self.stop()