# Source code for contrast.detectors.Lima

from .Detector import (
    Detector, SoftwareLiveDetector, TriggeredDetector, BurstDetector)
from ..environment import env
from ..environment import macro

import time
import numpy as np
try:
    import PyTango
except ModuleNotFoundError:
    pass
import os
from h5py import VirtualSource, VirtualLayout


class LimaDetector(Detector, SoftwareLiveDetector, TriggeredDetector,
                   BurstDetector):
    """
    Lima base class.

    Wraps two Tango devices: the generic Lima device (``self.lima``) and
    the detector-specific device (``self.det``). Subclasses are expected
    to set ``self._hdf_path_base``, a ``%``-format string taking the
    entry index, which is used to build the virtual layout returned by
    ``read()``.
    """

    # Lima trigger mode used for hardware-triggered and hybrid scans.
    EXT_TRG_MODE = "EXTERNAL_TRIGGER_MULTI"

    def __init__(self, name=None, lima_device=None, det_device=None):
        """
        Lima is sometimes very slow to finish writing data, which is
        why this gadget has a 'hybrid mode', where Lima is started only
        once, and sequential arm/start calls to this gadget only
        increment counters. The busy state is set based on Lima's
        last_image_acquired compared to these counters, and does not
        reflect the state of the Lima device.

        :param name: Gadget name, passed on to ``Detector.__init__``.
        :param lima_device: Tango name of the Lima device.
        :param det_device: Tango name of the detector-specific device.
        """
        self.lima_device_name = lima_device
        self.det_device_name = det_device
        self._hybrid_mode = False
        self.n_starts = -1
        # Give these a defined value from the start, so that busy() and
        # read() don't fail with AttributeError before the first
        # prepare() call.
        self.arm_number = -1
        self.saving_filename = None
        Detector.__init__(self, name=name)
        SoftwareLiveDetector.__init__(self)
        TriggeredDetector.__init__(self)
        BurstDetector.__init__(self)

    def _safe_write(self, attr, val):
        """
        Write an attribute on the Lima device, trying again for as long
        as the failure is a timeout. All other errors are re-raised.
        """
        while True:
            try:
                self.lima.write_attribute(attr, val)
                return
            except PyTango.DevFailed as e:
                timeout = False
                for e_ in e.args:
                    if 'timeout' in e_.desc.lower():
                        print(('*** Timeout when writing attribute %s on %s.'
                               + 'Trying again...')
                              % (attr, self.lima_device_name))
                        timeout = True
                if not timeout:
                    raise

    def _safe_read(self, attr):
        """
        Read an attribute value from the Lima device, trying again for
        as long as the failure is a timeout. All other errors are
        re-raised.
        """
        while True:
            try:
                return self.lima.read_attribute(attr).value
            except PyTango.DevFailed as e:
                timeout = False
                for e_ in e.args:
                    if 'timeout' in e_.desc.lower():
                        print(('*** Timeout when reading attribute %s on %s.'
                               + 'Trying again...')
                              % (attr, self.lima_device_name))
                        timeout = True
                if not timeout:
                    raise

    def _safe_call_command(self, cmd, params=None):
        """
        Noticed that lima calls tend to time out for no apparent reason,
        this works around it by trying again after timeouts but raising
        all other errors.

        A 'run prepareAcq before starting' failure is taken to mean that
        the command was issued twice (typically because a timed-out call
        did in fact go through), and is treated as success.
        """
        while True:
            try:
                self.lima.command_inout(cmd, params)
                return
            except PyTango.DevFailed as e:
                timeout = False
                doublecall = False
                for e_ in e.args:
                    desc = e_.desc.lower()
                    if 'timeout' in desc:
                        print(
                            '*** Timeout during %s call to %s. Trying again...'
                            % (cmd, self.lima_device_name))
                        timeout = True
                    if 'run prepareacq before starting' in desc:
                        print('*** StartAcq called twice on %s. Moving on...'
                              % self.name)
                        doublecall = True
                # Only the two known, benign failure modes are swallowed.
                # The original condition was 'not timeout or doublecall',
                # which re-raised on a double call despite the message.
                if not (timeout or doublecall):
                    raise
                if doublecall and not timeout:
                    return

    def _initialize_det(self):
        """
        Initialize detector-specific properties such as cutoff energies
        and bias voltages. Can also be used for overriding Lima settings
        that were initialized by the base class.
        """
        pass

    @property
    def hybrid_mode(self):
        """
        Whether hybrid mode is on. Setting it also resets the arm
        counter.
        """
        return self._hybrid_mode

    @hybrid_mode.setter
    def hybrid_mode(self, val):
        self._hybrid_mode = bool(val)
        self.arm_number = -1

    @property
    def energy(self):
        """
        Placeholder, always -1 in the base class. Subclasses override
        this with the detector's own energy setting.
        """
        return -1

    @energy.setter
    def energy(self, val):
        pass

    def start_live(self, acqtime=1.0):
        """
        Switch hybrid mode off, then start live mode through the parent
        class.
        """
        self.hybrid_mode = False
        super(LimaDetector, self).start_live(acqtime)

    def initialize(self):
        """
        Connect to the Lima and detector devices and apply the default
        Lima acquisition and saving settings, then run the
        detector-specific ``_initialize_det``.
        """
        self.lima = PyTango.DeviceProxy(self.lima_device_name)
        self.lima.set_timeout_millis(3000)
        self.det = PyTango.DeviceProxy(self.det_device_name)

        # Make sure the devices are reachable, or this will throw an error
        self.lima.state()
        self.det.state()

        self.lima.acq_trigger_mode = "INTERNAL_TRIGGER"
        self.lima.saving_mode = "AUTO_FRAME"
        self.lima.saving_frame_per_file = 1
        self.lima.acq_nb_frames = 1
        self.lima.saving_managed_mode = 'SOFTWARE'
        self.lima.saving_overwrite_policy = 'MULTISET'
        self.lima.saving_format = 'HDF5'
        self.lima.saving_suffix = '.hdf5'
        self.lima.saving_index_format = ''
        self.lima.latency_time = 0.0

        self._initialize_det()
        self.hybrid_mode = False
        self.arm_number = -1

    def make_virtual_layout(self):
        """
        Build an h5py VirtualLayout which maps the datasets of the
        current scan's file onto one (N, m, n) stack.

        The image size and bytes per pixel are read from the Lima
        device's ``image_sizes`` attribute.
        """
        _, depth, n, m = self.lima.image_sizes
        dtype = {
            1: np.uint8,
            2: np.uint16,
            4: np.uint32,
            8: np.uint64,
        }[int(depth)]

        # we need to have relative paths in every virtual source object
        relfile = self.saving_filename
        if self.hw_trig:
            # NOTE(review): N includes burst_n, but only
            # n_starts * hw_trig_n frames are mapped below - confirm
            # that burst_n is always 1 with hardware triggering.
            N = self.n_starts * self.burst_n * self.hw_trig_n
            layout = VirtualLayout(shape=(N, m, n), dtype=dtype)
            for i in range(self.n_starts):
                vsource = VirtualSource(
                    relfile,
                    self._hdf_path_base % i,
                    shape=(self.hw_trig_n, m, n))
                layout[i * self.hw_trig_n:(i + 1) * self.hw_trig_n] = vsource
        elif self.hybrid_mode:
            # Lima is started once in hybrid mode, so all frames are
            # taken from entry 0.
            N = self.n_starts * self.burst_n
            layout = VirtualLayout(shape=(N, m, n), dtype=dtype)
            vsource = VirtualSource(
                relfile,
                self._hdf_path_base % 0,
                shape=(N, m, n))
            layout[:] = vsource
        else:
            N = self.n_starts * self.burst_n
            layout = VirtualLayout(shape=(N, m, n), dtype=dtype)
            for i in range(self.n_starts):
                layout[i * self.burst_n:(i + 1) * self.burst_n] = (
                    VirtualSource(
                        relfile,
                        self._hdf_path_base % i,
                        shape=(self.burst_n, m, n)))
        return layout

    def prepare(self, acqtime, dataid, n_starts):
        """
        Set Lima up for a scan: saving, trigger mode, number of frames
        and exposure time. In hybrid mode the acquisition is also
        prepared and started here.

        :param acqtime: Passed on to ``BurstDetector.prepare``.
        :param dataid: Scan number used for the file prefix, or None for
            no saving.
        :param n_starts: The number of arm/start cycles to expect.
        :raises Exception: If the detector is busy, if the output file
            already exists, or if hybrid mode is combined with
            ``burst_n != 1``.
        """
        BurstDetector.prepare(self, acqtime, dataid, n_starts)

        # get out of the fault caused by trigger timeout
        if ((self._safe_read('acq_status') == 'Fault')
                and (self._safe_read('acq_status_fault_error')
                     == 'No error')):
            self._safe_call_command('stopAcq')
            self._safe_call_command('prepareAcq')

        self.arm_number = -1
        self.n_starts = n_starts
        if self.busy():
            raise Exception('%s is busy!' % self.name)

        if dataid is None:
            # no saving
            self._safe_write('saving_mode', "MANUAL")  # no saving
            self.saving_filename = None
        else:
            # saving
            self._safe_write('saving_directory', env.paths.directory)
            self._safe_write('saving_mode', "AUTO_FRAME")
            prefix = 'scan_%06d_%s' % (dataid, self.name)
            self._safe_write('saving_prefix', prefix)
            self.saving_filename = prefix + self._safe_read('saving_suffix')
            if os.path.exists(
                    os.path.join(env.paths.directory, self.saving_filename)):
                print('%s hdf5 file already exists' % self.name)
                raise Exception('%s hdf5 file already exists' % self.name)

        if self.hw_trig:
            self._safe_write('acq_trigger_mode', self.EXT_TRG_MODE)
            self._safe_write('acq_nb_frames', self.hw_trig_n)
        else:
            self._safe_write('acq_trigger_mode', "INTERNAL_TRIGGER")
            self._safe_write('acq_nb_frames', self.burst_n)

        if self.hybrid_mode:
            # Hybrid mode overrides the trigger settings made above: one
            # externally triggered frame per arm/start cycle.
            if self.burst_n != 1:
                print('burst_n > 1 and hybrid mode makes no sense')
                raise Exception('burst_n > 1 and hybrid mode makes no sense')
            self._safe_write('acq_trigger_mode', self.EXT_TRG_MODE)
            self._safe_write('acq_nb_frames', n_starts)

        self._safe_write('acq_expo_time', self.acqtime)

        if self.hybrid_mode:
            time.sleep(.1)
            self._safe_call_command('prepareAcq')
            self._safe_call_command('startAcq')

        while self.busy():
            print(self.name, 'slept 5 ms while waiting for prepare')
            time.sleep(.005)

        # make a virtual layout to return instead of individual links
        # to individual datasets
        if self.saving_filename is not None:
            self.layout = self.make_virtual_layout()

    def arm(self):
        """
        Count the arm call. Outside hybrid mode, also run prepareAcq,
        wait for Lima to be ready, and start the acquisition directly
        if hardware triggered.

        :raises Exception: If the detector is busy (not in hybrid mode).
        """
        self.arm_number += 1
        if self.hybrid_mode:
            return
        if self.busy():
            raise Exception('%s is busy!' % self.name)
        self._safe_call_command('prepareAcq')
        while not self._safe_read('ready_for_next_acq'):
            time.sleep(.005)
        if self.hw_trig:
            self._safe_call_command('startAcq')

    def start(self):
        """
        Start a software-triggered acquisition. Does nothing when
        hardware triggered or in hybrid mode.

        :raises Exception: If the detector is busy.
        """
        if self.hw_trig or self.hybrid_mode:
            return
        if self.busy():
            raise Exception('%s is busy!' % self.name)
        self._safe_call_command('startAcq')

    def stop(self):
        """
        Send stopAcq to the Lima device.
        """
        self._safe_call_command('stopAcq')

    def busy(self):
        """
        Tell whether the detector is busy.

        In hybrid mode, this compares the arm counter to Lima's
        last_image_acquired, except after the last arm call, where
        Lima's ready_for_next_acq is used instead. Outside hybrid mode
        it is simply the inverse of ready_for_next_acq.
        """
        if not self.hybrid_mode:
            return not self._safe_read('ready_for_next_acq')

        if self.arm_number + 1 < self.n_starts:
            return self.arm_number > self._safe_read('last_image_acquired')
        if not self._safe_read('ready_for_next_acq'):
            print('%s waiting for Lima...' % self.name)
            time.sleep(.5)
            return True
        # All frames are in and Lima is ready. This used to fall through
        # and return None.
        return False

    def read(self):
        """
        Return ``{'frames': <virtual layout>}`` for the current scan, or
        None if nothing is being saved.
        """
        if self.saving_filename is None:
            return None
        return {'frames': self.layout}
class LimaPilatus(LimaDetector):
    """
    Pilatus specific LimaDetector.
    """

    def __init__(self, *args, **kwargs):
        """
        Takes the same arguments as LimaDetector, and sets the hdf5
        path under which Lima stores the Pilatus frames.
        """
        super().__init__(*args, **kwargs)
        self._hdf_path_base = 'entry_%04d/measurement/Pilatus/data'

    def _initialize_det(self):
        """
        Apply the Pilatus defaults for energy threshold and burst
        latency.
        """
        self.energy = 10.0
        self.burst_latency = 0.003

    @property
    def energy(self):
        """
        The ``energy_threshold`` attribute of the detector device. The
        setter accepts values from 4.5 to 36 (keV, going by the error
        message) and raises ValueError otherwise.
        """
        return self.det.energy_threshold

    @energy.setter
    def energy(self, value):
        out_of_range = value < 4.5 or value > 36
        if out_of_range:
            message = (
                'Requested value is outside the Pilatus range of 4.5-36 keV')
            raise ValueError(message)
        self.det.write_attribute('energy_threshold', value)
class LimaMerlin(LimaDetector):
    """
    Merlin specific LimaDetector.
    """

    def __init__(self, *args, **kwargs):
        """
        Takes the same arguments as LimaDetector, and sets the hdf5
        path under which Lima stores the Merlin frames.
        """
        super(LimaMerlin, self).__init__(*args, **kwargs)
        self._hdf_path_base = 'entry_%04d/measurement/Merlin/data'

    def _initialize_det(self):
        """
        Apply the Merlin defaults: operating energy, gain mode, counter
        depth and burst latency.
        """
        self.energy = 10.0
        self.det.write_attribute('gain', 'HGM')
        self.det.write_attribute('depth', 'BPP24')
        self.burst_latency = 0.002

    @property
    def energy(self):
        """
        The ``operatingEnergy`` attribute of the detector device.
        """
        return self.det.operatingEnergy

    @energy.setter
    def energy(self, value):
        self.det.write_attribute('operatingEnergy', value)

    def prepare(self, *args, **kwargs):
        """
        Select the Merlin trigger start type according to ``hw_trig``,
        then run the regular prepare with the same arguments.
        """
        if self.hw_trig:
            self.det.write_attribute('triggerStartType', "RISING_EDGE_TTL")
        else:
            self.det.write_attribute('triggerStartType', "INTERNAL")
        # Forward the arguments as received. This used to reference the
        # undefined names acqtime, dataid and n_starts (a NameError on
        # every call), and went straight to BurstDetector.prepare, which
        # skips the Lima configuration done by the parent class.
        super(LimaMerlin, self).prepare(*args, **kwargs)
class LimaAndor(LimaDetector):
    """
    Andor specific LimaDetector.
    """

    def __init__(self, *args, **kwargs):
        """
        Takes the same arguments as LimaDetector, and sets the hdf5
        path under which Lima stores the Andor frames.
        """
        super().__init__(*args, **kwargs)
        self._hdf_path_base = 'entry_%04d/measurement/Andor/data/array'

    def _initialize_det(self):
        """
        Set the detector name on the Lima device.
        """
        self.lima.user_detector_name = 'Andor'
class LimaXspress3(LimaDetector):
    """
    Xspress3 specific LimaDetector.
    """

    # Overrides the external trigger mode of the base class.
    EXT_TRG_MODE = "EXTERNAL_GATE"

    def __init__(self, *args, **kwargs):
        """
        Takes the same arguments as LimaDetector, and sets the hdf5
        path under which Lima stores the Xspress3 data.
        """
        super().__init__(*args, **kwargs)
        self._hdf_path_base = 'entry_%04d/measurement/xspress3/data'
@macro
class Lima_hybrid_on(object):
    """
    Turn on 'hybrid triggering' for the specified Lima device, or
    for all Lima devices if nothing is specified.
    """

    # The value written to hybrid_mode on each detector.
    VAL = True

    def __init__(self, *args):
        """
        :param args: The detectors to act on. If empty, all existing
            LimaDetector instances are used.
        """
        if args:
            self.dets = args
        else:
            # Detector.getinstances() covers every detector, so keep
            # only the Lima ones, as the class docstring promises.
            # Otherwise a stray hybrid_mode attribute gets set on
            # unrelated detectors.
            self.dets = [
                d for d in Detector.getinstances()
                if isinstance(d, LimaDetector)]

    def run(self):
        """
        Set ``hybrid_mode`` to ``VAL`` on each selected detector.
        """
        for d in self.dets:
            d.hybrid_mode = self.VAL
@macro
class Lima_hybrid_off(Lima_hybrid_on):
    """
    Turn off 'hybrid triggering' for the specified Lima device, or
    for all Lima devices if nothing is specified.
    """
    # The inherited run() writes this value to hybrid_mode.
    VAL = False