Source code for contrast.scans.Scan

import time
import datetime
import numpy as np
import requests

from ..environment import macro, env
from ..recorders import active_recorders, RecorderHeader, RecorderFooter
from ..detectors import Detector, TriggeredDetector, TriggerSource
from contrast.detectors.PandaBox import PandaBox
from ..utils import SpecTable
from .. import colors
from collections import OrderedDict
import sys


[docs] class SoftwareScan(object): """ Base class for the normal sardana-style software-controlled scan. Respects the availability and deadlines managed by env.scheduler, honours env.shapshot, and acts on all active detectors, trigger sources, and recorders. """ dict_print_length = 5 str_print_length = 12 def __init__(self, exposuretime): """ The constructor should parse the parameters of the derived macro. :param exposuretime: Exposure time to pass on to detectors etc. :type exposuretime: float """ self._command = None # updated if run via macro self.motors = [] # list of motors, to be filled by subclass self.exposuretime = exposuretime self.scannr = env.nextScanID self.n_positions = None self.print_progress = True env.nextScanID += 1 self.flyscan = False def output(self, i, dct): # ignore dicts that are too long for k, v in dct.items(): if type(v) == dict and len(v) > self.dict_print_length: dct[k] = {'...': '...'} dct[' #'] = i dct.move_to_end(' #', last=False) if i == 0: self.table = SpecTable() self.table.max_str_len = self.str_print_length header = self.table.header_lines(dct) print('\n' + header) print('-' * len(header.split('\n')[-1])) print(self.table.fill_line(dct)) if self.n_positions and self.print_progress: timeleft = str(datetime.timedelta( seconds=( self.n_positions - i) * dct['dt'] / (i + 1))).split('.')[0] print('Time left: %s\r' % timeleft, end='')
[docs] def _calc_time_needed(self): """ Estimates the time needed for performing the next acquisition. This can be done based on the input parameters, or on the timing of previous points. """ return self.exposuretime * 5 + 5
[docs] def _before_scan(self): """ Placeholder method for users to hook actions onto scan classes after import. For example opening a shutter:: def pre_scan_stuff(slf): print("Maybe open a shutter here?") SoftwareScan._before_scan = pre_scan_stuff """ pass
[docs] def _after_scan(self): """ Placeholder method for users to hook actions onto scan classes after import, see ``_before_scan``. """ pass
[docs] def _before_move(self): """ Gets called for each step, and can be used for example to check that the instrument is ready for the next acquisition, that there is beam in the machine, etc. """ # Example implementation of topup avoidance time_needed = self._calc_time_needed() limit = env.scheduler.limit enough_time = time_needed < limit if limit else True ready = env.scheduler.ready found_not_ready = False try: while not ready or not enough_time: if not enough_time: print('not enough time to complete this measurement so' + ' waiting, press ctrl-c to ignore from now on...') else: if not found_not_ready: print('Waiting for beamline to become available,' + ' press ctrl-c to ignore from now on...') found_not_ready = True else: print('.', end='') sys.stdout.flush() time.sleep(1.) limit = env.scheduler.limit enough_time = time_needed < limit if limit else True ready = env.scheduler.ready except KeyboardInterrupt: env.scheduler.disabled = True
[docs] def _before_arm(self): """ Gets called for each step. See ``_before_move``. """ pass
[docs] def _while_acquiring(self): """ Gets called repeatedly while the detectors detect. Useful for printing a progress bar, for example. """ pass
[docs] def _before_start(self): """ Gets called for each step. See ``_before_move``. """ pass
[docs] def arm_dranspose_pipeline(self): """ arming dranspose pipeline this sends a message prior to the scan with how many triggers there will be in the scan ... this fails in scans with more than 100k triggers """ det_group = Detector.get_active() soft_positions = sum(1 for _ in self._generate_positions()) fly_positions = 1 if hasattr(self, 'fastmotorintervals'): fly_positions = (self.fastmotorintervals + 1) all_positions = soft_positions * fly_positions print('\nNumber of Positions are', all_positions) streams = [d.name for d in det_group if d.active is True] streams = [d for d in streams if d in ['xspress3', 'panda0']] if all_positions <= 1.e5: triggermap = {stream:[[{"constraint": i}] for i in range(all_positions)] for stream in streams} triggermap['contrast'] = [[{"constraint": i}] if i%fly_positions == (fly_positions-1) else None for i in range(all_positions)] blub = requests.post("http://nanomax-pipeline-controller.daq.maxiv.lu.se/api/v1/mapping", json=triggermap) print('dramspose UUID:', blub.content)
[docs] def run(self): """ This is the main acquisition loop where interaction with motors, detectors and other ``Gadget`` objects happens. """ self._before_scan() print(f'\nScan {colors.str_scannumber(self.scannr)} starting at {time.asctime()}') positions = self._generate_positions() # find and prepare the detectors det_group = Detector.get_active() #self.arm_dranspose_pipeline() #commented out until fixed trg_group = TriggerSource.get_active() group = det_group + trg_group if group.busy(): print('These gadgets are busy: %s' % (', '.join([d.name for d in group if d.busy()]))) return if not self.flyscan: for d in group: if isinstance(d, TriggeredDetector) and not isinstance(d, PandaBox): d.hw_trig_n = 1 group.prepare(self.exposuretime, self.scannr, self.n_positions, trials=10) t0 = time.time() # take a pre scan snapshot if env.snapshot.pre_scan: snap = env.snapshot.capture() else: snap = {} # send a header to the recorders for r in active_recorders(): r.queue.put(RecorderHeader(scannr=self.scannr, status='started', path=env.paths.directory, snapshot=snap, description=self._command, contrast_version={'git_hash':env.contrast_git_hash, 'uncommitted_changes': {f'file_{i}': x for i, x in enumerate(env.uncommitted_changes)}})) try: for i, pos in enumerate(positions): # move motors self._before_move() for m in self.motors: m.move(pos[m.name]) while True in [m.busy() for m in self.motors]: time.sleep(.05) # arm detectors self._before_arm() group.arm() # start detectors self._before_start() group.start(trials=10) while det_group.busy(): self._while_acquiring() time.sleep(.025) # read detectors and motors dt = time.time() - t0 dct = OrderedDict() for m in self.motors: dct[m.name] = m.position() for d in det_group: dct[d.name] = d.read() dct['dt'] = dt # pass data to recorders for r in active_recorders(): r.queue.put(dct) # print spec-style info self.output(i, dct.copy()) print(f'\nScan {colors.str_scannumber(self.scannr)} ending at {time.asctime()}') # take a post scan snapshot if env.snapshot.post_scan: snap = env.snapshot.capture() else: snap = {} # tell the recorders that the scan is over for r in active_recorders(): r.queue.put(RecorderFooter(scannr=self.scannr, status='finished', path=env.paths.directory, snapshot=snap, description=self._command)) except KeyboardInterrupt: group.stop() print(f'\nScan {colors.str_scannumber(self.scannr)} cancelled at {time.asctime()}') # take a post scan snapshot if env.snapshot.post_scan: snap = env.snapshot.capture() else: snap = {} # tell the recorders that the scan was interrupted for r in active_recorders(): r.queue.put(RecorderFooter(scannr=self.scannr, status='interrupted', path=env.paths.directory, snapshot=snap, description=self._command)) except: self._after_scan() raise # do any user-defined cleanup actions self._after_scan()
[docs] def _generate_positions(self): """ *Override this method.* Function or generator which returns or yields an iterable of dicts, :: {motorA.name: posA, motorB.name: posB, ...} """ raise NotImplementedError
[docs] @macro class LoopScan(SoftwareScan): """ A software scan with no motor movements. Number of exposures is <intervals> + 1, for consistency with ascan, dscan etc. :: loopscan <intervals> <exp_time> """ def __init__(self, intervals, exposuretime=1.0): super(LoopScan, self).__init__(float(exposuretime)) self.intervals = intervals self.n_positions = intervals + 1 self.motors = [] def _generate_positions(self): # dummy positions with a non existent motor for i in range(self.intervals + 1): yield {'fake': i}
[docs] @macro class Ct(object): """ Make a single acquisition on the active detectors without recording data. Optional argument <exp_time> specifies exposure time, the default is 1.0. :: ct [<exp_time>] """ def __init__(self, exp_time=1, print_nd=True): self.exposuretime = float(exp_time) self.flyscan = False
[docs] def _before_ct(self): """ Placeholder method for users to hook actions onto scan classes after import. """ pass
[docs] def _after_ct(self): """ Placeholder method for users to hook actions onto scan classes after import. """ pass
def run(self): self._before_ct() # find and prepare the detectors det_group = Detector.get_active() group = det_group + TriggerSource.get_active() if not self.flyscan: for d in group: if isinstance(d, TriggeredDetector) and not isinstance(d, PandaBox): d.hw_trig_n = 1 group.prepare(self.exposuretime, dataid=None, n_starts=1) # arm and start detectors group.arm() group.start() try: while group.busy(): time.sleep(.01) except KeyboardInterrupt: group.stop() # read detectors and motors dct = {} for d in det_group: dct[d.name] = d.read() # print results for key, val in dct.items(): print(key, ':', val) self._after_ct()