Source code for contrast.motors.Motor

import time
import numpy as np
import os
import ast

from ..Gadget import Gadget
from ..environment import macro, env
from .. import utils

# a central list of defined bookmarks, to avoid garbage collection
bookmark_refs = []


[docs]class Motor(Gadget): """ General base class for motors. Motors have dial and user positions, which are related by an offset and a scaling factor. The user position is supposed to be used in all macros and movements. The dial position is hard coded in the Motor subclass to closely reflect what the underlying device quotes. :: user = dial * scaling + offset, dial = (user - offset) / scaling The user position can be changed, which updates the offset but not the scaling. In the same way, limits on the user position are internally converted to dial limits, such that setting the user position leaves the dial limits unchanged. """ def __init__(self, scaling=1.0, offset=0.0, dial_limits=(None, None), user_format='%.2f', dial_format='%.2f', **kwargs): """ :param scaling: Scaling factor from dial to user position :param offset: Offset from scaled dial to user position :param dial_limits: Motor limits in dial positions :param user_format: Format string for presenting user positions :param dial_format: Format string for presenting dial positions :param ``**kwargs``: Passed on to base class constructor """ super(Motor, self).__init__(**kwargs) self._lowlim, self._uplim = dial_limits self._scaling = scaling self._offset = offset self._uformat = user_format self._dformat = dial_format @property def user_position(self): return self.dial_position * self._scaling + self._offset @user_position.setter def user_position(self, pos): self._offset = pos - self.dial_position * self._scaling @property def user_limits(self): _lowlim, _uplim = self.dial_limits if None not in (_uplim, _lowlim): l1 = _lowlim * self._scaling + self._offset l2 = _uplim * self._scaling + self._offset return tuple(sorted([l1, l2])) else: return (None, None) @user_limits.setter def user_limits(self, lims): l1 = (lims[0] - self._offset) / self._scaling l2 = (lims[1] - self._offset) / self._scaling self.dial_limits = sorted([l1, l2]) @property def dial_limits(self): return (self._lowlim, self._uplim) @dial_limits.setter def dial_limits(self, lims): self._lowlim, self._uplim = lims def position(self): return self.user_position def move(self, pos): if self.busy(): raise Exception('Motor is busy') dial = (pos - self._offset) / self._scaling try: _lowlim, _uplim = self.dial_limits assert dial <= _uplim assert dial >= _lowlim except AssertionError: print('Trying to move %s outside its limits!' % self.name) return -1 except TypeError: pass self.dial_position = dial @property def dial_position(self): """ Override this property, which sets or gets the dial position. :rtype: float """ raise NotImplementedError @dial_position.setter def dial_position(self, pos): raise NotImplementedError
[docs] def busy(self): """ Overrides this method, which reports on whether or not the motor is busy. :rtype: bool """ raise NotImplementedError
[docs] def stop(self): """ Override this method, which stops the motor. """ raise NotImplementedError
[docs]class DummyMotor(Motor): """ Dummy motor which can be harmlessly moved with a velocity of 1 / s. """ def __init__(self, velocity=None, dial_position=None, *args, **kwargs): super(DummyMotor, self).__init__(*args, **kwargs) if dial_position: self._aim = dial_position self._oldpos = dial_position else: self._aim = 0.0 self._oldpos = 0.0 if velocity is None: self.velocity = 1. else: self.velocity = velocity self.moving_velocity = self.velocity self._started = 0.0 @property def dial_position(self): dpos = self._aim - self._oldpos dt = time.time() - self._started T = abs(dpos / self.moving_velocity) if dt < T: return self._oldpos + dpos * dt / T else: return self._aim @dial_position.setter def dial_position(self, pos): self._oldpos = self.dial_position self._started = time.time() self._aim = pos self.moving_velocity = self.velocity def busy(self): return not np.isclose(self._aim, self.dial_position) def stop(self): self._aim = self.dial_position
[docs]class MotorBookmark(object): """ A bookmark is a set of motor dial positions which together have some significance, for example a sample position or an attenuetor combination. """ def __init__(self, name, motors, positions=None): """ :param name: Name given to the MotorBookmark :param motors: The motor instances to bookmark. :type motors: list, tuple :param positions: List of positions to bookmark. :type positions: list, tuple """ self.name = name if positions is None: positions = [m.dial_position for m in motors] self.dct = {m: p for m, p in zip(motors, positions)}
[docs]class MotorMemorizer(Gadget): """ Saves or loads motor scaling, offsets, and limits, as well as any defined bookmarks to or from file. """ def __init__(self, filepath=None, **kwargs): """ :param filepath: Path to file where motor information will be dumped :type filepath: str :param ``**kwargs``: Passed on to the base class """ super(MotorMemorizer, self).__init__(**kwargs) self.filepath = filepath if filepath and os.path.exists(filepath): self.load()
[docs] def load(self): """ Loads memorized motor configurations from ``self.filepath``, and applies to matching motors. """ try: motors = {m.name: m for m in Motor.getinstances()} with open(self.filepath, 'r') as fp: for row in fp: dct = ast.literal_eval(row) if dct['name'] in motors: # this is an existing motor! motors[dct['name']]._offset = dct['_offset'] if dct['dial_limits']: motors[dct['name']].dial_limits = ( dct['dial_limits']) elif '_offset' not in dct.keys(): # this is a bookmark! motor_names = list(dct.keys()) motor_names.remove('name') avail_motors = list(Motor.getinstances()) motor_objs = [] bail = False for m in motor_names: match = [m_ for m_ in avail_motors if m_.name == m] if not len(match): print('Could not find motor %s, ' 'ignoring bookmark %s' % (m, dct['name'])) bail = True break motor_objs.append(match[0]) if bail: break bookmark_refs.append( MotorBookmark(name=dct['name'], motors=motor_objs, positions=[ dct[n] for n in motor_names])) print('Loaded motor states and bookmarks from %s' % self.filepath) except (FileNotFoundError,): print("Memorizer file %s doesn't exist" % self.filepath)
[docs] def dump(self): """ Dumps motor configurations for all motors to ``self.filepath``. """ try: with open(self.filepath, 'w') as fp: for m in Motor.getinstances(): dct = {'name': m.name, '_offset': m._offset, 'dial_limits': m.dial_limits} fp.write(str(dct) + '\n') for b in bookmark_refs: dct = {m.name: p for m, p in b.dct.items()} dct['name'] = b.name fp.write(str(dct) + '\n') print('Saved motor states and bookmarks to %s' % self.filepath) except (FileNotFoundError,): print("Cant write to %s, doesn't exist")
def expect_motors(motors): for m in motors: if not isinstance(m, Motor): raise Exception('%s is not a Motor object!' % m)
[docs]@macro class Mv(object): """ Move one or more motors. :: mvr <motor1> <position1> <motor2> <position2> ... """ def __init__(self, *args): self.motors = args[::2] self.targets = np.array(args[1::2]) expect_motors(self.motors) def _run_while_waiting(self): pass def run(self): if max(m.userlevel for m in self.motors) > env.userLevel: print('You are trying to move motors above your user level!') return for m, pos in zip(self.motors, self.targets): m.move(pos) try: while True in [m.busy() for m in self.motors]: self._run_while_waiting() time.sleep(.01) except KeyboardInterrupt: for m in self.motors: m.stop()
[docs]@macro class Mvd(object): """ Move one or more motors to an abolute dial position. Not implemented. """ def run(self): raise NotImplementedError
[docs]@macro class Mvr(Mv): """ Move one or more motors relative to their current positions. :: mvr <motor1> <position1> <motor2> <position2> ... """ def __init__(self, *args): self.motors = args[::2] displacements = np.array(args[1::2]) current = np.array([m.position() for m in self.motors]) self.targets = current + displacements
[docs]@macro class Umv(Mv): """ Like mv, but prints the current position while moving, and returns when the move is complete. """ def _run_while_waiting(self): ll = ['%s: %s' % (m.name, m._uformat % m.user_position) for m in self.motors] s = '; '.join(ll) print(s + '\r', end='') def run(self): # ensures the final position is printed too super(Umv, self).run() self._run_while_waiting()
[docs]@macro class Umvr(Mvr, Umv): """ Like umv, but in positions relative to the current ones. """ pass # less is more
[docs]@macro class Wm(object): """ Print the positions of one or more motors. :: wm <motor1> <motor2> ... """ def __init__(self, *args): self.motors = args self.out = True expect_motors(self.motors) def run(self, *args): ret = None titles = ['motor', 'user pos.', 'limits', 'dial pos.', 'limits'] table = [] for m in self.motors: try: upos = m.user_position ret = upos upos = m._uformat % upos dpos = m._dformat % m.dial_position if None in m.dial_limits: ulims = '(None, None)' dlims = '(None, None)' else: ulims = ('(%s, %s)' % (2 * (m._uformat,))) % m.user_limits dlims = ('(%s, %s)' % (2 * (m._dformat,))) % m.dial_limits table.append([m.name, upos, ulims, dpos, dlims]) except: print('Could not get position of %s' % m.name) ret = None if self.out: print(utils.list_to_table(lst=table, titles=titles, margins=[5, 2, 5, 2, 0])) return ret
[docs]@macro class WmS(Wm): """ Silent 'where motor'. Print the positions of one or more motors but do not print any output. :: wms <motor1> <motor2> ... """ def __init__(self, *args): self.motors = args self.out = False
[docs]@macro class Wa(Wm): """ Print the positions of all motors available at the current user level. """ def __init__(self): self.motors = [m for m in Motor.getinstances() if m.userlevel <= env.userLevel] self.out = True
[docs]@macro class LsM(object): """ List available motors. """ def run(self): dct = {m.name: m.__class__ for m in Motor.getinstances() if m.userlevel <= env.userLevel} print(utils.dict_to_table(dct, titles=('name', 'class'), sort=True))
[docs]@macro class SetLim(object): """ Set limits on motors. :: setlim <motor1> <lower 1> <upper 1> ... Also saves new limits to all available ``MotorMemorizer`` objects. """ def __init__(self, *args): self.motors = args[::3] self.lowers = args[1::3] self.uppers = args[2::3] expect_motors(self.motors) def run(self): for m, l, u in zip(self.motors, self.lowers, self.uppers): m.user_limits = (l, u) # memorize the new state for m in MotorMemorizer.getinstances(): m.dump()
[docs]@macro class SetPos(object): """ Sets user position on motors. :: setpos <motor1> <pos1> ... Also saves new user positions to all available ``MotorMemorizer`` objects. """ def __init__(self, *args): self.motors = args[::2] self.positions = args[1::2] expect_motors(self.motors) def run(self): for m, p in zip(self.motors, self.positions): m.user_position = p # memorize the new state for m in MotorMemorizer.getinstances(): m.dump()
[docs]class BookmarkMacroBase(object): """ Base class for bookmark macros, which parses arguments into a list of MotorBookmark objects. If none are given, all bookmarks are included. """ def __init__(self, *args): if len(args) == 0: self.bookmarks = bookmark_refs.copy() self.specific = False return self.specific = True self.bookmarks = [] for name in args: for b in bookmark_refs: if b.name == name: self.bookmarks.append(b)
[docs]@macro class LsBook(BookmarkMacroBase): """ Lists the currently defined bookmarks or the contents of a specific bookmark. :: lsbook [<bookmark name>] """ def run(self): if self.specific: dct = {m.name: (p * m._scaling + m._offset) for m, p in self.bookmarks[0].dct.items()} print(utils.dict_to_table(dct, titles=('motor', 'user pos.'), sort=True)) else: dct = {b.name: ', '.join([k.name for k in b.dct.keys()]) for b in bookmark_refs} print(utils.dict_to_table(dct, titles=('name', 'members'), sort=True))
[docs]@macro class Bookmark(object): """ Bookmarks the specified motors at their current dial positions. :: bookmark <'bookmark name'> <motor 1> <motor 2> ... Also saves existing bookmarks to all available ``MotorMemorizer`` objects. """ def __init__(self, name, *args): self.name = name self.motors = args expect_motors(self.motors) def run(self): existing = [b for b in bookmark_refs if b.name == self.name] [bookmark_refs.remove(b) for b in existing] bookmark_refs.append(MotorBookmark( name=self.name, motors=self.motors, positions=[m.dial_position for m in self.motors])) # memorize the new state for m in MotorMemorizer.getinstances(): m.dump()
[docs]@macro class Restore(BookmarkMacroBase): """ Restore a bookmarked position by moving all motors there. """ def run(self): if not self.specific: return dct = {m: (p * m._scaling + m._offset) for m, p in self.bookmarks[0].dct.items()} args = [val for pair in dct.items() for val in pair] umv = Umv(*args) umv.run()
[docs]@macro class RmBook(BookmarkMacroBase): """ Delete one or all bookmarks defined with the bookmark command. :: rmbook [<bookmark 1> <bookmark 2> ...] Also updates all available ``MotorMemorizer`` instances. """ def run(self): if not self.specific: msg = 'Are you sure you want to remove all bookmarks? [y/n] ' if input(msg).lower() != 'y': return for b in self.bookmarks: bookmark_refs.remove(b) # memorize the new state for m in MotorMemorizer.getinstances(): m.dump()