Source code for xspress3

# -*- coding: utf-8 -*-
import time
import logging

from monitorpv import MonitorPV
from epics import caget, caput, PV

from . import hdf5
HDF5 = hdf5.HDF5


[docs]class Xspress3: """Xspress 3 Device A class encapsulating an Xspress 3 Device Example: >>> x3 = Xspres3(pv_prefix) # create an x3 object with pv_prefix >>> x3.set( >>> exposure_time=1, >>> num_images=10 >>> ) >>> x3.acquire() """ _sca_count = 7 _mcas = [] _scalars = [] _params = {} _num_acquired = None _acquired_iter = 0 _frame_callbacks = [] _parameters = { 'exposure_time': ['AcquireTime', 'AcquireTime_RBV', False, {}], 'num_images': ['NumImages', 'NumImages_RBV', False, {}], 'trigger_mode': ['TriggerMode', 'TriggerMode_RBV', False, { 0: 'Software', 1: 'Internal', 3: 'External' }], 'file_path': ['HDF5:FilePath', 'HDF5:FilePath_RBV', True, {}], 'file_name': ['HDF5:FileName', 'HDF5:FileName_RBV', True, {}], 'file_capture': ['HDF5:Capture', 'HDF5:Capture_RBV', False, {}], 'file_number': ['HDF5:FileNumber', 'HDF5:FileNumber', False, {}], } _sf_parameters = { 'subframe_rate': ['SUBFRAME_RATE', 'SUBFRAME_RATE', False], 'subframe_cycles': ['SUBFRAME_CYCLES', 'SUBFRAME_CYCLES', False], 'subframe_resolution': ['SUBFRAME_RESOLUTION', 'SUBFRAME_RESOLUTION', False], 'num_subframes': ['NUM_SUBFRAMES_RBV', 'NUM_SUBFRAMES_RBV', False], } def __init__(self, prefix, logger=None, subframes=False): """ Create an Xspress 3 device instance Args: prefix (string): the PV prefix of the device, eg. XSPRESS3-EXAMPLE Kwargs: subframes (bool): enable subframe parmeters (special IOC required) """ self.logger = logger or logging.getLogger(__name__) self._prefix = prefix self._channels = caget(self._pv('NUM_CHANNELS_RBV')) self.logger.info('System has {chans} channels'.format(chans=self._channels)) # Disable built in DTC caput(self._pv('CTRL_DTC'), False) # MCAs + SCAs for c in range(self._channels): cid = c+1 caput(self._pv('C{c}_PluginControlVal').format(c=cid), 1) self._mcas.append(MonitorPV(self._pv('ARR{c}:ArrayData'.format(c=cid)))) scalars = [] for s in range(self._sca_count): scalars.append(MonitorPV(self._pv('C{c}_SCA{s}:Value_RBV'.format(c=cid, s=s)))) self._scalars.append(scalars) # Acq Params for k,p in self._parameters.iteritems(): self._params[k] = MonitorPV(self._pv(p[1]), p[2]) if subframes == True: for k,p in self._sf_parameters.iteritems(): self._params[k] = MonitorPV(self._pv(p[1]), p[2]) self._nfr_acq = PV(self._pv('ArrayCounter_RBV'), self._frame_change) self._acq_status = MonitorPV(self._pv('Acquire_RBV')) self._file_name = MonitorPV(self._pv('HDF5:FullFileName_RBV'), is_string=True) # Short delay to wait for PV readbacks time.sleep(0.2)
[docs] def channels(self): """ Get the number of channels on the Xspress 3 system Returns: no_channels (int): The number of channels on the system """ return self._channels
[docs] def add_frame_callback(self, callback): """ Add a frame change callback Adds a callback to the list of callbacks called when a frame change occurs during an acquisition. The callback recieves the frame number as an argument >>> def callback(frame_number): >>> print 'New frame {f}'.format(frame_number) >>> x3.add_frame_callback(callback) Args: callback (callable): the callback to add to the list of frame callbacks """ assert not callback in self._frame_callbacks, 'Callback already registered' self._frame_callbacks.append(callback)
def _pv(self, pv): return '{prefix}:{pv}'.format(prefix=self._prefix, pv=pv) def _frame_change(self, **kwargs): self._num_acquired = kwargs['value'] self.logger.debug('Frame Changed: {frame}'.format(frame=kwargs['value'])) for c in range(self._channels): self.logger.debug(' Ch {ch} Time {t} Events {e} Reset {r}'.format(ch=c, t=self.sca(c, 0), e=self.sca(c,3), r=self.sca(c,2))) self.logger.debug(' MCA Counts {cts}'.format(cts=sum(self._mcas[c].value()))) if self.acquiring(): for c in self._frame_callbacks: c(self._num_acquired) self._acquired_iter += 1
[docs] def dropped_frames(self): """ Return the number of dropped frames in the last acquisition At high frames rates EPICS cannot keep up with PVs changing quickly, in these situations use file saving and read the hdf5 with the hdf5 module. Tests shows this occurs above around 100Hz This value will tell you how many frames were dropped Returns: dropped_frames (int): the number of dropped frames in the last acquisition """ return self._num_acquired - self._acquired_iter
[docs] def set(self, **kwargs): """ Set an attribute on the device Kwargs: exposure_time (float): per image exposure time in seconds num_images (int): number of images to acquire trigger_mode (string): trigger mode (Internal, External) file_path (string): path to save hdf5 files to file_name (string): file template for hdf5 file_number (int): file number ({file_path}/{file_name}{file_number}.hdf5) file_capture (bool): enable hdf5 saving """ for p,v in kwargs.iteritems(): assert p in self._parameters, 'No such parameter {param}'.format(param=p) if len(self._parameters[p][3]): val = None for k,va in self._parameters[p][3].iteritems(): if v == va: val = k assert val is not None, 'Invalid value {val} for parameter {param}. Available values are: {vals}'.format( val=v, param=p, vals=','.join(self._parameters[p][3].values())) caput(self._pv(self._parameters[p][0]), val) else: caput(self._pv(self._parameters[p][0]), v) time.sleep(0.2)
[docs] def get(self, param=None): """ Get the value of an attribute Args: param (string): parameter to return the value of Returns: value (mixed): the value of the parameter """ if param is None: vals = {} for p,pv in self._params.iteritems(): if len(self._parameters[p][3]): val = pv.value() vals[p] = self._parameters[p][3][val] else: vals[p] = pv.value() return vals else: assert param in self._parameters, 'No such parameter {param}'.format(param=param) return self._params[param].value()
[docs] def setup_file_saving(self, dirn, template): """ Setup hdf5 file saving >>> x3.setup_file_saving('/home/test/data', 'test') >>> '/home/test/data/test{number}.hdf5' Args: dirn (string): directory to save hdf5 files to template (string): file template of hdf5 files Returns: filename (string): the filename of the last hdf5 captured """ self.set( file_path=dirn, file_name=template, ) self.file_saving(True) return self.filename()
[docs] def file_saving(self, enable): """ Enable file saving File saving is disabled after each acquisition, renable or disable it Args: enable (bool): enable or disable file saving """ self.set(file_capture=1 if enable else 0) return self.filename()
[docs] def filename(self): """ Returns the current/last hdf5 filename Returns: filename (string): the current hdf5 filename """ return self._file_name.value()
[docs] def acquire(self): """ Starts an acquisition """ self._acquired_iter = 0 self._num_acquired = 0 caput(self._pv('Acquire'), 0) caput(self._pv('ERASE'), 1) time.sleep(0.2) caput(self._pv('Acquire'), 1) while self._acq_status.value() != 1: self.logger.info('Preparing Acquisition') time.sleep(0.5) self.logger.info('Acquiring')
[docs] def stop(self): """ Stops an acquisiton """ self.logger.info('Aborting Acquisition') caput(self._pv('Acquire'), 0)
[docs] def acquiring(self): """ Returns the acquisition status Returns: acquiring (bool): the acquisition status of the device """ return self._acq_status.value()
[docs] def num_acquired(self): """ Returns the number of frames acquired Returns: num_acquired (int): the number of frames acquired """ return self._num_acquired
[docs] def mca(self, chan): """ Returns the MCA for the specified channel This value will be automatically updated whenever the PV changes Args: chan (int): channel number to return MCA for, zero offset Returns: mca (list[int]): return a list of mca values (usually 4096 values) """ assert chan < len(self._mcas), 'Chan {chan} > number of channels {chans}'.format(chan=chan, channels=self._channels) return self._mcas[chan].value()
[docs] def sca(self, chan, sca): """ Returns the specified scalar for the specified channel Args: chan (int): channel number to return scalar for, zero offset sca (int): scalar to return Returns: scalar (int): the specified scalar value Available scalars: 0. Time in ticks 1. ResetTicks 2. ResetCount 3. AllEvent 4. AllGood 5. InWindow 0 6. InWindow 1 7. PileUp """ assert chan < len(self._scalars), 'Chan {chan} > number of channels {chans}'.format(chan=chan, channels=self._channels) assert sca < len(self._scalars[chan]), 'Scalar {sca} > number of scalars {scalars}'.format(chan=sca, channels=self._channels) return self._scalars[chan][sca].value()