Source code for contrast.detectors.Xspress3
from .Detector import (
Detector, SoftwareLiveDetector, TriggeredDetector, BurstDetector)
from ..environment import env
from ..recorders.Hdf5Recorder import Link
import os
try:
import PyTango
except ModuleNotFoundError:
pass
import time
[docs]class Xspress3(Detector, SoftwareLiveDetector, TriggeredDetector,
BurstDetector):
"""
Provides an interface to the Xspress3 streaming manager,
https://github.com/maxiv-science/xspress3-streamer
"""
def __init__(self, device='staff/alebjo/xspress3', name=None):
SoftwareLiveDetector.__init__(self)
TriggeredDetector.__init__(self)
BurstDetector.__init__(self)
# do this last so that initialize() can overwrite parent defaults:
Detector.__init__(self, name=name)
self.proxy = PyTango.DeviceProxy(device)
def initialize(self):
self.burst_latency = 100e-9
def prepare(self, acqtime, dataid, n_starts):
if self.busy():
raise Exception('%s is busy!' % self.name)
# saving and paths
if dataid is None:
# no saving
self.saving_file = ''
self.proxy.DestinationFileName = self.saving_file
else:
# saving
path = env.paths.directory
fn = 'scan_%06d_%s.hdf5' % (dataid, self.name)
self.saving_file = os.path.join(path, fn)
if os.path.exists(self.saving_file):
print('%s: this hdf5 file exists, I am raising an error now'
% self.name)
raise Exception('%s hdf5 file already exists' % self.name)
self.proxy.DestinationFileName = self.saving_file
# arming and numbers of frames
self.proxy.ExposureTime = acqtime
self.proxy.nFramesPerTrigger = self.burst_n
self.proxy.LatencyTime = self.burst_latency
if self.hw_trig:
self.proxy.TriggerMode = 'EXTERNAL_MULTI'
else:
self.proxy.TriggerMode = 'SOFTWARE'
# if not in burst mode, we can arm here and then just soft trigger
ntrig = 1
self.arm_calls = 0
if self.burst_n == 1:
ntrig *= n_starts
if self.hw_trig:
ntrig *= self.hw_trig_n
self.proxy.nTriggers = ntrig
if self.burst_n == 1:
self.proxy.Arm()
# so how many frames do we expect to see before the detector is
# done with a specific start()?
exp = self.burst_n
if self.hw_trig:
exp *= self.hw_trig_n
self.expected_per_arm = exp
self.expected_total = 0
def arm(self):
# in burst mode, we have to arm here, otherwise it's already done
if self.burst_n > 1:
self.proxy.Arm()
self.expected_total = self.expected_per_arm
else:
self.expected_total += self.expected_per_arm
def start(self):
if self.hw_trig:
return
self.proxy.SoftwareTrigger()
def stop(self):
self.proxy.Stop()
def busy(self):
while True:
try:
st = self.proxy.State()
if st == PyTango.DevState.STANDBY:
return False
elif st == PyTango.DevState.RUNNING:
if self.proxy.nFramesAcquired == self.expected_total:
return False
return True
except PyTango.DevFailed:
print('%s.busy(): failed to talk to my Tango device, '
'trying again...' % self.name)
time.sleep(.5)
def read(self):
if self.saving_file == '':
return None
else:
return Link(self.saving_file, '/entry/instrument/xspress3/', universal=True)