Source code for fury.stream.widget
import errno
import socket
import subprocess
import sys
import time
import numpy as np
try:
from IPython.display import IFrame, display
IPYTHON_AVAILABLE = True
except ImportError:
IPYTHON_AVAILABLE = False
from fury.stream.client import FuryStreamClient, FuryStreamInteraction
from fury.stream.constants import PY_VERSION_8
[docs]
def check_port_is_available(host, port):
    """Check whether a TCP port can be bound on the given host.

    Parameters
    ----------
    host : str
        Host name or IPv4 address to bind to.
    port : int
        TCP port number to probe.

    Returns
    -------
    available : bool
        True if a fresh IPv4 TCP socket could be bound to ``(host, port)``.
        False if the bind failed with an ``OSError`` (address already in
        use, permission denied, address not assignable, ...).
    """
    # The context manager closes the probe socket on every path, including
    # when ``bind`` raises something other than ``OSError``.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
        try:
            probe.bind((host, port))
        except OSError:
            # Any OS-level bind failure means a server could not listen
            # on this address, not only EADDRINUSE.
            return False
    return True
[docs]
class Widget:
    """Run the FURY streaming system for a show manager.

    The stream clients are created in the current process using the
    SharedMemory object from Python multiprocessing, and the web server is
    launched in a separate Python process that is given the shared buffer
    names on its command line.
    """

    def __init__(
        self,
        showm,
        ms_stream=33,
        ms_interaction=33,
        host='localhost',
        port=None,
        encoding='mjpeg',
        ms_jpeg=33,
        queue_size=20,
    ):
        """Initialize the widget.

        Parameters
        ----------
        showm : ShowManager
        ms_stream : float, optional
            Time in milliseconds between each frame buffer update.
        ms_interaction : float, optional
            Time in milliseconds between each user interaction update.
        host : str, optional
        port : int, optional
            If None, a random port in [7000, 8888) is chosen.
        encoding : str, optional
            Whether to use MJPEG streaming or WebRTC.
        ms_jpeg : float, optional
            Only used with MJPEG. Number of milliseconds between two
            consecutive calls of the jpeg encoding.
        queue_size : int, optional
            Maximum number of user interactions to be stored.

        Raises
        ------
        ImportError
            If the running Python version is lower than 3.8.
        """
        # Lifecycle state is set before anything that can raise, so that
        # ``__del__`` -> ``stop`` works on a partially built instance.
        self._server_started = False
        self.pserver = None
        self.stream = None
        self.stream_interaction = None

        if not PY_VERSION_8:
            raise ImportError(
                'Python 3.8 or greater is required to use the widget class'
            )

        self.showm = showm
        self.window_size = self.showm.size
        # Shared buffers are sized 10% larger than the current window.
        self.max_window_size = (
            int(self.window_size[0] * (1 + 0.1)),
            int(self.window_size[1] * (1 + 0.1)),
        )
        self.ms_stream = ms_stream
        self.ms_interaction = ms_interaction
        self.ms_jpeg = ms_jpeg
        self._host = host
        if port is None:
            port = np.random.randint(7000, 8888)
        self._port = port
        self.queue_size = queue_size
        self.encoding = encoding
        self.showm.window.SetOffScreenRendering(1)
        self.showm.iren.EnableRenderOff()

    @property
    def command_string(self):
        """Return the command string to start the server.

        The string is Python source meant for ``python -c``; it is only
        valid after the stream clients have been started.

        Returns
        -------
        command_string : str
        """
        img_manager = self.stream.img_manager
        queue = self.stream_interaction.circular_queue

        s = 'from fury.stream.server import web_server;'
        s += 'web_server(image_buffer_names='
        s += f'{img_manager.image_buffer_names}'
        s += f",info_buffer_name='{img_manager.info_buffer_name}',"
        s += "queue_head_tail_buffer_name='"
        s += f"{queue.head_tail_buffer_name}'"
        s += ",queue_buffer_name='"
        s += f"{queue.buffer.buffer_name}'"
        if self.encoding == 'mjpeg':
            s += ',provides_mjpeg=True'
            s += f',ms_jpeg={self.ms_jpeg}'
            s += ',provides_webrtc=False'
        # ``!r`` yields the same quoted text for ordinary host names while
        # keeping the generated source valid if the host contains a quote.
        s += f',port={self._port},host={self._host!r},'
        s += 'avoid_unlink_shared_mem=True'
        s += ')'
        return s

    def _start_fury_client(self, use_asyncio=False):
        """Start the fury image buffer client and the interaction client.

        Any previously started session is stopped first.

        Parameters
        ----------
        use_asyncio : bool, optional
            Whether to use asyncio to start the clients.
            Default is False.
        """
        if self._server_started:
            self.stop()

        self.stream = FuryStreamClient(
            self.showm,
            max_window_size=self.max_window_size,
            use_raw_array=False,
            whithout_iren_start=True,
        )
        self.stream_interaction = FuryStreamInteraction(
            self.showm,
            max_queue_size=self.queue_size,
            whithout_iren_start=True,
            use_raw_array=False,
        )
        self.stream_interaction.start(
            ms=self.ms_interaction, use_asyncio=use_asyncio
        )
        self.stream.start(self.ms_stream, use_asyncio=use_asyncio)
        self._server_started = True
        self.pserver = None

    def run_command(self):
        """Evaluate the command string to start the server.

        If the configured port cannot be bound, up to 50 random ports in
        [7000, 8888) are tried.

        Returns
        -------
        ok : bool
            False if no usable port was found, True otherwise.
        """
        if self.pserver is not None:
            self._kill_server()

        attempts = 0
        available = check_port_is_available(self._host, self._port)
        while not available and attempts < 50:
            self._port = np.random.randint(7000, 8888)
            available = check_port_is_available(self._host, self._port)
            attempts += 1
        if not available:
            return False

        if self._server_started:
            args = [sys.executable, '-c', self.command_string]
            # NOTE(review): these pipes are never drained by this class; a
            # server that writes a lot of output could block once the OS
            # pipe buffer is full. Kept as PIPE so callers can still read
            # ``pserver.stdout`` / ``pserver.stderr``.
            self.pserver = subprocess.Popen(
                args,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                shell=False,
            )
        return True

    @property
    def url(self):
        """Return the url to access the server."""
        url = f'http://{self._host}:{self._port}'
        url += f'?iframe=1&encoding={self.encoding}'
        return url

    def return_iframe(self, height=200):
        """Display the jupyter iframe used to show the stream.

        Does nothing when IPython is not installed.

        Parameters
        ----------
        height : int, optional
            Height of the iframe in pixels.
        """
        if IPYTHON_AVAILABLE:
            display(IFrame(self.url, '100%', f'{int(height)}px'))

    def _launch(self, use_asyncio=False):
        """Start the clients and the server; stop everything on failure.

        Returns
        -------
        ok : bool
            Result of ``run_command``.
        """
        self._start_fury_client(use_asyncio)
        ok = self.run_command()
        if not ok:
            self.stop()
        return ok

    def start(self, use_asyncio=False):
        """Start the fury client and the interaction client and print the url.

        Parameters
        ----------
        use_asyncio : bool, optional
            Whether to use the asyncio version of the clients.
            Default is False.

        Returns
        -------
        False if the server could not be started, otherwise None.
        """
        if not self._launch(use_asyncio):
            return False
        print(f'url: {self.url}')

    def display(self, height=150):
        """Start the server and display the url in an iframe.

        Parameters
        ----------
        height : int, optional
            Height of the iframe in pixels.

        Returns
        -------
        False if the server could not be started, otherwise None.
        """
        if not self._launch():
            return False
        # Wait two seconds before pointing the iframe at the new server.
        time.sleep(2)
        self.return_iframe(height)

    def stop(self):
        """Stop the streaming server and release the shared memory.

        Safe to call on a widget that was never started and safe to call
        more than once.
        """
        if self._server_started:
            self.stream.stop()
            self.stream_interaction.stop()
        if self.pserver is not None:
            self._kill_server()
        self.cleanup()
        self._server_started = False

    def _kill_server(self):
        """Kill the server process and close the pipes opened for it."""
        proc = self.pserver
        proc.kill()
        proc.wait()
        for pipe in (proc.stdout, proc.stderr):
            if pipe is not None:
                pipe.close()
        self.pserver = None

    def cleanup(self):
        """Release the shared memory.

        The client references are cleared afterwards so that a later call
        (for instance from ``__del__``) does not release them again.
        """
        stream = getattr(self, 'stream', None)
        if stream is not None:
            stream.cleanup()
            self.stream = None

        interaction = getattr(self, 'stream_interaction', None)
        if interaction is not None:
            interaction.cleanup()
            self.stream_interaction = None

    def __del__(self):
        self.stop()