Source code for fury.stream.widget

import errno
import socket
import subprocess
import sys
import time

import numpy as np

try:
    from IPython.display import IFrame, display

    IPYTHON_AVAILABLE = True
except ImportError:
    IPYTHON_AVAILABLE = False

from fury.decorators import warn_on_args_to_kwargs
from fury.stream.client import FuryStreamClient, FuryStreamInteraction
from fury.stream.constants import PY_VERSION_8


def check_port_is_available(host, port):
    """Check whether a TCP port can be bound on the given host.

    A temporary IPv4/TCP socket is bound to ``(host, port)`` and closed
    again right away.

    Parameters
    ----------
    host : str
    port : int

    Returns
    -------
    available : bool
        False when binding fails with ``errno.EADDRINUSE``. Any other
        ``OSError`` raised by ``bind`` is swallowed and the port is still
        reported as available.
    """
    # The context manager closes the probe socket on every exit path,
    # including exceptions that are not OSError (e.g. an out-of-range port).
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
        try:
            probe.bind((host, port))
        except OSError as error:
            if error.errno == errno.EADDRINUSE:
                return False
    return True
class Widget:
    """Run the FURY streaming system for a show manager.

    The widget starts a ``FuryStreamClient`` and a ``FuryStreamInteraction``
    for the given show manager and launches
    ``fury.stream.server.web_server`` in a separate Python process. It uses
    the SharedMemory object from Python multiprocessing.
    """

    @warn_on_args_to_kwargs()
    def __init__(
        self,
        showm,
        *,
        ms_stream=33,
        ms_interaction=33,
        host="localhost",
        port=None,
        encoding="mjpeg",
        ms_jpeg=33,
        queue_size=20,
    ):
        """Initialize the widget.

        Parameters
        ----------
        showm : ShowmManager
        ms_stream : float, optional
            time in milliseconds between each frame buffer update.
        ms_interaction : float, optional
            time in milliseconds between each user interaction update.
        host : str, optional
        port : int, optional
            When None, a random port in the range [7000, 8888) is picked.
        encoding : str, optional
            If should use MJPEG streaming or WebRTC.
        ms_jpeg : float, optional
            This is used only if MJPEG is used. It represents the amount of
            milliseconds between two consecutive calls of the jpeg encoding.
        queue_size : int, optional
            maximum number of user interactions to be stored

        Raises
        ------
        ImportError
            If ``PY_VERSION_8`` is false.
        """
        # Create every attribute read by stop()/cleanup()/__del__ before
        # anything can raise, so those methods never hit AttributeError.
        self._server_started = False
        self.pserver = None
        self.stream = None
        self.stream_interaction = None

        if not PY_VERSION_8:
            raise ImportError(
                "Python 3.8 or greater is required to use the widget class"
            )

        self.showm = showm
        self.window_size = self.showm.size
        # Buffers are sized 10% larger than the current window.
        self.max_window_size = (
            int(self.window_size[0] * (1 + 0.1)),
            int(self.window_size[1] * (1 + 0.1)),
        )
        self.ms_stream = ms_stream
        self.ms_interaction = ms_interaction
        self.ms_jpeg = ms_jpeg
        self._host = host
        if port is None:
            port = np.random.randint(7000, 8888)
        self._port = port
        self.queue_size = queue_size
        self.encoding = encoding
        self.showm.window.SetOffScreenRendering(1)
        self.showm.iren.EnableRenderOff()

    @property
    def command_string(self):
        """Return the command string to start the server

        The string is Python source that imports
        ``fury.stream.server.web_server`` and calls it with the buffer names
        of the current stream clients, so it is only valid after the
        clients have been created.

        Returns
        -------
        command_string : str
        """
        img_manager = self.stream.img_manager
        queue = self.stream_interaction.circular_queue
        parts = [
            "from fury.stream.server import web_server;",
            "web_server(image_buffer_names=",
            f"{img_manager.image_buffer_names}",
            f",info_buffer_name='{img_manager.info_buffer_name}',",
            "queue_head_tail_buffer_name='",
            f"{queue.head_tail_buffer_name}'",
            ",queue_buffer_name='",
            f"{queue.buffer.buffer_name}'",
        ]
        if self.encoding == "mjpeg":
            parts += [
                ",provides_mjpeg=True",
                f",ms_jpeg={self.ms_jpeg}",
                ",provides_webrtc=False",
            ]
        parts += [
            f",port={self._port},host='{self._host}',",
            "avoid_unlink_shared_mem=True",
            ")",
        ]
        return "".join(parts)

    @warn_on_args_to_kwargs()
    def _start_fury_client(self, *, use_asyncio=False):
        """Start the fury image buffer client and the interaction client

        If the clients are already running they are stopped first.

        Parameters
        ----------
        use_asyncio : bool, optional
            If should use asyncio to start the server.
            Default is False.
        """
        if self._server_started:
            self.stop()

        self.stream = FuryStreamClient(
            self.showm,
            max_window_size=self.max_window_size,
            use_raw_array=False,
            whithout_iren_start=True,
        )
        self.stream_interaction = FuryStreamInteraction(
            self.showm,
            max_queue_size=self.queue_size,
            whithout_iren_start=True,
            use_raw_array=False,
        )
        self.stream_interaction.start(ms=self.ms_interaction, use_asyncio=use_asyncio)
        self.stream.start(self.ms_stream, use_asyncio=use_asyncio)
        self._server_started = True
        self.pserver = None

    def run_command(self):
        """Evaluate the command string to start the server

        Any server process started earlier is killed first. If the
        configured port is busy, up to 50 random ports in [7000, 8888) are
        tried.

        Returns
        -------
        ok : bool
            False if no free port was found, True otherwise. The server
            process is only spawned when the stream clients are running.
        """
        if self.pserver is not None:
            self._kill_server()

        attempts = 0
        available = check_port_is_available(self._host, self._port)
        while not available and attempts < 50:
            self._port = np.random.randint(7000, 8888)
            available = check_port_is_available(self._host, self._port)
            attempts += 1
        if not available:
            return False

        if self._server_started:
            args = [sys.executable, "-c", self.command_string]
            # NOTE(review): stdout/stderr are piped but nothing in this class
            # reads them; confirm the server output stays small.
            self.pserver = subprocess.Popen(
                args,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                shell=False,
            )
        return True

    @property
    def url(self):
        """Return the url to access the server"""
        url = f"http://{self._host}:{self._port}"
        url += f"?iframe=1&encoding={self.encoding}"
        return url

    @warn_on_args_to_kwargs()
    def return_iframe(self, *, height=200):
        """Display the jupyter iframe used to show the stream

        Does nothing when IPython is not installed.

        Parameters
        ----------
        height : int, optional
            Height of the iframe in pixels.
        """
        if IPYTHON_AVAILABLE:
            display(IFrame(self.url, "100%", f"{int(height)}px"))

    @warn_on_args_to_kwargs()
    def start(self, *, use_asyncio=False):
        """Start the fury client and the interaction client and print the url

        Parameters
        ----------
        use_asyncio : bool, optional
            If should use the asyncio version of the server.
            Default is False.

        Returns
        -------
        False if the server could not be started, otherwise None.
        """
        # ``use_asyncio`` is keyword-only on the callee.
        self._start_fury_client(use_asyncio=use_asyncio)
        ok = self.run_command()
        if not ok:
            self.stop()
            return False
        print(f"url: {self.url}")

    @warn_on_args_to_kwargs()
    def display(self, *, height=150):
        """Start the server and display the url in an iframe

        Parameters
        ----------
        height : int, optional
            Height of the iframe in pixels.

        Returns
        -------
        False if the server could not be started, otherwise None.
        """
        self._start_fury_client()
        ok = self.run_command()
        if not ok:
            self.stop()
            return False
        # Wait before embedding the iframe; presumably this gives the server
        # process time to start listening.
        time.sleep(2)
        # ``height`` is keyword-only on the callee.
        self.return_iframe(height=height)

    def stop(self):
        """Stop the streaming server and release the shared memory"""
        if self._server_started:
            self.stream.stop()
            self.stream_interaction.stop()
            if self.pserver is not None:
                self._kill_server()
            self.cleanup()
        self._server_started = False

    def _kill_server(self):
        """Kill the server process"""
        self.pserver.kill()
        self.pserver.wait()
        self.pserver = None

    def cleanup(self):
        """Release the shared memory"""
        if self.stream is not None:
            self.stream.cleanup()
        if self.stream_interaction is not None:
            self.stream_interaction.cleanup()

    def __del__(self):
        # The finalizer can run on an instance whose __init__ never ran, so
        # do not assume the attribute exists.
        if getattr(self, "_server_started", False):
            self.stop()