diff options
| author | morefigs <morefigs@gmail.com> | 2019-01-23 14:20:05 +1100 |
|---|---|---|
| committer | morefigs <morefigs@gmail.com> | 2019-01-23 14:20:05 +1100 |
| commit | 058cd5c653f09dc0308d259017c2a0b2195c8749 (patch) | |
| tree | 7609448d50f5d490d0fed8cb6a7c49f3f8a31507 | |
| parent | 8f851759ecfe7b874020cbe4297f32a80993296a (diff) | |
| download | pymba-058cd5c653f09dc0308d259017c2a0b2195c8749.tar.gz pymba-058cd5c653f09dc0308d259017c2a0b2195c8749.zip | |
clean up frame class
| -rw-r--r-- | pymba/vimba_frame.py | 162 |
1 files changed, 79 insertions, 83 deletions
diff --git a/pymba/vimba_frame.py b/pymba/vimba_frame.py index 2cd8492..0832429 100644 --- a/pymba/vimba_frame.py +++ b/pymba/vimba_frame.py @@ -1,14 +1,14 @@ -from __future__ import absolute_import -from . import vimba_structure as structs -from .vimba_exception import VimbaException -from .vimba_c import VimbaDLL -from .vimba_c import VimbaC_MemoryBlock -from ctypes import * +from ctypes import byref, sizeof, addressof, create_string_buffer, cast, POINTER, c_ubyte, c_void_p +from typing import Optional import warnings try: import numpy as np except ImportError: - warnings.warn('numpy not found, some VimbaFrame methods will not be available') + warnings.warn('could not import numpy, some Frame methods may not work.') + +from .vimba_camera import VimbaCamera +from .vimba_exception import VimbaException +from . import vimba_c # Map pixel formats to bytes per pixel. @@ -37,143 +37,139 @@ PIXEL_FORMATS = { } -class VimbaFrame(object): +class MemoryBlock: + """ + A memory block object for dealing neatly with C memory allocations. + """ + + @property + def block(self): + return c_void_p(addressof(self._block)) + + def __init__(self, block_size): + self._block = create_string_buffer(block_size) + + # this seems to be None if too much memory is requested + if self._block is None: + raise VimbaException(VimbaException.ERR_FRAME_BUFFER_MEMORY) + + def __del__(self): + del self._block + + +class Frame: """ A Vimba frame. """ - def __init__(self, camera): + + def __init__(self, camera: VimbaCamera): self._camera = camera self._handle = camera.handle # get frame sizes - self.payloadSize = self._camera.PayloadSize + self.payload_size = self._camera.PayloadSize self.width = self._camera.Width self.height = self._camera.Height self.pixel_bytes = PIXEL_FORMATS[self._camera.PixelFormat] # frame structure - self._frame = structs.VimbaFrame() + self._frame = vimba_c.VmbFrame() - def announceFrame(self): - """ - Announce frames to the API that may be queued for frame capturing later. - - Runs VmbFrameAnnounce + self._c_mem = None - Should be called after the frame is created. Call startCapture - after this method. + def announce(self): + """ + Announce frames to the API that may be queued for frame capturing later. Should be called after the frame is + created. Call startCapture after this method. """ - # size of expected frame - sizeOfFrame = self.payloadSize - # keep this reference to keep block alive for life of frame - self._c_mem = VimbaC_MemoryBlock(sizeOfFrame) + self._c_mem = MemoryBlock(self.payload_size) # set buffer to have length of expected payload size self._frame.buffer = self._c_mem.block # set buffer size to expected payload size - self._frame.bufferSize = sizeOfFrame + self._frame.bufferSize = self.payload_size - error = VimbaDLL.frameAnnounce(self._handle, + error = vimba_c.vmb_frame_announce(self._handle, byref(self._frame), sizeof(self._frame)) - if error: raise VimbaException(error) - def revokeFrame(self): + def revoke(self): """ Revoke a frame from the API. """ - error = VimbaDLL.frameRevoke(self._handle, + error = vimba_c.vmb_frame_revoke(self._handle, byref(self._frame)) - if error: raise VimbaException(error) - def queueFrameCapture(self, frameCallback = None): + def queue_capture(self, frame_callback: Optional[bool] = None) -> None: """ - Queue frames that may be filled during frame capturing. - Runs VmbCaptureFrameQueue - - Call after announceFrame and startCapture - - Callback must accept argument of type frame. Remember to requeue the - frame by calling frame.queueFrameCapture(frameCallback) at the end of - your callback function. + Queue frames that may be filled during frame capturing. Call after announceFrame and startCapture. Callback + must accept argument of type frame. Remember to requeue the frame by calling frame.queue_capture() at the end + of your callback function. """ - # remember the given callback function - self._frameCallback = frameCallback + self._frame_callback = frame_callback # define a callback wrapper here so it doesn't bind self - def frameCallbackWrapper(cam_handle, p_frame): + def frame_callback_wrapper(cam_handle, p_frame): # call the user's callback with the self bound outside the wrapper # ignore the frame pointer since we already know the callback # refers to this frame - self._frameCallback(self) + self._frame_callback(self) - if self._frameCallback is None: - self._frameCallbackWrapper_C = None + if self._frame_callback is None: + self._frame_callback_wrapper_c = None else: # keep a reference to prevent gc issues - self._frameCallbackWrapper_C = VimbaDLL.frameDoneCallback(frameCallbackWrapper) + self._frame_callback_wrapper_c = vimba_c.vmb_frame_callback(frame_callback_wrapper) - error = VimbaDLL.captureFrameQueue(self._handle, - byref(self._frame), - self._frameCallbackWrapper_C) + error = vimba_c.vmb_capture_frame_queue(self._handle, + byref(self._frame), + self._frame_callback_wrapper_c) if error: raise VimbaException(error) - def waitFrameCapture(self, timeout=2000): + def wait_capture(self, timeout_ms: Optional[int] = 2000) -> int: """ - Wait for a queued frame to be filled (or dequeued). Returns error - upon completion. - Runs VmbCaptureFrameWait - - timeout - int, milliseconds default(timeout, 2000) - - Call after an acquisition command + Wait for a queued frame to be filled (or dequeued). Call after an acquisition command. + :param timeout_ms: time out in milliseconds. """ - error = VimbaDLL.captureFrameWait(self._handle, - byref(self._frame), - timeout) + error = vimba_c.vmb_capture_frame_wait(self._handle, + byref(self._frame), + timeout_ms) # error to be processed by the end user for this function. - # Prevents system for breaking for example on a hardware trigger - # timeout - #if error: - #raise VimbaException(error) + # Prevents system for breaking for example on a hardware trigger timeout + + # todo raise error instead? + return error - # custom method for simplified usage - def getBufferByteData(self): + def get_buffer_data(self) -> c_ubyte * int: """ Retrieve buffer data in a useful format. - - :returns: array -- buffer data. """ - # cast frame buffer memory contents to a usable type - data = cast(self._frame.buffer, - POINTER(c_ubyte * self.payloadSize)) + data = cast(self._frame.buffer, POINTER(c_ubyte * self.payload_size)) # make array of c_ubytes from buffer - imagebytes = int(self.height * self.width * self.pixel_bytes) - array = (c_ubyte * imagebytes).from_address(addressof(data.contents)) + image_bytes = int(self.height * self.width * self.pixel_bytes) + return (c_ubyte * image_bytes).from_address(addressof(data.contents)) - return array - - def getImage(self): - cframe = self._frame - data = cast(cframe.buffer, POINTER(c_ubyte * cframe.imageSize)) - try: - return np.ndarray(buffer=data.contents, dtype=np.uint8, shape=(cframe.height, cframe.width)) - except NameError as e: - print('install numpy to use this method or use getBufferByteData instead') - raise e + def get_image(self) -> np.ndarray: + """ + Returns the frame's image data as a NumPy array. + """ + data = cast(self._frame.buffer, POINTER(c_ubyte * self._frame.imageSize)) + return np.ndarray(buffer=data.contents, dtype=np.uint8, shape=(self._frame.height, self._frame.width)) - def getTimestamp(self): + @property + def timestamp(self) -> int: return self._frame.timestamp - def getReceiveStatus(self): + @property + def receive_status(self) -> int: return self._frame.receiveStatus |
