Source code for contrast.recorders.Recorder
from ..Gadget import Gadget
from .. import utils
from ..environment import macro
import time
import signal
from multiprocessing import get_context
# A dedicated multiprocessing context is needed, or we will somehow
# crash matplotlib and ipython. On the other hand, the 'spawn' start
# method is more sensitive than 'fork', so be careful to call
# Recorder.start() only from main; see the example scripts.
ctx = get_context('spawn')
# Queue factory belonging to the same 'spawn' context, exposed at
# module level.
Queue = ctx.Queue
class Process(ctx.Process):
    """
    Empty subclass of the context's ``Process`` class. It adds nothing
    of its own and exists only to tidy up the inheritance documentation.
    """
class Recorder(Gadget, Process):
    """
    Base class for Recorders. Provides the multiprocessing and queuing
    functionality.

    Items are handed to the recorder through ``self.queue``. The main
    loop in :meth:`run` periodically drains the queue and dispatches
    each item to :meth:`act_on_header`, :meth:`act_on_footer` or
    :meth:`act_on_data`. A ``None`` item (see :meth:`stop`) ends the
    loop. Subclasses override the hooks they care about; all hooks
    default to doing nothing.
    """

    def __init__(self, delay=.1, **kwargs):
        """
        :param delay: Sleep time for the queue checking loop.
        :type delay: float
        :param ``**kwargs``: Passed on to base class constructor
        """
        Process.__init__(self)
        Gadget.__init__(self, **kwargs)
        # The queue must come from the 'spawn' context.
        ctx = get_context('spawn')
        self.queue = ctx.Queue()
        self.delay = delay
        # Set to True by the poison pill (None) to end the main loop.
        self.quit = False

    def _process_queue(self):
        """
        Empty the queue and dispatch every item found to the matching
        ``act_on_*`` hook. A ``None`` item flags the recorder to quit;
        items queued after it in the same batch are still dispatched.
        """
        dcts = []
        while not self.queue.empty():
            # ok since only we are reading from self.queue:
            dcts.append(self.queue.get())
        for dct in dcts:
            if dct is None:
                self.quit = True
            elif isinstance(dct, RecorderHeader):
                self.act_on_header(dct)
            elif isinstance(dct, RecorderFooter):
                self.act_on_footer(dct)
            else:
                self.act_on_data(dct)

    def run(self):
        """
        The main loop of the recorder.

        Calls :meth:`init` once, then repeatedly sleeps for
        ``self.delay`` seconds, processes the queue and calls
        :meth:`periodic_check` until asked to quit, and finally calls
        :meth:`_close`.
        """
        # ignore SIGINT signals from ctrl-C in the main process
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        self.init()
        while not self.quit:
            time.sleep(self.delay)
            self._process_queue()
            self.periodic_check()
        self._close()

    def init(self):
        """
        *Override this.* Use this method to initialize the recorder
        (open windows, open files, etc.).
        """
        pass

    def act_on_header(self, dct):
        """
        *Override this.* Performs an action when a ``RecorderHeader``
        is received from the queue. Does nothing by default, so that
        subclasses which only care about data do not fail when a
        header arrives.

        :param dct: The header object taken from the queue.
        """
        pass

    def act_on_data(self, dct):
        """
        *Override this.* Performs an action when a new data package is
        received. The keys of dct are detector and motor names, the
        values are their readings at one particular point in the scan.
        """
        pass

    def act_on_footer(self, dct):
        """
        *Override this.* Performs an action when a ``RecorderFooter``
        is received from the queue. Does nothing by default, so that
        subclasses which only care about data do not fail when a
        footer arrives.

        :param dct: The footer object taken from the queue.
        """
        pass

    def periodic_check(self):
        """
        A function which gets called on every iteration of the recorder.
        Useful for example for checking if files should be closed or
        whether a plot window still exists.
        """
        pass

    def _close(self):
        """
        *Override this.* Clean up (close windows, close files...)
        """
        pass

    def stop(self):
        """
        Stop a started subprocess safely by putting a poison pill in
        its queue.
        """
        self.queue.put(None)
class DummyRecorder(Recorder):
    """
    Minimal example recorder for practising: every hook just prints a
    message describing what it was called with.
    """

    def init(self):
        """Announce that the recorder is starting up."""
        print('opening')

    def act_on_header(self, dct):
        """Print the scan number and path found in the header."""
        scannr, path = dct['scannr'], dct['path']
        print('got a header! am told to write scan %d to %s'
              % (scannr, path))

    def act_on_data(self, dct):
        """Print the received data package as it is."""
        print('found this!', dct)

    def act_on_footer(self, dct):
        """Print the scan number and path found in the footer."""
        scannr, path = dct['scannr'], dct['path']
        print('scan %d at %s over, apparently' % (scannr, path))

    def _close(self):
        """Announce that the recorder is shutting down."""
        print('closing')
def active_recorders():
    """
    Utility function which returns a list of the ``Recorder`` instances
    reported by ``Recorder.getinstances()`` whose process is alive.
    """
    alive = []
    for recorder in Recorder.getinstances():
        if recorder.is_alive():
            alive.append(recorder)
    return alive
@macro
class LsRec(object):
    """
    List active recorders.
    """

    def run(self):
        """Print a name/class table of the currently active recorders."""
        classes_by_name = {}
        for recorder in active_recorders():
            classes_by_name[recorder.name] = recorder.__class__
        table = utils.dict_to_table(classes_by_name, titles=('name', 'class'))
        print(table)