Source code for fury.stream.tools

from abc import ABC, abstractmethod
import asyncio
import io
import logging
import multiprocessing
from threading import Timer
import time

from PIL import Image, ImageDraw
import numpy as np

from fury.decorators import warn_on_args_to_kwargs
from fury.stream.constants import PY_VERSION_8

if PY_VERSION_8:
    from multiprocessing import resource_tracker, shared_memory
else:
    shared_memory = None  # type: ignore


_FLOAT_ShM_TYPE = "d"
_INT_ShM_TYPE = "i"
_UINT_ShM_TYPE = "I"
_BYTE_ShM_TYPE = "B"

_FLOAT_SIZE = np.dtype(_FLOAT_ShM_TYPE).itemsize
_INT_SIZE = np.dtype(_INT_ShM_TYPE).itemsize
_UINT_SIZE = np.dtype(_UINT_ShM_TYPE).itemsize
_BYTE_SIZE = np.dtype(_BYTE_ShM_TYPE).itemsize


[docs] def remove_shm_from_resource_tracker(): """Monkey-patch multiprocessing.resource_tracker so SharedMemory won't be tracked Notes ----- More details at: https://bugs.python.org/issue38119 """ def fix_register(name, rtype): if rtype == "shared_memory": return try: return resource_tracker._resource_tracker.register(self, name, rtype) except NameError: return None resource_tracker.register = fix_register def fix_unregister(name, rtype): if rtype == "shared_memory": return try: return resource_tracker._resource_tracker.unregister(self, name, rtype) except NameError: return None resource_tracker.unregister = fix_unregister if "shared_memory" in resource_tracker._CLEANUP_FUNCS: del resource_tracker._CLEANUP_FUNCS["shared_memory"]
[docs] class GenericMultiDimensionalBuffer(ABC): """This implements a abstract (generic) multidimensional buffer.""" @warn_on_args_to_kwargs() def __init__(self, *, max_size=None, dimension=8): """Initialize the multidimensional buffer. Parameters ---------- max_size : int, optional If buffer_name or buffer was not passed then max_size it's mandatory dimension : int, default 8 """ self.max_size = max_size self.dimension = dimension self.buffer_name = None self._buffer = None self._buffer_repr = None self._created = False @property def buffer(self): return self._buffer @buffer.setter def buffer(self, data): if isinstance(data, (np.ndarray, np.generic)): if data.dtype == _FLOAT_ShM_TYPE: self._buffer_repr[:] = data
[docs] def get_start_end(self, idx): dim = self.dimension start = idx * dim end = dim * (idx + 1) return start, end
def __getitem__(self, idx): start, end = self.get_start_end(idx) logging.info(f"dequeue start {int(time.time()*1000)}") ts = time.time() * 1000 items = self._buffer_repr[start:end] te = time.time() * 1000 logging.info(f"dequeue frombuffer cost {te-ts:.2f}") return items def __setitem__(self, idx, data): start, end = self.get_start_end(idx) if isinstance(data, (np.ndarray, np.generic)): if data.dtype == _FLOAT_ShM_TYPE: # if end - start == self.dimension and start >= 0 and end >= 0: self._buffer_repr[start:end] = data
[docs] @abstractmethod def load_mem_resource(self): ... # pragma: no cover
[docs] @abstractmethod def create_mem_resource(self): ... # pragma: no cover
[docs] @abstractmethod def cleanup(self): ... # pragma: no cover
[docs] class RawArrayMultiDimensionalBuffer(GenericMultiDimensionalBuffer): """This implements a multidimensional buffer with RawArray.""" @warn_on_args_to_kwargs() def __init__(self, max_size, *, dimension=4, buffer=None): """Stream system uses that to implement the CircularQueue with shared memory resources. Parameters ---------- max_size : int, optional If buffer_name or buffer was not passed then max_size it's mandatory dimension : int, default 8 buffer : buffer, optional If buffer is not passed to __init__ then the multidimensional buffer obj will create a new RawArray object to store the data If buffer is passed than this Obj will read a a already created RawArray """ super().__init__(max_size=max_size, dimension=dimension) if buffer is None: self.create_mem_resource() else: self._buffer = buffer self.load_mem_resource()
[docs] def create_mem_resource(self): buffer_arr = np.zeros( self.dimension * (self.max_size + 1), dtype=_FLOAT_ShM_TYPE ) buffer = multiprocessing.RawArray( _FLOAT_ShM_TYPE, np.ctypeslib.as_ctypes(buffer_arr) ) self._buffer = buffer self._buffer_repr = np.ctypeslib.as_array(self._buffer)
[docs] def load_mem_resource(self): self.max_size = int(len(self._buffer) // self.dimension) self.max_size -= 1 self._buffer_repr = np.ctypeslib.as_array(self._buffer)
[docs] def cleanup(self): pass
[docs] class SharedMemMultiDimensionalBuffer(GenericMultiDimensionalBuffer): """This implements a generic multidimensional buffer with SharedMemory. """ @warn_on_args_to_kwargs() def __init__(self, max_size, *, dimension=4, buffer_name=None): """Stream system uses that to implement the CircularQueue with shared memory resources. Parameters ---------- max_size : int, optional If buffer_name or buffer was not passed then max_size it's mandatory dimension : int, default 8 buffer_name : str, optional if buffer_name is passed than this Obj will read a a already created SharedMemory """ super().__init__(max_size=max_size, dimension=dimension) if buffer_name is None: self.create_mem_resource() self._created = True else: self.buffer_name = buffer_name self.load_mem_resource() self._created = False self._create_repr()
[docs] def create_mem_resource(self): self._num_el = self.dimension * (self.max_size + 1) buffer_arr = np.zeros(self._num_el + 2, dtype=_FLOAT_ShM_TYPE) self._buffer = shared_memory.SharedMemory(create=True, size=buffer_arr.nbytes) sizes = np.ndarray( 2, dtype=_FLOAT_ShM_TYPE, buffer=self._buffer.buf[0 : _FLOAT_SIZE * 2] ) sizes[0] = self.max_size sizes[1] = self.dimension self.buffer_name = self._buffer.name logging.info( [ "create repr multidimensional buffer ", ] )
[docs] def load_mem_resource(self): self._buffer = shared_memory.SharedMemory(self.buffer_name) sizes = np.ndarray(2, dtype="d", buffer=self._buffer.buf[0 : _FLOAT_SIZE * 2]) self.max_size = int(sizes[0]) self.dimension = int(sizes[1]) num_el = int((sizes[0] + 1) * sizes[1]) self._num_el = num_el logging.info( [ "load repr multidimensional buffer", ] )
def _create_repr(self): start = _FLOAT_SIZE * 2 end = (self._num_el + 2) * _FLOAT_SIZE self._buffer_repr = np.ndarray( self._num_el, dtype=_FLOAT_ShM_TYPE, buffer=self._buffer.buf[start:end] ) logging.info( [ "create repr multidimensional buffer", self._buffer_repr.shape, "max size", self.max_size, "dimension", self.dimension, ] )
[docs] def cleanup(self): self._buffer.close() if self._created: # this it's due the python core issues # https://bugs.python.org/issue38119 # https://bugs.python.org/issue39959 # https://github.com/luizalabs/shared-memory-dict/issues/13 try: self._buffer.unlink() except FileNotFoundError: print( f"Shared Memory {self.buffer_name}(queue_event_buffer)\ File not found" )
[docs] class GenericCircularQueue(ABC): """This implements a generic circular queue which works with shared memory resources. """ @warn_on_args_to_kwargs() def __init__( self, *, max_size=None, dimension=8, use_shared_mem=False, buffer=None, buffer_name=None, ): """Initialize the circular queue. Parameters ---------- max_size : int, optional If buffer_name or buffer was not passed then max_size it's mandatory. This will be used to construct the multidimensional buffer dimension : int, default 8 This will be used to construct the multidimensional buffer use_shared_mem : bool, default False If the multidimensional memory resource should create or read using SharedMemory or RawArrays buffer : RawArray, optional buffer_name: str, optional """ self._created = False self.head_tail_buffer_name = None self.head_tail_buffer_repr = None self.head_tail_buffer = None self._use_shared_mem = use_shared_mem if use_shared_mem: self.buffer = SharedMemMultiDimensionalBuffer( max_size=max_size, dimension=dimension, buffer_name=buffer_name ) else: self.buffer = RawArrayMultiDimensionalBuffer( max_size=max_size, dimension=dimension, buffer=buffer ) @property def head(self): if self._use_shared_mem: return self.head_tail_buffer_repr[0] else: return np.frombuffer(self.head_tail_buffer.get_obj(), _INT_ShM_TYPE)[0] @head.setter def head(self, value): self.head_tail_buffer_repr[0] = value @property def tail(self): if self._use_shared_mem: return self.head_tail_buffer_repr[1] else: return np.frombuffer(self.head_tail_buffer.get_obj(), _INT_ShM_TYPE)[1] @tail.setter def tail(self, value): self.head_tail_buffer_repr[1] = value
[docs] def set_head_tail(self, head, tail, lock=1): self.head_tail_buffer_repr[0:3] = np.array([head, tail, lock]).astype( _INT_ShM_TYPE )
def _enqueue(self, data): ok = False if (self.tail + 1) % self.buffer.max_size == self.head: ok = False else: if self.head == -1: self.set_head_tail(0, 0, 1) else: self.tail = (self.tail + 1) % self.buffer.max_size self.buffer[self.tail] = data ok = True return ok def _dequeue(self): if self.head == -1: interactions = None else: if self.head != self.tail: interactions = self.buffer[self.head] self.head = (self.head + 1) % self.buffer.max_size else: interactions = self.buffer[self.head] self.set_head_tail(-1, -1, 1) return interactions
[docs] @abstractmethod def enqueue(self, data): pass # pragma: no cover
[docs] @abstractmethod def dequeue(self): pass # pragma: no cover
[docs] @abstractmethod def load_mem_resource(self): pass # pragma: no cover
[docs] @abstractmethod def create_mem_resource(self): pass # pragma: no cover
[docs] @abstractmethod def cleanup(self): pass # pragma: no cover
[docs] class ArrayCircularQueue(GenericCircularQueue): """This implements a MultiDimensional Queue which works with Arrays and RawArrays. """ @warn_on_args_to_kwargs() def __init__(self, *, max_size=10, dimension=6, head_tail_buffer=None, buffer=None): """Stream system uses that to implement user interactions Parameters ---------- max_size : int, optional If buffer_name or buffer was not passed then max_size it's mandatory. This will be used to construct the multidimensional buffer dimension : int, default 8 This will be used to construct the multidimensional buffer head_tail_buffer : buffer, optional If buffer is not passed to __init__ then this obj will create a new RawArray to store head and tail position. buffer : buffer, optional If buffer is not passed to __init__ then the multidimensional buffer obj will create a new RawArray to store the data """ super().__init__( max_size=max_size, dimension=dimension, use_shared_mem=False, buffer=buffer, ) if head_tail_buffer is None: self.create_mem_resource() self._created = True else: self.head_tail_buffer = head_tail_buffer self._created = False self.head_tail_buffer_name = None self.head_tail_buffer_repr = self.head_tail_buffer if self._created: self.set_head_tail(-1, -1, 0)
[docs] def load_mem_resource(self): pass # pragma: no cover
[docs] def create_mem_resource(self): # head_tail_arr[0] int; head position # head_tail_arr[1] int; tail position head_tail_arr = np.array([-1, -1, 0], dtype=_INT_ShM_TYPE) self.head_tail_buffer = multiprocessing.Array( _INT_ShM_TYPE, head_tail_arr, )
[docs] def enqueue(self, data): ok = False with self.head_tail_buffer.get_lock(): ok = self._enqueue(data) return ok
[docs] def dequeue(self): with self.head_tail_buffer.get_lock(): interactions = self._dequeue() return interactions
[docs] def cleanup(self): pass
[docs] class SharedMemCircularQueue(GenericCircularQueue): """This implements a MultiDimensional Queue which works with SharedMemory. """ @warn_on_args_to_kwargs() def __init__( self, *, max_size=10, dimension=6, head_tail_buffer_name=None, buffer_name=None ): """Stream system uses that to implement user interactions Parameters ---------- max_size : int, optional If buffer_name or buffer was not passed then max_size it's mandatory. This will be used to construct the multidimensional buffer dimension : int, default 8 This will be used to construct the multidimensional buffer head_tail_buffer_name : str, optional if buffer_name is passed than this Obj will read a a already created SharedMemory with the head and tail information buffer_name : str, optional if buffer_name is passed than this Obj will read a a already created SharedMemory to create the MultiDimensionalBuffer """ super().__init__( max_size=max_size, dimension=dimension, use_shared_mem=True, buffer_name=buffer_name, ) if head_tail_buffer_name is None: self.create_mem_resource() self._created = True else: self.head_tail_buffer_name = head_tail_buffer_name self.load_mem_resource() self._created = False self.head_tail_buffer_repr = np.ndarray( 3, dtype=_INT_ShM_TYPE, buffer=self.head_tail_buffer.buf[0 : 3 * _INT_SIZE] ) logging.info( [ "create shared mem", "size repr", self.head_tail_buffer_repr.shape, "size buffer", self.head_tail_buffer.size / _INT_SIZE, ] ) if self._created: self.set_head_tail(-1, -1, 0)
[docs] def load_mem_resource(self): self.head_tail_buffer = shared_memory.SharedMemory(self.head_tail_buffer_name)
[docs] def create_mem_resource(self): # head_tail_arr[0] int; head position # head_tail_arr[1] int; tail position head_tail_arr = np.array([-1, -1, 0], dtype=_INT_ShM_TYPE) self.head_tail_buffer = shared_memory.SharedMemory( create=True, size=head_tail_arr.nbytes ) self.head_tail_buffer_name = self.head_tail_buffer.name
[docs] def is_unlocked(self): return self.head_tail_buffer_repr[2] == 0
[docs] def lock(self): self.head_tail_buffer_repr[2] = 1
[docs] def unlock(self): self.head_tail_buffer_repr[2] = 0
[docs] def enqueue(self, data): ok = False if self.is_unlocked(): self.lock() ok = self._enqueue(data) self.unlock() return ok
[docs] def dequeue(self): interactions = None if self.is_unlocked(): self.lock() interactions = self._dequeue() self.unlock() return interactions
[docs] def cleanup(self): self.buffer.cleanup() self.head_tail_buffer.close() if self._created: # this it's due the python core issues # https://bugs.python.org/issue38119 # https://bugs.python.org/issue39959 # https://github.com/luizalabs/shared-memory-dict/issues/13 try: self.head_tail_buffer.unlink() except FileNotFoundError: print( f"Shared Memory {self.head_tail_buffer_name}(head_tail)\ File not found" )
[docs] class GenericImageBufferManager(ABC): """This implements a abstract (generic) ImageBufferManager with the n-buffer technique. """ @warn_on_args_to_kwargs() def __init__(self, *, max_window_size=None, num_buffers=2, use_shared_mem=False): """Initialize the ImageBufferManager. Parameters ---------- max_window_size : tuple of ints, optional This allows resize events inside of the FURY window instance. Should be greater than the window size. num_buffers : int, optional Number of buffers to be used in the n-buffering technique. use_shared_mem: bool, default False """ self.max_window_size = np.array(max_window_size) self.num_buffers = num_buffers self.info_buffer_size = num_buffers * 2 + 2 self._use_shared_mem = use_shared_mem self.max_size = None # int self.num_components = 3 self.image_reprs = [] self.image_buffers = [] self.image_buffer_names = [] self.info_buffer_name = None self.info_buffer = None self.info_buffer_repr = None self._created = False size = (self.max_window_size[0], self.max_window_size[1]) img = Image.new("RGB", size, color=(0, 0, 0)) d = ImageDraw.Draw(img) pos_text = (12, size[1] // 2) d.text( pos_text, "Image size have exceed the Buffer Max Size", fill=(255, 255, 0) ) img = np.flipud(img) self.img_exceed = np.asarray(img).flatten() @property def next_buffer_index(self): index = int((self.info_buffer_repr[1] + 1) % self.num_buffers) return index @property def buffer_index(self): index = self.info_buffer_repr[1] return index
[docs] def write_into(self, w, h, np_arr): buffer_size = buffer_size = int(h * w * 3) next_buffer_index = self.next_buffer_index if buffer_size == self.max_size: self.image_reprs[next_buffer_index][:] = np_arr elif buffer_size < self.max_size: self.image_reprs[next_buffer_index][0:buffer_size] = np_arr else: self.image_reprs[next_buffer_index][0 : self.max_size] = self.img_exceed w = self.max_window_size[0] h = self.max_window_size[1] self.info_buffer_repr[2 + next_buffer_index * 2] = w self.info_buffer_repr[2 + next_buffer_index * 2 + 1] = h self.info_buffer_repr[1] = next_buffer_index
[docs] def get_current_frame(self): """Get the current frame from the buffer.""" if not self._use_shared_mem: image_info = np.frombuffer(self.info_buffer, _UINT_ShM_TYPE) else: image_info = self.info_buffer_repr buffer_index = int(image_info[1]) self.width = int(image_info[2 + buffer_index * 2]) self.height = int(image_info[2 + buffer_index * 2 + 1]) image = self.image_reprs[buffer_index] self.image_buffer_repr = image return self.width, self.height, image
[docs] def get_jpeg(self): """Returns a jpeg image from the buffer. Returns ------- bytes: jpeg image. """ width, height, image = self.get_current_frame() if self._use_shared_mem: image = np.frombuffer(image, _BYTE_ShM_TYPE) image = image[0 : width * height * 3].reshape((height, width, 3)) image = np.flipud(image) image_encoded = Image.fromarray(image, mode="RGB") bytes_img_data = io.BytesIO() image_encoded.save(bytes_img_data, format="jpeg") bytes_img = bytes_img_data.getvalue() return bytes_img
[docs] @warn_on_args_to_kwargs() async def async_get_jpeg(self, *, ms=33): jpeg = self.get_jpeg() await asyncio.sleep(ms / 1000) return jpeg
[docs] @abstractmethod def load_mem_resource(self): pass # pragma: no cover
[docs] @abstractmethod def create_mem_resource(self): pass # pragma: no cover
[docs] @abstractmethod def cleanup(self): pass # pragma: no cover
[docs] class RawArrayImageBufferManager(GenericImageBufferManager): """This implements an ImageBufferManager using RawArrays.""" @warn_on_args_to_kwargs() def __init__( self, *, max_window_size=(100, 100), num_buffers=2, image_buffers=None, info_buffer=None, ): """Initialize the ImageBufferManager. Parameters ---------- max_window_size : tuple of ints, optional This allows resize events inside of the FURY window instance. Should be greater than the window size. num_buffers : int, optional Number of buffers to be used in the n-buffering technique. info_buffer : buffer, optional A buffer with the information about the current frame to be streamed and the respective sizes image_buffers : list of buffers, optional A list of buffers with each one containing a frame. """ super().__init__( max_window_size=max_window_size, num_buffers=num_buffers, use_shared_mem=False, ) if image_buffers is None or info_buffer is None: self.create_mem_resource() else: self.image_buffers = image_buffers self.info_buffer = info_buffer self.load_mem_resource()
[docs] def create_mem_resource(self): self.max_size = self.max_window_size[0] * self.max_window_size[1] self.max_size *= self.num_components for _ in range(self.num_buffers): buffer = multiprocessing.RawArray( _BYTE_ShM_TYPE, np.ctypeslib.as_ctypes( np.random.randint(0, 255, size=self.max_size, dtype=_BYTE_ShM_TYPE) ), ) self.image_buffers.append(buffer) self.image_reprs.append(np.ctypeslib.as_array(buffer)) # info_list stores the information about the n frame buffers # as well the respectives sizes. # 0 number of components # 1 id buffer # 2, 3, width first buffer, height first buffer # 4, 5, width second buffer , height second buffer info_list = [3, 0] for _ in range(self.num_buffers): info_list += [self.max_window_size[0]] info_list += [self.max_window_size[1]] info_list = np.array(info_list, dtype=_UINT_ShM_TYPE) self.info_buffer = multiprocessing.RawArray( _UINT_ShM_TYPE, np.ctypeslib.as_ctypes(np.array(info_list)) ) self.info_buffer_repr = np.ctypeslib.as_array(self.info_buffer)
[docs] def load_mem_resource(self): self.info_buffer = np.frombuffer(self.info_buffer, _UINT_ShM_TYPE) self.info_buffer_repr = np.ctypeslib.as_array(self.info_buffer) for img_buffer in self.image_buffers: self.image_reprs.append(np.ctypeslib.as_array(img_buffer))
[docs] def cleanup(self): pass
[docs] class SharedMemImageBufferManager(GenericImageBufferManager): """This implements an ImageBufferManager using the SharedMemory approach. """ @warn_on_args_to_kwargs() def __init__( self, *, max_window_size=(100, 100), num_buffers=2, image_buffer_names=None, info_buffer_name=None, ): """Initialize the ImageBufferManager. Parameters ---------- max_window_size : tuple of ints, optional This allows resize events inside of the FURY window instance. Should be greater than the window size. num_buffers : int, optional Number of buffers to be used in the n-buffering technique. info_buffer_name : str The name of a buffer with the information about the current frame to be streamed and the respective sizes image_buffer_names : list of str, optional a list of buffer names. Each buffer contains a frame Notes ----- Python >=3.8 is a requirement to use this object. """ super().__init__( max_window_size=max_window_size, num_buffers=num_buffers, use_shared_mem=True, ) if image_buffer_names is None or info_buffer_name is None: self.create_mem_resource() self._created = True else: self.image_buffer_names = image_buffer_names self.info_buffer_name = info_buffer_name self._created = False self.load_mem_resource()
[docs] def create_mem_resource(self): self.max_size = self.max_window_size[0] * self.max_window_size[1] self.max_size *= self.num_components self.max_size = int(self.max_size) for _ in range(self.num_buffers): buffer = shared_memory.SharedMemory(create=True, size=self.max_size) self.image_buffers.append(buffer) self.image_reprs.append( np.ndarray(self.max_size, dtype=_BYTE_ShM_TYPE, buffer=buffer.buf) ) self.image_buffer_names.append(buffer.name) info_list = [2 + self.num_buffers * 2, 1, 3, 0] for _ in range(self.num_buffers): info_list += [self.max_window_size[0]] info_list += [self.max_window_size[1]] info_list = np.array(info_list, dtype=_UINT_ShM_TYPE) self.info_buffer = shared_memory.SharedMemory( create=True, size=info_list.nbytes ) sizes = np.ndarray( 2, dtype=_UINT_ShM_TYPE, buffer=self.info_buffer.buf[0 : _UINT_SIZE * 2] ) sizes[0] = info_list[0] sizes[1] = 1 self.info_buffer_repr = np.ndarray( sizes[0], dtype=_UINT_ShM_TYPE, buffer=self.info_buffer.buf[2 * _UINT_SIZE :], ) logging.info( [ "info buffer create", "buffer size", sizes[0], "repr size", self.info_buffer_repr.shape, ] ) self.info_buffer_name = self.info_buffer.name
[docs] def load_mem_resource(self): self.info_buffer = shared_memory.SharedMemory(self.info_buffer_name) sizes = np.ndarray( 2, dtype=_UINT_ShM_TYPE, buffer=self.info_buffer.buf[0 : _UINT_SIZE * 2] ) self.info_buffer_repr = np.ndarray( sizes[0], dtype=_UINT_ShM_TYPE, buffer=self.info_buffer.buf[2 * _UINT_SIZE :], ) logging.info( [ "info buffer load", "buffer size", sizes[0], "repr size", self.info_buffer_repr.shape, ] ) for buffer_name in self.image_buffer_names: buffer = shared_memory.SharedMemory(buffer_name) self.image_buffers.append(buffer) self.image_reprs.append( np.ndarray( buffer.size // _BYTE_SIZE, dtype=_BYTE_ShM_TYPE, buffer=buffer.buf ) )
[docs] def cleanup(self): """Release the resources used by the Shared Memory Manager""" self.info_buffer.close() # this it's due the python core issues # https://bugs.python.org/issue38119 # https://bugs.python.org/issue39959 # https://github.com/luizalabs/shared-memory-dict/issues/13 if self._created: try: self.info_buffer.unlink() except FileNotFoundError: print( f"Shared Memory {self.info_buffer_name}\ (info_buffer) File not found" ) for buffer, name in zip(self.image_buffers, self.image_buffer_names): buffer.close() if self._created: try: buffer.unlink() except FileNotFoundError: print(f"Shared Memory {name}(buffer image) File not found")
[docs] class IntervalTimerThreading: """Implements a object with the same behavior of setInterval from Js""" def __init__(self, seconds, callback, *args, **kwargs): """ Parameters ---------- seconds : float A positive float number. Represents the total amount of seconds between each call callback : function The function to be called *args : args args to be passed to callback **kwargs : kwargs kwargs to be passed to callback Examples -------- .. code-block:: python def callback(arr): arr += [len(arr)] arr = [] interval_timer = tools.IntervalTimer(1, callback, arr) interval_timer.start() time.sleep(5) interval_timer.stop() # len(arr) == 5 References ----------- [1] https://stackoverflow.com/questions/3393612/run-certain-code-every-n-seconds """ # noqa self._timer = None self.seconds = seconds self.callback = callback self.args = args self.kwargs = kwargs self.is_running = False self.start() def _run(self): self.is_running = False self.start() self.callback(*self.args, **self.kwargs)
[docs] def start(self): """Start the timer""" if self.is_running: return self._timer = Timer(self.seconds, self._run) self._timer.daemon = True self._timer.start() self.is_running = True
[docs] def stop(self): """Stop the timer""" if self._timer is None: return self._timer.cancel() if self._timer.is_alive(): self._timer.join() self.is_running = False self._timer = None
[docs] class IntervalTimer: """A object that creates a timer that calls a function periodically.""" def __init__(self, seconds, callback, *args, **kwargs): """Parameters ---------- seconds : float A positive float number. Represents the total amount of seconds between each call callback : function The function to be called *args : args args to be passed to callback **kwargs : kwargs kwargs to be passed to callback """ self._seconds = seconds self._callback = callback self.args = args self.kwargs = kwargs self._is_running = False self.start() async def _run(self): self._is_running = True while True: await asyncio.sleep(self._seconds) if self._is_running: self._callback(*self.args, **self.kwargs)
[docs] def start(self): """Start the timer""" if self._is_running: return self._loop = asyncio.get_event_loop() self._task = self._loop.create_task(self._run())
[docs] def stop(self): """Stop the timer""" self._task.cancel() self._is_running = False