def remove_shm_from_resource_tracker():
    """Monkey-patch multiprocessing.resource_tracker so SharedMemory won't
    be tracked.

    The module-level ``resource_tracker.register`` and
    ``resource_tracker.unregister`` functions are replaced by wrappers that
    ignore the ``"shared_memory"`` resource type and forward every other
    resource type to ``resource_tracker._resource_tracker``. The
    ``"shared_memory"`` entry is also removed from
    ``resource_tracker._CLEANUP_FUNCS`` when it is present.

    Notes
    -----
    More details at: https://bugs.python.org/issue38119

    """

    def fix_register(name, rtype):
        if rtype == "shared_memory":
            return None
        # ``register`` is a bound method of the tracker instance, so no
        # explicit ``self`` must be passed. The previous implementation
        # passed an undefined ``self`` name, which raised a NameError that
        # was swallowed, silently dropping every registration.
        return resource_tracker._resource_tracker.register(name, rtype)

    resource_tracker.register = fix_register

    def fix_unregister(name, rtype):
        if rtype == "shared_memory":
            return None
        # Same reasoning as in ``fix_register``: call the bound method.
        return resource_tracker._resource_tracker.unregister(name, rtype)

    resource_tracker.unregister = fix_unregister

    if "shared_memory" in resource_tracker._CLEANUP_FUNCS:
        del resource_tracker._CLEANUP_FUNCS["shared_memory"]
class GenericMultiDimensionalBuffer(ABC):
    """Abstract (generic) multidimensional buffer.

    Items are addressed by an integer index; the flat slice backing each
    item is obtained from ``self.get_start_end(idx)``.
    NOTE(review): ``get_start_end`` is not defined in the visible portion
    of this class; it is expected to be provided elsewhere.
    """

    @warn_on_args_to_kwargs()
    def __init__(self, *, max_size=None, dimension=8):
        """Initialize the multidimensional buffer.

        Parameters
        ----------
        max_size : int, optional
            If buffer_name or buffer was not passed then max_size it's
            mandatory
        dimension : int, default 8

        """
        self.max_size, self.dimension = max_size, dimension
        self.buffer_name = None
        self._buffer = None
        self._buffer_repr = None
        self._created = False

    @staticmethod
    def _is_float_array(data):
        """Tell whether ``data`` is a numpy value of the buffer dtype."""
        return (
            isinstance(data, (np.ndarray, np.generic))
            and data.dtype == _FLOAT_ShM_TYPE
        )

    @property
    def buffer(self):
        """Underlying memory resource object."""
        return self._buffer

    @buffer.setter
    def buffer(self, data):
        # Anything that is not a numpy value of the expected dtype is
        # silently ignored.
        if not self._is_float_array(data):
            return
        self._buffer_repr[:] = data

    def __getitem__(self, idx):
        """Return the slice of the buffer stored for item ``idx``.

        The read is timed and both the start timestamp and the elapsed
        time (in milliseconds) are sent to ``logging.info``.
        """
        first, last = self.get_start_end(idx)
        logging.info(f"dequeue start {int(time.time()*1000)}")
        t_begin = time.time() * 1000
        chunk = self._buffer_repr[first:last]
        elapsed_ms = time.time() * 1000 - t_begin
        logging.info(f"dequeue frombuffer cost {elapsed_ms:.2f}")
        return chunk

    def __setitem__(self, idx, data):
        """Store ``data`` in the slice of the buffer reserved for ``idx``.

        Values that are not numpy values of the expected dtype are
        silently ignored.
        """
        first, last = self.get_start_end(idx)
        if self._is_float_array(data):
            self._buffer_repr[first:last] = data

    @abstractmethod
    def load_mem_resource(self):
        """Attach to an already existing memory resource."""  # pragma: no cover

    @abstractmethod
    def create_mem_resource(self):
        """Create a new memory resource."""  # pragma: no cover

    @abstractmethod
    def cleanup(self):
        """Release the memory resource."""  # pragma: no cover
class RawArrayMultiDimensionalBuffer(GenericMultiDimensionalBuffer):
    """Multidimensional buffer backed by a RawArray."""

    @warn_on_args_to_kwargs()
    def __init__(self, max_size, *, dimension=4, buffer=None):
        """Stream system uses that to implement the CircularQueue with
        shared memory resources.

        Parameters
        ----------
        max_size : int
            Forwarded to the generic multidimensional buffer.
        dimension : int, default 4
            Forwarded to the generic multidimensional buffer.
        buffer : buffer, optional
            When omitted, a new memory resource is created through
            ``create_mem_resource``. When given, it is stored as the
            underlying buffer and ``load_mem_resource`` is called to read
            the already created resource.

        """
        super().__init__(max_size=max_size, dimension=dimension)
        if buffer is not None:
            # Reuse the resource supplied by the caller.
            self._buffer = buffer
            self.load_mem_resource()
            return
        self.create_mem_resource()
class SharedMemMultiDimensionalBuffer(GenericMultiDimensionalBuffer):
    """This implements a generic multidimensional buffer
    with SharedMemory.
    """

    @warn_on_args_to_kwargs()
    def __init__(self, max_size, *, dimension=4, buffer_name=None):
        """Stream system uses that to implement the
        CircularQueue with shared memory resources.

        Parameters
        ----------
        max_size : int
            Forwarded to the generic multidimensional buffer.
        dimension : int, default 4
            Forwarded to the generic multidimensional buffer.
        buffer_name : str, optional
            If buffer_name is passed then this object will read an
            already created SharedMemory; otherwise a new memory resource
            is created and owned by this object.

        """
        super().__init__(max_size=max_size, dimension=dimension)
        if buffer_name is None:
            self.create_mem_resource()
            self._created = True
        else:
            self.buffer_name = buffer_name
            self.load_mem_resource()
            self._created = False

        self._create_repr()

    def cleanup(self):
        """Close the buffer and, if this object created it, unlink it."""
        self._buffer.close()
        if self._created:
            # this it's due the python core issues
            # https://bugs.python.org/issue38119
            # https://bugs.python.org/issue39959
            # https://github.com/luizalabs/shared-memory-dict/issues/13
            try:
                self._buffer.unlink()
            except FileNotFoundError:
                # The message used to contain a stray backslash (an invalid
                # escape sequence) inside the f-string.
                print(
                    f"Shared Memory {self.buffer_name}(queue_event_buffer) "
                    "File not found"
                )
class GenericCircularQueue(ABC):
    """Generic circular queue which works with shared memory resources.

    The head and tail positions are stored at indices 0 and 1 of the
    head/tail buffer.
    """

    @warn_on_args_to_kwargs()
    def __init__(
        self,
        *,
        max_size=None,
        dimension=8,
        use_shared_mem=False,
        buffer=None,
        buffer_name=None,
    ):
        """Initialize the circular queue.

        Parameters
        ----------
        max_size : int, optional
            If buffer_name or buffer was not passed then max_size it's
            mandatory. This will be used to construct the multidimensional
            buffer
        dimension : int, default 8
            This will be used to construct the multidimensional buffer
        use_shared_mem : bool, default False
            If the multidimensional memory resource should create or read
            using SharedMemory or RawArrays
        buffer : RawArray, optional
            Only used when ``use_shared_mem`` is False.
        buffer_name: str, optional
            Only used when ``use_shared_mem`` is True.

        """
        self._created = False
        self.head_tail_buffer_name = None
        self.head_tail_buffer_repr = None
        self.head_tail_buffer = None
        self._use_shared_mem = use_shared_mem

        if not use_shared_mem:
            self.buffer = RawArrayMultiDimensionalBuffer(
                max_size=max_size, dimension=dimension, buffer=buffer
            )
        else:
            self.buffer = SharedMemMultiDimensionalBuffer(
                max_size=max_size, dimension=dimension, buffer_name=buffer_name
            )

    @property
    def head(self):
        """Current head position of the queue."""
        if self._use_shared_mem:
            return self.head_tail_buffer_repr[0]
        # Without shared memory the value is read from the object wrapped
        # by the head/tail buffer.
        positions = np.frombuffer(self.head_tail_buffer.get_obj(), _INT_ShM_TYPE)
        return positions[0]

    @head.setter
    def head(self, value):
        self.head_tail_buffer_repr[0] = value

    @property
    def tail(self):
        """Current tail position of the queue."""
        if self._use_shared_mem:
            return self.head_tail_buffer_repr[1]
        positions = np.frombuffer(self.head_tail_buffer.get_obj(), _INT_ShM_TYPE)
        return positions[1]

    @tail.setter
    def tail(self, value):
        self.head_tail_buffer_repr[1] = value

    @abstractmethod
    def enqueue(self, data):
        """Insert ``data`` into the queue."""  # pragma: no cover

    @abstractmethod
    def dequeue(self):
        """Remove an item from the queue."""  # pragma: no cover

    @abstractmethod
    def load_mem_resource(self):
        """Attach to an already existing memory resource."""  # pragma: no cover

    @abstractmethod
    def create_mem_resource(self):
        """Create a new memory resource."""  # pragma: no cover

    @abstractmethod
    def cleanup(self):
        """Release the memory resources."""  # pragma: no cover
class ArrayCircularQueue(GenericCircularQueue):
    """MultiDimensional Queue which works with Arrays and RawArrays."""

    @warn_on_args_to_kwargs()
    def __init__(
        self, *, max_size=10, dimension=6, head_tail_buffer=None, buffer=None
    ):
        """Stream system uses that to implement user interactions

        Parameters
        ----------
        max_size : int, default 10
            This will be used to construct the multidimensional buffer
        dimension : int, default 6
            This will be used to construct the multidimensional buffer
        head_tail_buffer : buffer, optional
            If not passed, this object creates a new head/tail buffer
            through ``create_mem_resource`` and resets head and tail.
        buffer : buffer, optional
            Forwarded to the generic queue to build the multidimensional
            buffer.

        """
        super().__init__(
            max_size=max_size,
            dimension=dimension,
            use_shared_mem=False,
            buffer=buffer,
        )

        owns_head_tail = head_tail_buffer is None
        if owns_head_tail:
            self.create_mem_resource()
        else:
            self.head_tail_buffer = head_tail_buffer
        self._created = owns_head_tail

        self.head_tail_buffer_name = None
        # Here the representation is the head/tail buffer object itself.
        self.head_tail_buffer_repr = self.head_tail_buffer
        if self._created:
            self.set_head_tail(-1, -1, 0)

    def load_mem_resource(self):
        """Do nothing."""  # pragma: no cover

    def create_mem_resource(self):
        """Create the head/tail buffer, initialized with ``[-1, -1, 0]``."""
        # initial_state[0] int; head position
        # initial_state[1] int; tail position
        initial_state = np.array([-1, -1, 0], dtype=_INT_ShM_TYPE)
        self.head_tail_buffer = multiprocessing.Array(_INT_ShM_TYPE, initial_state)
class SharedMemCircularQueue(GenericCircularQueue):
    """This implements a MultiDimensional Queue which works with
    SharedMemory.
    """

    @warn_on_args_to_kwargs()
    def __init__(
        self,
        *,
        max_size=10,
        dimension=6,
        head_tail_buffer_name=None,
        buffer_name=None,
    ):
        """Stream system uses that to implement user interactions

        Parameters
        ----------
        max_size : int, default 10
            This will be used to construct the multidimensional buffer
        dimension : int, default 6
            This will be used to construct the multidimensional buffer
        head_tail_buffer_name : str, optional
            If passed, this object will read an already created
            SharedMemory with the head and tail information; otherwise a
            new one is created and owned by this object.
        buffer_name : str, optional
            If passed, this object will read an already created
            SharedMemory to create the MultiDimensionalBuffer

        """
        super().__init__(
            max_size=max_size,
            dimension=dimension,
            use_shared_mem=True,
            buffer_name=buffer_name,
        )

        if head_tail_buffer_name is None:
            self.create_mem_resource()
            self._created = True
        else:
            self.head_tail_buffer_name = head_tail_buffer_name
            self.load_mem_resource()
            self._created = False

        # View over the first three integers of the shared memory block.
        self.head_tail_buffer_repr = np.ndarray(
            3,
            dtype=_INT_ShM_TYPE,
            buffer=self.head_tail_buffer.buf[0 : 3 * _INT_SIZE],
        )
        logging.info(
            [
                "create shared mem",
                "size repr",
                self.head_tail_buffer_repr.shape,
                "size buffer",
                self.head_tail_buffer.size / _INT_SIZE,
            ]
        )
        if self._created:
            self.set_head_tail(-1, -1, 0)

    def cleanup(self):
        """Clean up the data buffer, then close the head/tail buffer and,
        if this object created it, unlink it.
        """
        self.buffer.cleanup()
        self.head_tail_buffer.close()
        if self._created:
            # this it's due the python core issues
            # https://bugs.python.org/issue38119
            # https://bugs.python.org/issue39959
            # https://github.com/luizalabs/shared-memory-dict/issues/13
            try:
                self.head_tail_buffer.unlink()
            except FileNotFoundError:
                # The message used to contain a stray backslash (an invalid
                # escape sequence) inside the f-string.
                print(
                    f"Shared Memory {self.head_tail_buffer_name}(head_tail) "
                    "File not found"
                )
class GenericImageBufferManager(ABC):
    """This implements a abstract (generic) ImageBufferManager with
    the n-buffer technique.
    """

    @warn_on_args_to_kwargs()
    def __init__(self, *, max_window_size=None, num_buffers=2, use_shared_mem=False):
        """Initialize the ImageBufferManager.

        Parameters
        ----------
        max_window_size : tuple of ints
            This allows resize events inside of the FURY window instance.
            Should be greater than the window size. At least two values
            (the first two are used as the image size) are required.
        num_buffers : int, optional
            Number of buffers to be used in the n-buffering
            technique.
        use_shared_mem: bool, default False

        Raises
        ------
        ValueError
            If ``max_window_size`` is not a one-dimensional sequence with
            at least two values (this includes the ``None`` default).

        """
        self.max_window_size = np.array(max_window_size)
        # Indexing a 0-d array (e.g. built from None) used to fail later
        # with an unrelated-looking IndexError; fail early and clearly.
        if self.max_window_size.ndim != 1 or self.max_window_size.size < 2:
            raise ValueError(
                "max_window_size must be a sequence of two ints, "
                f"got {max_window_size!r}"
            )
        self.num_buffers = num_buffers
        self.info_buffer_size = num_buffers * 2 + 2
        self._use_shared_mem = use_shared_mem
        self.max_size = None  # int
        self.num_components = 3
        self.image_reprs = []
        self.image_buffers = []
        self.image_buffer_names = []
        self.info_buffer_name = None
        self.info_buffer = None
        self.info_buffer_repr = None
        self._created = False

        # Pre-render a placeholder image carrying a warning text; it is
        # stored flattened in ``img_exceed``.
        size = (self.max_window_size[0], self.max_window_size[1])
        img = Image.new("RGB", size, color=(0, 0, 0))
        d = ImageDraw.Draw(img)
        pos_text = (12, size[1] // 2)
        d.text(
            pos_text, "Image size have exceed the Buffer Max Size", fill=(255, 255, 0)
        )
        img = np.flipud(img)
        self.img_exceed = np.asarray(img).flatten()

    @property
    def next_buffer_index(self):
        """Index following the current one, wrapping at ``num_buffers``."""
        index = int((self.info_buffer_repr[1] + 1) % self.num_buffers)
        return index

    @property
    def buffer_index(self):
        """Buffer index stored at position 1 of the info buffer."""
        index = self.info_buffer_repr[1]
        return index

    def get_current_frame(self):
        """Get the current frame from the buffer.

        Returns
        -------
        tuple
            ``(width, height, image)`` where ``image`` is the entry of
            ``image_reprs`` selected by the buffer index read from the
            info buffer. ``width`` and ``height`` are also stored on the
            instance, and ``image`` is stored as ``image_buffer_repr``.

        """
        if not self._use_shared_mem:
            image_info = np.frombuffer(self.info_buffer, _UINT_ShM_TYPE)
        else:
            image_info = self.info_buffer_repr

        buffer_index = int(image_info[1])

        # Width and height of buffer ``i`` are read from positions
        # ``2 + 2 * i`` and ``2 + 2 * i + 1`` of the info buffer.
        self.width = int(image_info[2 + buffer_index * 2])
        self.height = int(image_info[2 + buffer_index * 2 + 1])

        image = self.image_reprs[buffer_index]
        self.image_buffer_repr = image

        return self.width, self.height, image

    def get_jpeg(self):
        """Returns a jpeg image from the buffer.

        Returns
        -------
        bytes: jpeg image.

        """
        width, height, image = self.get_current_frame()

        if self._use_shared_mem:
            image = np.frombuffer(image, _BYTE_ShM_TYPE)

        # Only the first width * height * 3 values belong to the frame.
        image = image[0 : width * height * 3].reshape((height, width, 3))
        image = np.flipud(image)
        image_encoded = Image.fromarray(image, mode="RGB")
        bytes_img_data = io.BytesIO()
        image_encoded.save(bytes_img_data, format="jpeg")
        bytes_img = bytes_img_data.getvalue()

        return bytes_img

    @abstractmethod
    def load_mem_resource(self):
        pass  # pragma: no cover

    @abstractmethod
    def create_mem_resource(self):
        pass  # pragma: no cover

    @abstractmethod
    def cleanup(self):
        pass  # pragma: no cover
class RawArrayImageBufferManager(GenericImageBufferManager):
    """ImageBufferManager implemented with RawArrays."""

    @warn_on_args_to_kwargs()
    def __init__(
        self,
        *,
        max_window_size=(100, 100),
        num_buffers=2,
        image_buffers=None,
        info_buffer=None,
    ):
        """Initialize the ImageBufferManager.

        Parameters
        ----------
        max_window_size : tuple of ints, optional
            This allows resize events inside of the FURY window instance.
            Should be greater than the window size.
        num_buffers : int, optional
            Number of buffers to be used in the n-buffering
            technique.
        info_buffer : buffer, optional
            A buffer with the information about the current
            frame to be streamed and the respective sizes
        image_buffers : list of buffers, optional
            A list of buffers with each one containing a frame.
            New resources are created unless both ``image_buffers`` and
            ``info_buffer`` are given.

        """
        super().__init__(
            max_window_size=max_window_size,
            num_buffers=num_buffers,
            use_shared_mem=False,
        )
        if image_buffers is not None and info_buffer is not None:
            self.image_buffers = image_buffers
            self.info_buffer = info_buffer
            self.load_mem_resource()
        else:
            self.create_mem_resource()

    def create_mem_resource(self):
        """Allocate the image buffers and the info buffer as RawArrays."""
        width, height = self.max_window_size[0], self.max_window_size[1]
        self.max_size = width * height
        self.max_size *= self.num_components

        for _ in range(self.num_buffers):
            # Each image buffer starts filled with random bytes.
            noise = np.random.randint(
                0, 255, size=self.max_size, dtype=_BYTE_ShM_TYPE
            )
            raw = multiprocessing.RawArray(
                _BYTE_ShM_TYPE, np.ctypeslib.as_ctypes(noise)
            )
            self.image_buffers.append(raw)
            self.image_reprs.append(np.ctypeslib.as_array(raw))

        # The info array stores the information about the n frame buffers
        # as well the respectives sizes.
        # 0 number of components
        # 1 id buffer
        # 2, 3, width first buffer, height first buffer
        # 4, 5, width second buffer , height second buffer
        info_values = [3, 0] + [width, height] * self.num_buffers
        info_array = np.array(info_values, dtype=_UINT_ShM_TYPE)

        self.info_buffer = multiprocessing.RawArray(
            _UINT_ShM_TYPE, np.ctypeslib.as_ctypes(np.array(info_array))
        )
        self.info_buffer_repr = np.ctypeslib.as_array(self.info_buffer)
class SharedMemImageBufferManager(GenericImageBufferManager):
    """This implements an ImageBufferManager using the
    SharedMemory approach.
    """

    @warn_on_args_to_kwargs()
    def __init__(
        self,
        *,
        max_window_size=(100, 100),
        num_buffers=2,
        image_buffer_names=None,
        info_buffer_name=None,
    ):
        """Initialize the ImageBufferManager.

        Parameters
        ----------
        max_window_size : tuple of ints, optional
            This allows resize events inside of the FURY window instance.
            Should be greater than the window size.
        num_buffers : int, optional
            Number of buffers to be used in the n-buffering
            technique.
        info_buffer_name : str
            The name of a buffer with the information about the current
            frame to be streamed and the respective sizes
        image_buffer_names : list of str, optional
            a list of buffer names. Each buffer contains a frame.
            New resources are created (and owned by this object) unless
            both ``image_buffer_names`` and ``info_buffer_name`` are given.

        Notes
        -----
        Python >=3.8 is a requirement to use this object.

        """
        super().__init__(
            max_window_size=max_window_size,
            num_buffers=num_buffers,
            use_shared_mem=True,
        )
        if image_buffer_names is None or info_buffer_name is None:
            self.create_mem_resource()
            self._created = True
        else:
            self.image_buffer_names = image_buffer_names
            self.info_buffer_name = info_buffer_name
            self._created = False
            self.load_mem_resource()

    def cleanup(self):
        """Release the resources used by the Shared Memory Manager"""
        self.info_buffer.close()
        # this it's due the python core issues
        # https://bugs.python.org/issue38119
        # https://bugs.python.org/issue39959
        # https://github.com/luizalabs/shared-memory-dict/issues/13
        if self._created:
            try:
                self.info_buffer.unlink()
            except FileNotFoundError:
                # The message used to contain a stray backslash (an invalid
                # escape sequence) inside the f-string.
                print(
                    f"Shared Memory {self.info_buffer_name}(info_buffer) "
                    "File not found"
                )
        for buffer, name in zip(self.image_buffers, self.image_buffer_names):
            buffer.close()
            # Only the object that created the buffers unlinks them.
            if self._created:
                try:
                    buffer.unlink()
                except FileNotFoundError:
                    print(f"Shared Memory {name}(buffer image) File not found")
class IntervalTimerThreading:
    """Periodic timer with the same behavior of setInterval from Js.

    The timer is started by the constructor. Each time it fires, the next
    timer is armed first and the callback is called afterwards.
    """

    def __init__(self, seconds, callback, *args, **kwargs):
        """Create and start the timer.

        Parameters
        ----------
        seconds : float
            A positive float number. Represents the total amount of
            seconds between each call
        callback : function
            The function to be called
        *args : args
            args to be passed to callback
        **kwargs : kwargs
            kwargs to be passed to callback

        Examples
        --------
        .. code-block:: python

            def callback(arr):
                arr += [len(arr)]
            arr = []
            interval_timer = tools.IntervalTimerThreading(1, callback, arr)
            time.sleep(5)
            interval_timer.stop()
            # len(arr) == 5

        References
        ----------
        [1] https://stackoverflow.com/questions/3393612/run-certain-code-every-n-seconds

        """  # noqa
        self.seconds = seconds
        self.callback = callback
        self.args = args
        self.kwargs = kwargs
        self._timer = None
        self.is_running = False
        self.start()

    def _run(self):
        # Clear the flag so that start() really arms the next timer, then
        # invoke the user callback.
        self.is_running = False
        self.start()
        self.callback(*self.args, **self.kwargs)

    def start(self):
        """Start the timer; does nothing when it is already running."""
        if not self.is_running:
            self._timer = Timer(self.seconds, self._run)
            self._timer.daemon = True
            self._timer.start()
            self.is_running = True

    def stop(self):
        """Stop the timer; does nothing when there is no timer."""
        if self._timer is not None:
            self._timer.cancel()
            if self._timer.is_alive():
                self._timer.join()
            self.is_running = False
            self._timer = None
class IntervalTimer:
    """A object that creates a timer that calls a function periodically.

    The timer runs as an asyncio task and is started by the constructor.
    """

    def __init__(self, seconds, callback, *args, **kwargs):
        """Create and start the timer.

        Parameters
        ----------
        seconds : float
            A positive float number. Represents the total amount of
            seconds between each call
        callback : function
            The function to be called
        *args : args
            args to be passed to callback
        **kwargs : kwargs
            kwargs to be passed to callback

        """
        self._seconds = seconds
        self._callback = callback
        self.args = args
        self.kwargs = kwargs
        self._is_running = False
        self.start()

    async def _run(self):
        # Sleep, then call the callback, forever; the loop only ends when
        # the task is cancelled by stop().
        while True:
            await asyncio.sleep(self._seconds)
            if self._is_running:
                self._callback(*self.args, **self.kwargs)

    def start(self):
        """Start the timer; does nothing when it is already running."""
        if self._is_running:
            return
        self._loop = asyncio.get_event_loop()
        self._task = self._loop.create_task(self._run())
        # Mark the timer as running as soon as the task is scheduled.
        # The flag used to be set only when the task first executed, so a
        # second start() before that point created a duplicate task that
        # stop() could never cancel.
        self._is_running = True

    def stop(self):
        """Stop the timer by cancelling its task."""
        self._task.cancel()
        self._is_running = False