Source code for contrast.detectors.Eiger

from .Detector import (
    Detector, SoftwareLiveDetector, TriggeredDetector, BurstDetector)
from ..recorders.Hdf5Recorder import Link
from ..environment import env

import time
import numpy as np
import os
import requests
import json
import zmq
from threading import Thread
from base64 import b64encode, b64decode
try:
    import tango
except ImportError:
    pass


[docs] class Eiger(Detector, SoftwareLiveDetector, TriggeredDetector, BurstDetector): """ Provides a direct interface to the Dectris Eiger server. """ def __init__(self, name=None, host='b-nanomax-eiger-dc-1.maxiv.lu.se', api_version='1.8.0', use_image_appendix=False, hdf_path='entry/instrument/Eiger/data', hw_trig_min_latency=100e-9): """ Class to interact directly with the Eiger Simplon API. """ self.host = host self.api_version = api_version self._hdf_path = hdf_path self.acqthread = None self.use_image_appendix = use_image_appendix self.hw_trig_min_latency = hw_trig_min_latency Detector.__init__(self, name=name) SoftwareLiveDetector.__init__(self) TriggeredDetector.__init__(self) BurstDetector.__init__(self) def initialize(self): self.session = requests.Session() self.session.trust_env = False self.burst_latency = 100e-9 # set up the detector self._set('detector', 'command/disarm') self._set('detector', 'command/cancel') # who knows self._set('detector', 'config/threshold/1/mode', 'enabled') self._set('detector', 'config/threshold/2/mode', 'disabled') self._set('stream', 'config/mode', 'enabled') self._set('filewriter', 'config/mode', 'disabled') self._set('monitor', 'config/mode', 'enabled') self._set('stream', 'config/header_detail', 'all') self._set('detector', 'config/counting_mode', 'retrigger') def _get(self, subsystem, key, timeout=3.0): response = self.session.get( 'http://%s/%s/api/%s/%s' % ( self.host, subsystem, self.api_version, key), timeout=timeout) if response: if 'application/json' in response.headers['content-type']: return response.json() else: print('unkown response type', response.headers['content-type']) else: print('error') print(response) def _set(self, subsystem, key, value=None, timeout=3.0): url = 'http://%s/%s/api/%s/%s' % ( self.host, subsystem, self.api_version, key) if value is None: payload = None else: payload = {'value': value} response = self.session.put(url, json=payload, timeout=timeout) if response.status_code != 200: print(response.text) def busy(self): if self.acqthread and self.acqthread.is_alive(): return True return not self._get( 'detector', 'status/state')['value'] in ('idle', 'ready') @property def max_count_rate(self): """ Maximum count rate according to the server """ val = self._get( 'detector', 'config/countrate_correction_count_cutoff')['value'] return int(val) @property def compression(self): """ Whether bitshuffle compression is enabled """ val = self._get('detector', 'config/compression')['value'] return val == 'bslz4' @compression.setter def compression(self, val): if val: self._set('detector', 'config/compression', 'bslz4') else: self._set('detector', 'config/compression', 'none') @property def energy(self): """ Operating energy in keV """ return self._get('detector', 'config/photon_energy')['value'] @energy.setter def energy(self, val): if (val < 4000) or (val > 30000): print('Bad energy value, should be in between 4000 eV and 30000 eV') return val = float(val) self._set('detector', 'config/photon_energy', val) print(f'energy of {self.name} has been set to {self.energy} eV') @property def mask_applied(self): """ Whether to apply the mask """ return self._get('detector', 'config/pixel_mask_applied')['value'] @mask_applied.setter def mask_applied(self, val): self._set('detector', 'config/pixel_mask_applied', val) @property def pixel_splitting(self): """ Whether to use virtual pixel splitting """ return self._get( 'detector', 'config/virtual_pixel_correction_applied')['value'] @pixel_splitting.setter def pixel_splitting(self, val): self._set('detector', 'config/virtual_pixel_correction_applied', val) @property def threshold(self): """ Energy threshold for the counters """ return self._get('detector', 'config/threshold/1/energy')['value'] @threshold.setter def threshold(self, val): self._set('detector', 'config/threshold/1/energy', float(val)) def prepare(self, acqtime, dataid, n_starts): BurstDetector.prepare(self, acqtime, dataid, n_starts) self._set('detector', 'config/nimages', self.burst_n) self._set('detector', 'config/frame_time', self.acqtime + self.burst_latency) self._set('detector', 'config/count_time', self.acqtime) if self.hw_trig: self._set('detector', 'config/trigger_mode', 'exts') self._set('detector', 'config/ntrigger', int(self.hw_trig_n * n_starts)) else: self._set('detector', 'config/trigger_mode', 'ints') # np.int64 isn't json serializable: self._set('detector', 'config/ntrigger', int(n_starts)) if dataid is None: self.dpath = '' else: filename = 'scan_%06d_%s.h5' % (dataid, self.name) self.dpath = os.path.join(env.paths.directory, filename) if os.path.exists(self.dpath): print('%s: this hdf5 file exists, I am raising an error now' % self.name) raise Exception('%s hdf5 file already exists' % self.name) self._set('stream', 'config/header_appendix', json.dumps({'filename': self.dpath})) if self.use_image_appendix: # for CoSAXS tango server self._set('stream', 'config/image_appendix', json.dumps({'filename': self.dpath})) self._set('detector', 'command/arm') self.n_started = 0 def arm(self): # The Eiger is armed only once. pass def start(self): self.n_started += 1 if not self.hw_trig: self.acqthread = Thread(target=self._set, args=('detector', 'command/trigger'), kwargs={'timeout': None}) self.acqthread.start() def stop(self): # there's also cancel - not sure which to use: self._set('detector', 'command/disarm') def read(self): if self.dpath: ret = {'frames': Link(self.dpath, self._hdf_path, universal=True), 'thumbs:': None} else: ret = None return ret def _start(self, acqtime): """ The Eiger needs to override this method as it uses software triggers. """ self.stopped = False NN = 1000 while not self.stopped: self.prepare(acqtime, None, NN) for nn in range(NN): if self.stopped: self.stop() break self.arm() self.start() while self.busy(): time.sleep(.05)
[docs] def get_mask(self): """ Get the Eiger mask, taken from the example in the manual. """ darray = self._get( 'detector', 'config/pixel_mask', timeout=15)['value'] dtype = np.dtype(str(darray['type'])) shape = darray['shape'] mask = np.frombuffer( b64decode(darray['data']), dtype=dtype).reshape(shape) return mask.copy()
[docs] def set_mask(self, array): """ Set the Eiger mask, also adapted from the manual. """ data = {'__darray__': (1, 0, 0), 'type': array.dtype.str, 'shape': array.shape, 'filters': ['base64'], 'data': b64encode(array.data).decode('ascii')} self._set('detector', 'config/pixel_mask', data, timeout=15) # arming and disarming stores it: self._set('detector', 'command/arm') self._set('detector', 'command/disarm')
[docs] class EigerTango(Detector, SoftwareLiveDetector, TriggeredDetector, BurstDetector): """ Provides a direct interface to the Dectris Eiger server via a Tango Server. """ def __init__(self, device_name, name=None, #api_version='1.8.0', use_image_appendix=False, hdf_path='entry/instrument/Eiger/data', rotation=0, hw_trig_min_latency=100e-9): """ Class to interact directly with the Eiger Simplon API. """ self.proxy = tango.DeviceProxy(device_name) self.proxy.set_timeout_millis(10000) self.acqthread = None self._hdf_path = hdf_path self.name = name self.rotation = rotation self.hw_trig_min_latency = hw_trig_min_latency self.host = self._get_tango_property('dcu_host') self.api_version = self._get_tango_property('api_version') Detector.__init__(self, name=name) SoftwareLiveDetector.__init__(self) TriggeredDetector.__init__(self) BurstDetector.__init__(self) def initialize(self): self.session = requests.Session() self.session.trust_env = False self.burst_latency = 100e-9 self.n_started = 0 print(f"{self.name}: Initilazing the Tango Server...", end="") self.proxy.Init() print(f" done.") def _get_tango_property(self, property: str): '''helper function to easily read properties from the Tango servers''' raw = self.proxy.get_property([property]) ret = raw[property][0] # unpack the response return ret def _get(self, subsystem, key, timeout=3.0): response = self.session.get( 'http://%s/%s/api/%s/%s' % ( self.host, subsystem, self.api_version, key), timeout=timeout) if response: if 'application/json' in response.headers['content-type']: return response.json() else: print('unkown response type', response.headers['content-type']) else: print('error') print(response) def _set(self, subsystem, key, value=None, timeout=3.0): url = 'http://%s/%s/api/%s/%s' % ( self.host, subsystem, self.api_version, key) if value is None: payload = None else: payload = {'value': value} response = self.session.put(url, json=payload, timeout=timeout) if response.status_code != 200: print(response.text) def busy(self): # if self.proxy.State() == tango.DevState.RUNNING: # return True # else: # return False # We cannot rely purly on the nFramesReceived attribute, as it is not reset at the end of an acquisition # so we check the State first, to see if it is idle if self.proxy.State() == tango.DevState.STANDBY: return False if self.proxy.nFramesReceived < self.n_started*self.burst_n: return True else: return False @property def max_count_rate(self): """ Maximum count rate according to the server """ val = self._get( 'detector', 'config/countrate_correction_count_cutoff')['value'] return int(val) @property def energy(self): """ Operating energy in eV """ return self.proxy.Energy @energy.setter def energy(self, val): if (val < 4000) or (val > 30000): print('Bad energy value, should be in between 4000 eV and 30000 eV') return val = float(val) self.proxy.Energy = val print(f'energy of {self.name} has been set to {self.energy} eV') @property def mask_applied(self): """ Whether to apply the mask """ return self._get('detector', 'config/pixel_mask_applied')['value'] @mask_applied.setter def mask_applied(self, val): self._set('detector', 'config/pixel_mask_applied', val) @property def pixel_splitting(self): """ Whether to use virtual pixel splitting """ return self._get( 'detector', 'config/virtual_pixel_correction_applied')['value'] @pixel_splitting.setter def pixel_splitting(self, val): self._set('detector', 'config/virtual_pixel_correction_applied', val) @property def threshold(self): """ Energy threshold for the counters """ return self.proxy.Threshold @threshold.setter def threshold(self, val): self.proxy.Threshold = val @property def rotation(self): """Rotation of detector frame. Uses numpy.rot90() notation.""" print(f"{self.name}: Frame rotation (in numpy.rot90() notation): {self.proxy.Rotation}") return self.proxy.Rotation @rotation.setter def rotation(self, val): self.proxy.Rotation = int(val) print(f"{self.name}: Frame rotation (in numpy.rot90() notation): {self.proxy.Rotation}") def prepare(self, acqtime, dataid, n_starts): BurstDetector.prepare(self, acqtime, dataid, n_starts) acqtime = self.acqtime if self.busy(): raise Exception(f'{self.name} is busy!') self.proxy.nFramesPerTrigger = self.burst_n self.proxy.FrameTime = acqtime + self.burst_latency self.proxy.ExposureTime = acqtime self.repetitions = self.hw_trig_n if self.hw_trig else 1 if (dataid is None) or (env.paths.directory is None): self.dpath = '' else: path = env.paths.directory filename = 'scan_%06d_%s.h5' % (dataid, self.name) self.dpath = os.path.join(env.paths.directory, filename) if os.path.exists(self.dpath): print('%s: this hdf5 file exists, I am raising an error now' % self.name) raise Exception('%s hdf5 file already exists' % self.name) self.proxy.DestinationFilename = self.dpath if self.hw_trig: self.proxy.TriggerMode = 'EXTERNAL_MULTI' self.proxy.nTriggers = self.hw_trig_n * n_starts else: self.proxy.TriggerMode = 'INTERNAL' self.proxy.nTriggers = n_starts self.proxy.nFramesPerTrigger = self.burst_n self.proxy.Arm() self.n_started = 0 def arm(self): # The Eiger is armed only once. pass def start(self): self.n_started += self.repetitions if not self.hw_trig: self.proxy.Trigger() def stop(self): self.proxy.Stop() self.n_started = 0 def read(self): if self.dpath: ret = {'frames': Link(self.dpath, self._hdf_path, universal=True), 'thumbs:': None} else: ret = None return ret def start_live(self, acqtime): # for now, the Tango DS does not support variable exposure times for the live mode. Live mode is hardcoded to 1 s. self.proxy.Live() def stop_live(self): self.stop()
[docs] def get_mask(self): """ Get the Eiger mask, taken from the example in the manual. """ darray = self._get( 'detector', 'config/pixel_mask', timeout=15)['value'] dtype = np.dtype(str(darray['type'])) shape = darray['shape'] mask = np.frombuffer( b64decode(darray['data']), dtype=dtype).reshape(shape) return mask.copy()
[docs] def set_mask(self, array): """ Set the Eiger mask, also adapted from the manual. """ data = {'__darray__': (1, 0, 0), 'type': array.dtype.str, 'shape': array.shape, 'filters': ['base64'], 'data': b64encode(array.data).decode('ascii')} self._set('detector', 'config/pixel_mask', data, timeout=15) # arming and disarming stores it: self._set('detector', 'command/arm') self._set('detector', 'command/disarm')
[docs] class SelunTango(EigerTango): """ Provides a direct interface to the Dectris SELUN server via a Tango Server. """ @property def OnDetectorBinning(self): """ On Detector binning. Options: '1x1' - 190x190 pixels at max. 30 kHz '2x2' - 94x94 pixels at up to 120 kHz """ return self.proxy.OnDetectorBinning @OnDetectorBinning.setter def OnDetectorBinning(self, val): """ Setter for the OnDetectorBinning attribute """ if val in ['1x1', '2x2']: self.proxy.OnDetectorBinning = val else: print(f'[!] invalid option for OnDetectorBinning. Did not change the setting.')