from abc import ABC, abstractmethod
import asyncio
import io
import logging
import multiprocessing
from threading import Timer
import time
from PIL import Image, ImageDraw
import numpy as np
from fury.decorators import warn_on_args_to_kwargs
from fury.stream.constants import PY_VERSION_8
# SharedMemory support is optional: the stdlib modules are only imported when
# the project-level PY_VERSION_8 flag is truthy.
if PY_VERSION_8:
    from multiprocessing import resource_tracker, shared_memory
else:
    # Only ``shared_memory`` gets a placeholder here; ``resource_tracker``
    # stays undefined on this branch.
    shared_memory = None  # type: ignore
# Type codes shared by the multiprocessing arrays and the NumPy dtypes used
# throughout this module.
_FLOAT_ShM_TYPE = "d"
_INT_ShM_TYPE = "i"
_UINT_ShM_TYPE = "I"
_BYTE_ShM_TYPE = "B"
# Size in bytes of one element of each type code, as reported by NumPy.
_FLOAT_SIZE = np.dtype(_FLOAT_ShM_TYPE).itemsize
_INT_SIZE = np.dtype(_INT_ShM_TYPE).itemsize
_UINT_SIZE = np.dtype(_UINT_ShM_TYPE).itemsize
_BYTE_SIZE = np.dtype(_BYTE_ShM_TYPE).itemsize
[docs]
def remove_shm_from_resource_tracker():
    """Monkey-patch multiprocessing.resource_tracker so SharedMemory won't
    be tracked.

    The module-level ``register`` and ``unregister`` functions are replaced
    by wrappers that ignore resources of type ``"shared_memory"`` and forward
    every other resource type to the tracker instance. The
    ``"shared_memory"`` entry is also removed from the tracker's cleanup
    table.

    Notes
    -----
    More details at: https://bugs.python.org/issue38119

    """

    def fix_register(name, rtype):
        if rtype == "shared_memory":
            return None
        # ``register`` is looked up on the tracker instance, so it is already
        # bound; passing an extra ``self`` (as the widely copied snippet
        # does) raises NameError and silently drops the registration.
        return resource_tracker._resource_tracker.register(name, rtype)

    resource_tracker.register = fix_register

    def fix_unregister(name, rtype):
        if rtype == "shared_memory":
            return None
        return resource_tracker._resource_tracker.unregister(name, rtype)

    resource_tracker.unregister = fix_unregister

    resource_tracker._CLEANUP_FUNCS.pop("shared_memory", None)
[docs]
class GenericMultiDimensionalBuffer(ABC):
    """Abstract base of a flat float buffer addressed in fixed-size rows.

    Subclasses own the actual memory resource and expose it as a NumPy array
    through ``_buffer_repr``. Item ``idx`` maps to the slice
    ``[idx * dimension, (idx + 1) * dimension)`` of that array.
    """

    @warn_on_args_to_kwargs()
    def __init__(self, *, max_size=None, dimension=8):
        """Initialize the multidimensional buffer.

        Parameters
        ----------
        max_size : int, optional
            Mandatory when the subclass has to create the memory resource
            itself (no buffer or buffer name was given).
        dimension : int, default 8
            Number of values stored per item.

        """
        self._created = False
        self._buffer = None
        self._buffer_repr = None
        self.buffer_name = None
        self.dimension = dimension
        self.max_size = max_size

    @property
    def buffer(self):
        """Underlying memory resource object."""
        return self._buffer

    @buffer.setter
    def buffer(self, data):
        # Anything that is not a NumPy value of the buffer's float type is
        # ignored without raising.
        is_numpy = isinstance(data, (np.ndarray, np.generic))
        if is_numpy and data.dtype == _FLOAT_ShM_TYPE:
            self._buffer_repr[:] = data

    def get_start_end(self, idx):
        """Return the ``(start, end)`` flat offsets of item ``idx``."""
        width = self.dimension
        return idx * width, width * (idx + 1)

    def __getitem__(self, idx):
        lo, hi = self.get_start_end(idx)
        logging.info(f"dequeue start {int(time.time()*1000)}")
        t_before = time.time() * 1000
        chunk = self._buffer_repr[lo:hi]
        t_after = time.time() * 1000
        logging.info(f"dequeue frombuffer cost {t_after-t_before:.2f}")
        return chunk

    def __setitem__(self, idx, data):
        lo, hi = self.get_start_end(idx)
        # Same filtering as the ``buffer`` setter: other inputs are dropped.
        if not isinstance(data, (np.ndarray, np.generic)):
            return
        if data.dtype == _FLOAT_ShM_TYPE:
            self._buffer_repr[lo:hi] = data

    @abstractmethod
    def load_mem_resource(self):
        """Attach to a memory resource that already exists."""
        ...  # pragma: no cover

    @abstractmethod
    def create_mem_resource(self):
        """Allocate a new memory resource."""
        ...  # pragma: no cover

    @abstractmethod
    def cleanup(self):
        """Release the memory resource."""
        ...  # pragma: no cover
 
[docs]
class RawArrayMultiDimensionalBuffer(GenericMultiDimensionalBuffer):
    """Multidimensional buffer backed by a ``multiprocessing.RawArray``."""

    @warn_on_args_to_kwargs()
    def __init__(self, max_size, *, dimension=4, buffer=None):
        """Create a new RawArray buffer or wrap an existing one.

        The stream system uses this to implement the CircularQueue with
        shared memory resources.

        Parameters
        ----------
        max_size : int
            Number of items the buffer is sized for. Recomputed from the
            length of ``buffer`` when one is given.
        dimension : int, default 4
            Number of values stored per item.
        buffer : buffer, optional
            An already created RawArray to read from. When omitted, a new
            RawArray is allocated to store the data.

        """
        super().__init__(max_size=max_size, dimension=dimension)
        if buffer is not None:
            self._buffer = buffer
            self.load_mem_resource()
        else:
            self.create_mem_resource()

    def create_mem_resource(self):
        """Allocate a zero-filled RawArray with ``max_size + 1`` items."""
        n_values = self.dimension * (self.max_size + 1)
        zeros = np.zeros(n_values, dtype=_FLOAT_ShM_TYPE)
        self._buffer = multiprocessing.RawArray(
            _FLOAT_ShM_TYPE, np.ctypeslib.as_ctypes(zeros)
        )
        self._buffer_repr = np.ctypeslib.as_array(self._buffer)

    def load_mem_resource(self):
        """Wrap the existing RawArray and derive ``max_size`` from it."""
        n_items = int(len(self._buffer) // self.dimension)
        # One item of the allocation is not counted in ``max_size``.
        self.max_size = n_items - 1
        self._buffer_repr = np.ctypeslib.as_array(self._buffer)

    def cleanup(self):
        """Nothing to release for RawArray buffers."""
        pass
 
[docs]
class SharedMemMultiDimensionalBuffer(GenericMultiDimensionalBuffer):
    """Multidimensional buffer backed by a ``SharedMemory`` block.

    The block starts with a two-float header holding ``max_size`` and
    ``dimension``; the item data follows right after it.
    """

    @warn_on_args_to_kwargs()
    def __init__(self, max_size, *, dimension=4, buffer_name=None):
        """Create a new SharedMemory buffer or attach to an existing one.

        The stream system uses this to implement the CircularQueue with
        shared memory resources.

        Parameters
        ----------
        max_size : int
            Number of items the buffer is sized for. Overwritten by the
            value stored in the block header when ``buffer_name`` is given.
        dimension : int, default 4
            Number of values stored per item. Also overwritten from the
            header when ``buffer_name`` is given.
        buffer_name : str, optional
            Name of an already created SharedMemory block to attach to.
            When omitted, a new block is created.

        """
        super().__init__(max_size=max_size, dimension=dimension)
        if buffer_name is None:
            self.create_mem_resource()
            self._created = True
        else:
            self.buffer_name = buffer_name
            self.load_mem_resource()
            self._created = False
        self._create_repr()

    def create_mem_resource(self):
        """Create the SharedMemory block and write its two-float header."""
        self._num_el = self.dimension * (self.max_size + 1)
        # Two extra floats in front of the data hold max_size and dimension.
        n_bytes = int((self._num_el + 2) * _FLOAT_SIZE)
        self._buffer = shared_memory.SharedMemory(create=True, size=n_bytes)
        sizes = np.ndarray(
            2, dtype=_FLOAT_ShM_TYPE, buffer=self._buffer.buf[0 : _FLOAT_SIZE * 2]
        )
        sizes[0] = self.max_size
        sizes[1] = self.dimension
        self.buffer_name = self._buffer.name
        logging.info(
            [
                "create repr multidimensional buffer ",
            ]
        )

    def load_mem_resource(self):
        """Attach to ``buffer_name`` and read the sizes from its header."""
        self._buffer = shared_memory.SharedMemory(self.buffer_name)
        sizes = np.ndarray(
            2, dtype=_FLOAT_ShM_TYPE, buffer=self._buffer.buf[0 : _FLOAT_SIZE * 2]
        )
        self.max_size = int(sizes[0])
        self.dimension = int(sizes[1])
        self._num_el = int((sizes[0] + 1) * sizes[1])
        logging.info(
            [
                "load repr multidimensional buffer",
            ]
        )

    def _create_repr(self):
        # The data region starts right after the two-float header.
        start = _FLOAT_SIZE * 2
        end = (self._num_el + 2) * _FLOAT_SIZE
        self._buffer_repr = np.ndarray(
            self._num_el, dtype=_FLOAT_ShM_TYPE, buffer=self._buffer.buf[start:end]
        )
        logging.info(
            [
                "create repr multidimensional buffer",
                self._buffer_repr.shape,
                "max size",
                self.max_size,
                "dimension",
                self.dimension,
            ]
        )

    def cleanup(self):
        """Close the block and, if this object created it, unlink it."""
        # Drop the NumPy view first so nothing can read or write through it
        # once the mapping has been closed.
        self._buffer_repr = None
        self._buffer.close()
        if self._created:
            # this it's due the python core issues
            # https://bugs.python.org/issue38119
            # https://bugs.python.org/issue39959
            # https://github.com/luizalabs/shared-memory-dict/issues/13
            try:
                self._buffer.unlink()
            except FileNotFoundError:
                print(
                    f"Shared Memory {self.buffer_name}(queue_event_buffer)\
                    File not found"
                )
 
[docs]
class GenericCircularQueue(ABC):
    """Abstract circular queue that works with shared memory resources.

    The items live in a multidimensional buffer; the head position, tail
    position and a lock flag live in a separate three-integer resource that
    concrete subclasses create or load.
    """

    @warn_on_args_to_kwargs()
    def __init__(
        self,
        *,
        max_size=None,
        dimension=8,
        use_shared_mem=False,
        buffer=None,
        buffer_name=None,
    ):
        """Initialize the circular queue.

        Parameters
        ----------
        max_size : int, optional
            Mandatory when neither ``buffer`` nor ``buffer_name`` is given.
            Used to construct the multidimensional buffer.
        dimension : int, default 8
            Used to construct the multidimensional buffer.
        use_shared_mem : bool, default False
            Whether the multidimensional buffer is created/read through
            SharedMemory (True) or RawArrays (False).
        buffer : RawArray, optional
            Existing RawArray, only used when ``use_shared_mem`` is False.
        buffer_name : str, optional
            Existing SharedMemory name, only used when ``use_shared_mem``
            is True.

        """
        self._use_shared_mem = use_shared_mem
        self._created = False
        self.head_tail_buffer = None
        self.head_tail_buffer_repr = None
        self.head_tail_buffer_name = None
        if not use_shared_mem:
            self.buffer = RawArrayMultiDimensionalBuffer(
                max_size=max_size, dimension=dimension, buffer=buffer
            )
        else:
            self.buffer = SharedMemMultiDimensionalBuffer(
                max_size=max_size, dimension=dimension, buffer_name=buffer_name
            )

    @property
    def head(self):
        """Index of the oldest item, or -1 when the queue is empty."""
        if not self._use_shared_mem:
            return np.frombuffer(self.head_tail_buffer.get_obj(), _INT_ShM_TYPE)[0]
        return self.head_tail_buffer_repr[0]

    @head.setter
    def head(self, value):
        self.head_tail_buffer_repr[0] = value

    @property
    def tail(self):
        """Index of the newest item, or -1 when the queue is empty."""
        if not self._use_shared_mem:
            return np.frombuffer(self.head_tail_buffer.get_obj(), _INT_ShM_TYPE)[1]
        return self.head_tail_buffer_repr[1]

    @tail.setter
    def tail(self, value):
        self.head_tail_buffer_repr[1] = value

    def set_head_tail(self, head, tail, lock=1):
        """Write head, tail and lock flag in a single slice assignment."""
        values = np.array([head, tail, lock]).astype(_INT_ShM_TYPE)
        self.head_tail_buffer_repr[0:3] = values

    def _enqueue(self, data):
        capacity = self.buffer.max_size
        # Full: advancing the tail would land on the head.
        if (self.tail + 1) % capacity == self.head:
            return False
        if self.head == -1:
            # First item of an empty queue.
            self.set_head_tail(0, 0, 1)
        else:
            self.tail = (self.tail + 1) % capacity
        self.buffer[self.tail] = data
        return True

    def _dequeue(self):
        if self.head == -1:
            return None
        interactions = self.buffer[self.head]
        if self.head != self.tail:
            self.head = (self.head + 1) % self.buffer.max_size
        else:
            # That was the last item: mark the queue as empty again.
            self.set_head_tail(-1, -1, 1)
        return interactions

    @abstractmethod
    def enqueue(self, data):
        """Add ``data`` to the queue."""
        pass  # pragma: no cover

    @abstractmethod
    def dequeue(self):
        """Remove and return the oldest item."""
        pass  # pragma: no cover

    @abstractmethod
    def load_mem_resource(self):
        """Attach to an existing head/tail resource."""
        pass  # pragma: no cover

    @abstractmethod
    def create_mem_resource(self):
        """Create the head/tail resource."""
        pass  # pragma: no cover

    @abstractmethod
    def cleanup(self):
        """Release the resources held by the queue."""
        pass  # pragma: no cover
 
[docs]
class ArrayCircularQueue(GenericCircularQueue):
    """Multidimensional queue built on multiprocessing Arrays/RawArrays."""

    @warn_on_args_to_kwargs()
    def __init__(self, *, max_size=10, dimension=6, head_tail_buffer=None, buffer=None):
        """Create or attach to a queue; used for user interactions.

        Parameters
        ----------
        max_size : int, default 10
            Used to construct the multidimensional buffer when ``buffer``
            is not given.
        dimension : int, default 6
            Used to construct the multidimensional buffer.
        head_tail_buffer : buffer, optional
            Existing Array holding head, tail and lock flag. When omitted,
            a new one is created and initialised as an empty queue.
        buffer : buffer, optional
            Existing RawArray with the item data. When omitted, the
            multidimensional buffer allocates a new RawArray.

        """
        super().__init__(
            max_size=max_size,
            dimension=dimension,
            use_shared_mem=False,
            buffer=buffer,
        )
        created = head_tail_buffer is None
        if created:
            self.create_mem_resource()
        else:
            self.head_tail_buffer = head_tail_buffer
        self._created = created
        self.head_tail_buffer_name = None
        # The synchronized Array is used directly as the writable view.
        self.head_tail_buffer_repr = self.head_tail_buffer
        if created:
            self.set_head_tail(-1, -1, 0)

    def load_mem_resource(self):
        """Nothing to load: the Array is passed in directly."""
        pass  # pragma: no cover

    def create_mem_resource(self):
        """Create the Array holding ``[head, tail, lock]``."""
        # index 0: head position, index 1: tail position
        initial = np.array([-1, -1, 0], dtype=_INT_ShM_TYPE)
        self.head_tail_buffer = multiprocessing.Array(_INT_ShM_TYPE, initial)

    def enqueue(self, data):
        """Add ``data`` under the Array's lock; return False when full."""
        with self.head_tail_buffer.get_lock():
            return self._enqueue(data)

    def dequeue(self):
        """Pop the oldest item under the Array's lock; None when empty."""
        with self.head_tail_buffer.get_lock():
            return self._dequeue()

    def cleanup(self):
        """Nothing to release for Array based queues."""
        pass
 
[docs]
class SharedMemCircularQueue(GenericCircularQueue):
    """Multidimensional queue built on ``SharedMemory`` blocks.

    Head, tail and a lock flag are stored as three integers in their own
    SharedMemory block. The lock is a plain flag: it is checked and then
    set in two separate steps, so it is a best-effort guard only.
    """

    @warn_on_args_to_kwargs()
    def __init__(
        self, *, max_size=10, dimension=6, head_tail_buffer_name=None, buffer_name=None
    ):
        """Create or attach to a queue; used for user interactions.

        Parameters
        ----------
        max_size : int, default 10
            Used to construct the multidimensional buffer when
            ``buffer_name`` is not given.
        dimension : int, default 6
            Used to construct the multidimensional buffer.
        head_tail_buffer_name : str, optional
            Name of an already created SharedMemory block with the head,
            tail and lock information. When omitted, a new block is created
            and initialised as an empty, unlocked queue.
        buffer_name : str, optional
            Name of an already created SharedMemory block used by the
            multidimensional buffer.

        """
        super().__init__(
            max_size=max_size,
            dimension=dimension,
            use_shared_mem=True,
            buffer_name=buffer_name,
        )
        if head_tail_buffer_name is None:
            self.create_mem_resource()
            self._created = True
        else:
            self.head_tail_buffer_name = head_tail_buffer_name
            self.load_mem_resource()
            self._created = False
        self.head_tail_buffer_repr = np.ndarray(
            3, dtype=_INT_ShM_TYPE, buffer=self.head_tail_buffer.buf[0 : 3 * _INT_SIZE]
        )
        logging.info(
            [
                "create shared mem",
                "size repr",
                self.head_tail_buffer_repr.shape,
                "size buffer",
                self.head_tail_buffer.size / _INT_SIZE,
            ]
        )
        if self._created:
            self.set_head_tail(-1, -1, 0)

    def load_mem_resource(self):
        """Attach to the head/tail block named ``head_tail_buffer_name``."""
        self.head_tail_buffer = shared_memory.SharedMemory(self.head_tail_buffer_name)

    def create_mem_resource(self):
        """Create the SharedMemory block holding ``[head, tail, lock]``."""
        # head_tail_arr[0] int; head position
        # head_tail_arr[1] int; tail position
        head_tail_arr = np.array([-1, -1, 0], dtype=_INT_ShM_TYPE)
        self.head_tail_buffer = shared_memory.SharedMemory(
            create=True, size=head_tail_arr.nbytes
        )
        self.head_tail_buffer_name = self.head_tail_buffer.name

    def is_unlocked(self):
        """Return True when the lock flag is 0."""
        return self.head_tail_buffer_repr[2] == 0

    def lock(self):
        """Set the lock flag to 1."""
        self.head_tail_buffer_repr[2] = 1

    def unlock(self):
        """Set the lock flag to 0."""
        self.head_tail_buffer_repr[2] = 0

    def enqueue(self, data):
        """Add ``data`` to the queue.

        Returns
        -------
        bool
            False when the queue is locked or full, True otherwise.

        """
        if not self.is_unlocked():
            return False
        self.lock()
        try:
            return self._enqueue(data)
        finally:
            # Always clear the flag: it lives in shared memory, so leaving
            # it set after an exception would lock out every process.
            self.unlock()

    def dequeue(self):
        """Pop the oldest item; None when the queue is locked or empty."""
        if not self.is_unlocked():
            return None
        self.lock()
        try:
            return self._dequeue()
        finally:
            self.unlock()

    def cleanup(self):
        """Close both blocks and unlink the head/tail one if created here."""
        self.buffer.cleanup()
        # Drop the NumPy view first so nothing can read or write through it
        # once the mapping has been closed.
        self.head_tail_buffer_repr = None
        self.head_tail_buffer.close()
        if self._created:
            # this it's due the python core issues
            # https://bugs.python.org/issue38119
            # https://bugs.python.org/issue39959
            # https://github.com/luizalabs/shared-memory-dict/issues/13
            try:
                self.head_tail_buffer.unlink()
            except FileNotFoundError:
                print(
                    f"Shared Memory {self.head_tail_buffer_name}(head_tail)\
                     File not found"
                )
 
[docs]
class GenericImageBufferManager(ABC):
    """Abstract ImageBufferManager implementing the n-buffer technique.

    Frames are written in turn into ``num_buffers`` flat byte buffers. A
    small info buffer records which buffer holds the current frame and the
    width/height stored in each buffer.
    """

    @warn_on_args_to_kwargs()
    def __init__(self, *, max_window_size=None, num_buffers=2, use_shared_mem=False):
        """Initialize the ImageBufferManager.

        Parameters
        ----------
        max_window_size : tuple of ints, optional
            This allows resize events inside of the FURY window instance.
            Should be greater than the window size.
        num_buffers : int, optional
            Number of buffers to be used in the n-buffering technique.
        use_shared_mem : bool, default False
            Whether the concrete manager stores its data in SharedMemory.

        """
        self._use_shared_mem = use_shared_mem
        self._created = False
        self.max_window_size = np.array(max_window_size)
        self.num_buffers = num_buffers
        self.num_components = 3
        self.info_buffer_size = num_buffers * 2 + 2
        self.max_size = None  # int
        self.image_buffers = []
        self.image_buffer_names = []
        self.image_reprs = []
        self.info_buffer = None
        self.info_buffer_name = None
        self.info_buffer_repr = None

        # Pre-rendered frame that replaces any image too large for a buffer.
        size = (self.max_window_size[0], self.max_window_size[1])
        placeholder = Image.new("RGB", size, color=(0, 0, 0))
        ImageDraw.Draw(placeholder).text(
            (12, size[1] // 2),
            "Image size have exceed the Buffer Max Size",
            fill=(255, 255, 0),
        )
        self.img_exceed = np.asarray(np.flipud(placeholder)).flatten()

    @property
    def next_buffer_index(self):
        """Index of the buffer the next frame will be written to."""
        return int((self.info_buffer_repr[1] + 1) % self.num_buffers)

    @property
    def buffer_index(self):
        """Index of the buffer holding the current frame."""
        return self.info_buffer_repr[1]

    def write_into(self, w, h, np_arr):
        """Store a ``w`` x ``h`` frame in the next buffer and publish it.

        Parameters
        ----------
        w : int
            Frame width.
        h : int
            Frame height.
        np_arr : ndarray
            Flat pixel data, three components per pixel.

        """
        buffer_size = int(h * w * 3)
        target = self.next_buffer_index
        if buffer_size > self.max_size:
            # Too large: store the placeholder frame at the maximum size.
            self.image_reprs[target][0 : self.max_size] = self.img_exceed
            w = self.max_window_size[0]
            h = self.max_window_size[1]
        elif buffer_size < self.max_size:
            self.image_reprs[target][0:buffer_size] = np_arr
        else:
            self.image_reprs[target][:] = np_arr
        slot = 2 + target * 2
        self.info_buffer_repr[slot] = w
        self.info_buffer_repr[slot + 1] = h
        # Switching the current index last publishes the finished frame.
        self.info_buffer_repr[1] = target

    def get_current_frame(self):
        """Get the current frame from the buffer.

        Returns
        -------
        tuple
            ``(width, height, image)`` where ``image`` is the flat buffer
            view holding the current frame.

        """
        if self._use_shared_mem:
            image_info = self.info_buffer_repr
        else:
            image_info = np.frombuffer(self.info_buffer, _UINT_ShM_TYPE)
        buffer_index = int(image_info[1])
        slot = 2 + buffer_index * 2
        self.width = int(image_info[slot])
        self.height = int(image_info[slot + 1])
        self.image_buffer_repr = self.image_reprs[buffer_index]
        return self.width, self.height, self.image_buffer_repr

    def get_jpeg(self):
        """Returns a jpeg image from the buffer.

        Returns
        -------
        bytes
            The current frame encoded as jpeg.

        """
        width, height, image = self.get_current_frame()
        if self._use_shared_mem:
            image = np.frombuffer(image, _BYTE_ShM_TYPE)
        pixels = image[0 : width * height * 3].reshape((height, width, 3))
        picture = Image.fromarray(np.flipud(pixels), mode="RGB")
        stream = io.BytesIO()
        picture.save(stream, format="jpeg")
        return stream.getvalue()

    @warn_on_args_to_kwargs()
    async def async_get_jpeg(self, *, ms=33):
        """Encode the current frame as jpeg, then sleep ``ms`` milliseconds."""
        encoded = self.get_jpeg()
        await asyncio.sleep(ms / 1000)
        return encoded

    @abstractmethod
    def load_mem_resource(self):
        """Attach to existing image and info buffers."""
        pass  # pragma: no cover

    @abstractmethod
    def create_mem_resource(self):
        """Create the image and info buffers."""
        pass  # pragma: no cover

    @abstractmethod
    def cleanup(self):
        """Release the resources held by the manager."""
        pass  # pragma: no cover
 
[docs]
class RawArrayImageBufferManager(GenericImageBufferManager):
    """This implements an ImageBufferManager using RawArrays."""

    @warn_on_args_to_kwargs()
    def __init__(
        self,
        *,
        max_window_size=(100, 100),
        num_buffers=2,
        image_buffers=None,
        info_buffer=None,
    ):
        """Initialize the ImageBufferManager.

        New buffers are created unless both ``image_buffers`` and
        ``info_buffer`` are given.

        Parameters
        ----------
        max_window_size : tuple of ints, optional
            This allows resize events inside of the FURY window instance.
            Should be greater than the window size.
        num_buffers : int, optional
            Number of buffers to be used in the n-buffering technique.
        image_buffers : list of buffers, optional
            A list of buffers with each one containing a frame.
        info_buffer : buffer, optional
            A buffer with the information about the current frame to be
            streamed and the respective sizes.

        """
        super().__init__(
            max_window_size=max_window_size,
            num_buffers=num_buffers,
            use_shared_mem=False,
        )
        if image_buffers is None or info_buffer is None:
            self.create_mem_resource()
        else:
            self.image_buffers = image_buffers
            self.info_buffer = info_buffer
            self.load_mem_resource()

    def create_mem_resource(self):
        """Allocate the image RawArrays and the info RawArray."""
        self.max_size = self.max_window_size[0] * self.max_window_size[1]
        self.max_size *= self.num_components
        for _ in range(self.num_buffers):
            # Each image buffer starts out filled with random bytes.
            buffer = multiprocessing.RawArray(
                _BYTE_ShM_TYPE,
                np.ctypeslib.as_ctypes(
                    np.random.randint(0, 255, size=self.max_size, dtype=_BYTE_ShM_TYPE)
                ),
            )
            self.image_buffers.append(buffer)
            self.image_reprs.append(np.ctypeslib.as_array(buffer))
        # info_list stores the information about the n frame buffers
        # as well the respectives sizes.
        # 0 number of components
        # 1 id buffer
        # 2, 3, width first buffer, height first buffer
        # 4, 5, width second buffer , height second buffer
        info_list = [3, 0]
        for _ in range(self.num_buffers):
            info_list += [self.max_window_size[0]]
            info_list += [self.max_window_size[1]]
        info_list = np.array(info_list, dtype=_UINT_ShM_TYPE)
        self.info_buffer = multiprocessing.RawArray(
            _UINT_ShM_TYPE, np.ctypeslib.as_ctypes(info_list)
        )
        self.info_buffer_repr = np.ctypeslib.as_array(self.info_buffer)

    def load_mem_resource(self):
        """Wrap the buffers handed to ``__init__`` as NumPy arrays."""
        self.info_buffer = np.frombuffer(self.info_buffer, _UINT_ShM_TYPE)
        self.info_buffer_repr = np.ctypeslib.as_array(self.info_buffer)
        for img_buffer in self.image_buffers:
            self.image_reprs.append(np.ctypeslib.as_array(img_buffer))
        if self.image_reprs:
            # ``write_into`` compares frame sizes against ``max_size``;
            # without this it stays None on the load path and the
            # comparison raises TypeError.
            self.max_size = int(self.image_reprs[0].size)

    def cleanup(self):
        """Nothing to release for RawArray buffers."""
        pass
 
[docs]
class SharedMemImageBufferManager(GenericImageBufferManager):
    """This implements an ImageBufferManager using the
    SharedMemory approach.

    The info block starts with two unsigned integers (the length of the
    info array and the value 1); the info array itself follows.
    """

    @warn_on_args_to_kwargs()
    def __init__(
        self,
        *,
        max_window_size=(100, 100),
        num_buffers=2,
        image_buffer_names=None,
        info_buffer_name=None,
    ):
        """Initialize the ImageBufferManager.

        New blocks are created unless both ``image_buffer_names`` and
        ``info_buffer_name`` are given.

        Parameters
        ----------
        max_window_size : tuple of ints, optional
            This allows resize events inside of the FURY window instance.
            Should be greater than the window size.
        num_buffers : int, optional
            Number of buffers to be used in the n-buffering technique.
        image_buffer_names : list of str, optional
            A list of buffer names. Each buffer contains a frame.
        info_buffer_name : str, optional
            The name of a buffer with the information about the current
            frame to be streamed and the respective sizes.

        Notes
        -----
        Python >=3.8 is a requirement to use this object.

        """
        super().__init__(
            max_window_size=max_window_size,
            num_buffers=num_buffers,
            use_shared_mem=True,
        )
        if image_buffer_names is None or info_buffer_name is None:
            self.create_mem_resource()
            self._created = True
        else:
            self.image_buffer_names = image_buffer_names
            self.info_buffer_name = info_buffer_name
            self._created = False
            self.load_mem_resource()

    def create_mem_resource(self):
        """Create the image blocks and the info block."""
        self.max_size = self.max_window_size[0] * self.max_window_size[1]
        self.max_size *= self.num_components
        self.max_size = int(self.max_size)
        for _ in range(self.num_buffers):
            buffer = shared_memory.SharedMemory(create=True, size=self.max_size)
            self.image_buffers.append(buffer)
            self.image_reprs.append(
                np.ndarray(self.max_size, dtype=_BYTE_ShM_TYPE, buffer=buffer.buf)
            )
            self.image_buffer_names.append(buffer.name)
        # Only the first entry (info array length) and the total byte size
        # of this list are used below.
        info_list = [2 + self.num_buffers * 2, 1, 3, 0]
        for _ in range(self.num_buffers):
            info_list += [self.max_window_size[0]]
            info_list += [self.max_window_size[1]]
        info_list = np.array(info_list, dtype=_UINT_ShM_TYPE)
        self.info_buffer = shared_memory.SharedMemory(
            create=True, size=info_list.nbytes
        )
        sizes = np.ndarray(
            2, dtype=_UINT_ShM_TYPE, buffer=self.info_buffer.buf[0 : _UINT_SIZE * 2]
        )
        sizes[0] = info_list[0]
        sizes[1] = 1
        self.info_buffer_repr = np.ndarray(
            sizes[0],
            dtype=_UINT_ShM_TYPE,
            buffer=self.info_buffer.buf[2 * _UINT_SIZE :],
        )
        logging.info(
            [
                "info buffer create",
                "buffer size",
                sizes[0],
                "repr size",
                self.info_buffer_repr.shape,
            ]
        )
        self.info_buffer_name = self.info_buffer.name

    def load_mem_resource(self):
        """Attach to the named info and image blocks."""
        self.info_buffer = shared_memory.SharedMemory(self.info_buffer_name)
        sizes = np.ndarray(
            2, dtype=_UINT_ShM_TYPE, buffer=self.info_buffer.buf[0 : _UINT_SIZE * 2]
        )
        self.info_buffer_repr = np.ndarray(
            sizes[0],
            dtype=_UINT_ShM_TYPE,
            buffer=self.info_buffer.buf[2 * _UINT_SIZE :],
        )
        logging.info(
            [
                "info buffer load",
                "buffer size",
                sizes[0],
                "repr size",
                self.info_buffer_repr.shape,
            ]
        )
        for buffer_name in self.image_buffer_names:
            buffer = shared_memory.SharedMemory(buffer_name)
            self.image_buffers.append(buffer)
            self.image_reprs.append(
                np.ndarray(
                    buffer.size // _BYTE_SIZE, dtype=_BYTE_ShM_TYPE, buffer=buffer.buf
                )
            )
        if self.image_reprs:
            # ``write_into`` compares frame sizes against ``max_size``;
            # without this it stays None on the load path and the
            # comparison raises TypeError.
            # NOTE(review): the attached block may report a size larger than
            # the one requested at creation on some platforms - confirm this
            # is acceptable for writers that attach instead of create.
            self.max_size = int(self.image_reprs[0].size)

    def cleanup(self):
        """Release the resources used by the Shared Memory Manager"""
        # Drop the NumPy views first so nothing can read or write through
        # them once the mappings have been closed.
        self.info_buffer_repr = None
        self.image_reprs = []
        self.image_buffer_repr = None
        self.info_buffer.close()
        # this it's due the python core issues
        # https://bugs.python.org/issue38119
        # https://bugs.python.org/issue39959
        # https://github.com/luizalabs/shared-memory-dict/issues/13
        if self._created:
            try:
                self.info_buffer.unlink()
            except FileNotFoundError:
                print(
                    f"Shared Memory {self.info_buffer_name}\
                        (info_buffer) File not found"
                )
        for buffer, name in zip(self.image_buffers, self.image_buffer_names):
            buffer.close()
            if self._created:
                try:
                    buffer.unlink()
                except FileNotFoundError:
                    print(f"Shared Memory {name}(buffer image) File not found")
 
[docs]
class IntervalTimerThreading:
    """Periodic timer behaving like JavaScript's ``setInterval``.

    Each tick runs on a daemon ``threading.Timer`` that re-arms itself
    before invoking the callback.
    """

    def __init__(self, seconds, callback, *args, **kwargs):
        """Create the timer and start it immediately.

        Parameters
        ----------
        seconds : float
            A positive float number. Represents the total amount of
            seconds between each call
        callback : function
            The function to be called
        *args : args
            args to be passed to callback
        **kwargs : kwargs
            kwargs to be passed to callback

        Examples
        --------
        .. code-block:: python

            def callback(arr):
                arr += [len(arr)]
            arr = []
            interval_timer = tools.IntervalTimer(1, callback, arr)
            interval_timer.start()
            time.sleep(5)
            interval_timer.stop()
            # len(arr) == 5

        References
        ----------
        [1] https://stackoverflow.com/questions/3393612/run-certain-code-every-n-seconds

        """  # noqa
        self.seconds = seconds
        self.callback = callback
        self.args = args
        self.kwargs = kwargs
        self.is_running = False
        self._timer = None
        self.start()

    def _run(self):
        # Schedule the next tick first, then run this one.
        self.is_running = False
        self.start()
        self.callback(*self.args, **self.kwargs)

    def start(self):
        """Start the timer"""
        if not self.is_running:
            self._timer = Timer(self.seconds, self._run)
            self._timer.daemon = True
            self._timer.start()
            self.is_running = True

    def stop(self):
        """Stop the timer"""
        if self._timer is not None:
            self._timer.cancel()
            if self._timer.is_alive():
                self._timer.join()
            self.is_running = False
            self._timer = None
 
[docs]
class IntervalTimer:
    """A object that creates a timer that calls a function periodically.

    The periodic work runs as an asyncio task on the loop returned by
    ``asyncio.get_event_loop()`` at the time ``start`` is called.
    """

    def __init__(self, seconds, callback, *args, **kwargs):
        """Create the timer and start it immediately.

        Parameters
        ----------
        seconds : float
            A positive float number. Represents the total amount of
            seconds between each call
        callback : function
            The function to be called
        *args : args
            args to be passed to callback
        **kwargs : kwargs
            kwargs to be passed to callback

        """
        self._seconds = seconds
        self._callback = callback
        self.args = args
        self.kwargs = kwargs
        self._is_running = False
        self.start()

    async def _run(self):
        # Runs until the task is cancelled by ``stop``.
        while True:
            await asyncio.sleep(self._seconds)
            if self._is_running:
                self._callback(*self.args, **self.kwargs)

    def start(self):
        """Start the timer"""
        if self._is_running:
            return
        self._loop = asyncio.get_event_loop()
        self._task = self._loop.create_task(self._run())
        # Mark the timer as running right away. The coroutine only starts
        # executing on a later loop iteration, so leaving the flag to it
        # would let a second ``start`` schedule a duplicate task and lose
        # the handle to the first one.
        self._is_running = True

    def stop(self):
        """Stop the timer"""
        self._task.cancel()
        self._is_running = False