# Source code for contrast.detectors.AlbaEM

"""
This module contains an interface to the Alba electrometer, as well as
a contrast Detector class representing it.

Requires the "STREAMING" mode now available as standard firmware.

For the old school data-polling version, see LegacyAlbaEM.py.
"""

if __name__ == '__main__':
    from contrast.detectors import (Detector, LiveDetector, TriggeredDetector,
                                    BurstDetector)
else:
    from .Detector import (Detector, LiveDetector, TriggeredDetector,
                           BurstDetector)
import telnetlib
import numpy as np
import time
import select
from threading import Thread
import zmq
import json

# Module-level configuration for the Alba electrometer interface.

# NOTE(review): BUF_SIZE is not referenced by any code visible in this
# module -- possibly a leftover, confirm before removing.
BUF_SIZE = 1024
# Number of electrometer channels; used to build the 'CHAN01'..'CHAN04'
# stream keys and the detector's channel list.
NUM_CHAN = 4
# Seconds to wait for a reply to each command in Electrometer.query.
TIMEOUT = 5
# Default port of the zmq data stream (see Stream / Electrometer).
STREAM_PORT = 22003
# Default port of the telnet command interface (see Electrometer).
CMD_PORT = 5025
# Range strings accepted by Electrometer.set_current_range.
VALID_RANGE_STRINGS = [
    '1mA', '100nA', '10nA', '1nA', '100uA', '10uA', '1uA', '100pA']
# Filter strings accepted by Electrometer.set_filter.
VALID_FILTER_STRINGS = ['3200Hz', '100Hz', '10Hz', '1Hz', '0.5Hz']
# Replies to 'ACQU:STAT?' which AlbaEM.busy treats as busy / idle.
BUSY_STATES = ('STATE_RUNNING', 'STATE_ACQUIRING')
IDLE_STATES = ('STATE_ON', )


class Stream(Thread):
    """
    Receiver thread for the electrometer data stream.

    Connects a zmq PULL socket to ``tcp://host:port`` and, once started,
    collects every incoming data message as one row in ``self.data``.
    Each row is ``[timestamp, CHAN01, ..., CHANnn]`` with NUM_CHAN
    channel entries. A 'series-start' message empties the list again.
    """

    def __init__(self, host, port, debug=False):
        """
        :param host: host name of the stream endpoint to connect to.
        :param port: port of the stream endpoint.
        :param debug: if true, :meth:`debug` prints its arguments.
        """
        super().__init__()
        ctx = zmq.Context()
        self.sock = ctx.socket(zmq.PULL)
        # very generous high-water mark on the receiving socket
        self.sock.set_hwm(10000000)
        endpoint = 'tcp://%s:%s' % (host, port)
        self.sock.connect(endpoint)
        self.data = []
        self.do_debug = debug

    def debug(self, *args):
        """
        Print the arguments, but only if debugging was requested.
        """
        if not self.do_debug:
            return
        print(*args)

    def run(self):
        """
        Thread body: receive and decode JSON messages forever.

        Messages of type 'data' are appended to ``self.data``, messages
        of type 'series-start' clear it, anything else is ignored.
        """
        columns = ['timestamp']
        columns.extend('CHAN%02u' % ch for ch in range(1, NUM_CHAN + 1))
        while True:
            message = json.loads(self.sock.recv())
            kind = message['message_type']
            if kind == 'data':
                self.data.append([message[col] for col in columns])
            elif kind == 'series-start':
                self.data.clear()
class Electrometer(object):
    """
    Interface to a 4-channel Alba electrometer.

    Commands are sent over a telnet connection (see :meth:`query`).
    Optionally a :class:`Stream` thread is started which collects the
    data points published by the device.
    """

    def __init__(self, host='b-nanomax-em2-2', port=CMD_PORT,
                 trig_source='DIO_1', streaming=False,
                 stream_host=None, stream_port=STREAM_PORT):
        """
        :param host: host name of the electrometer command interface.
        :param port: port of the command interface.
        :param trig_source: the DIO channel used for triggering.
        :param streaming: if true, start a Stream thread so that
            :attr:`ndata` and ``self.stream.data`` become available.
        :param stream_host: host to connect the stream to, defaults to
            ``host`` when None.
        :param stream_port: port of the data stream.
        :raises Exception: if the telnet connection times out.
        :raises AssertionError: if the on-board SW is older than 2.2.02.
        """
        try:
            self.em = telnetlib.Telnet(host, port, timeout=5)
        except telnetlib.socket.timeout as err:
            raise Exception(
                'Could not connect to %s:%d' % (host, port)) from err
        self.query('ACQU:MODE STREAMING')
        # the DIO channel used for triggering:
        self._trig_source = trig_source
        # require SW version 2.2.02 where zmq streaming is available,
        # below 2.0.04 soft triggers were broken and below 2.0.0 data
        # indexing was wrong.
        assert self.version >= (2, 2, 2), \
            "Requires on-board SW version 2.2.02 or higher."
        if streaming:
            # honour stream_host if given, it used to be silently ignored
            if stream_host is None:
                stream_host = host
            self.stream = Stream(host=stream_host, port=stream_port,
                                 debug=False)
            self.stream.start()
        else:
            self.stream = None

    def _flush(self):
        """
        Read and return whatever is already waiting on the connection.
        """
        return self.em.read_eager().strip().decode('utf-8')

    def query(self, cmd):
        """
        Issue a command and read the answer.

        Keeps waiting (printing a message every TIMEOUT seconds) until
        a non-empty reply line arrives.

        :param cmd: command string, without line terminator.
        :returns: the stripped reply as a str.
        """
        # ctrl-c during a previous query can have left stuff to be read:
        self._flush()
        self.em.write((cmd + '\n').encode('utf-8'))
        while True:
            reply = self.em.read_until(
                b'\r\n', timeout=TIMEOUT).strip().decode('utf-8')
            if reply:
                return reply
            print('AlbaEM failed to respond in %.1f s. Trying again...'
                  % TIMEOUT)

    def get_instant_current(self, ch):
        """Return the instant current of channel ``ch`` as a float."""
        return float(self.query('CHAN%02u:INSC?' % ch))

    def get_current_range(self, ch):
        """Return the current range string of channel ``ch``."""
        return self.query('CHAN%02u:CABO:RANG?' % ch)

    def set_current_range(self, ch, val):
        """
        Set the current range of channel ``ch``. Values which are not
        in VALID_RANGE_STRINGS are silently ignored.
        """
        if val not in VALID_RANGE_STRINGS:
            return
        self.query('CHAN%02u:CABO:RANG %s' % (ch, val))

    def get_autorange(self, ch):
        """Return True/False for the 'On'/'Off' autorange reply."""
        lookup = {'On': True, 'Off': False}
        return lookup[self.query('CHAN%02u:CABO:ARNG?' % ch)]

    def set_autorange(self, ch, val):
        """Switch autorange on channel ``ch`` on (truthy) or off."""
        val = {True: 'On', False: 'Off'}[bool(val)]
        self.query('CHAN%02u:CABO:ARNG %s' % (ch, val))

    def get_filter(self, ch):
        """Return the filter string of channel ``ch``."""
        return self.query('CHAN%02u:CABO:FILT?' % ch)

    def set_filter(self, ch, val):
        """
        Set the filter of channel ``ch``. Values which are not in
        VALID_FILTER_STRINGS are silently ignored.
        """
        if val not in VALID_FILTER_STRINGS:
            return
        self.query('CHAN%02u:CABO:FILT %s' % (ch, val))

    def state(self):
        """Return the reply to 'ACQU:STAT?'."""
        return self.query('ACQU:STAT?')

    def stop(self):
        """Send the acquisition stop command."""
        self.query('ACQU:STOP True')

    def status(self):
        """Return the reply to 'ACQU:STUS?'."""
        return self.query('ACQU:STUS?')

    def get_acqtime(self):
        """Return the acquisition time, converted from ms to s."""
        return float(self.query('ACQU:TIME?')) * 1e-3

    def set_acqtime(self, val):
        """Set the acquisition time ``val`` (in s), at least 0.320 ms."""
        val = val * 1000  # ms
        val = max(val, 0.320)
        self.query('ACQU:TIME %f' % val)

    def burst(self, period=1., n=1, latency=320e-6):
        """
        Take a series of measurements with internal timing. No
        triggering is possible, the series starts immediately.

        :param period: time per point in s, acquisition plus latency.
        :param n: number of points.
        :param latency: dead time per point in s, at least 320e-6.
        """
        latency = max(latency, .320e-3)
        acqtime = period - latency
        self.query('ACQU:TIME %f' % (acqtime * 1000))
        self.query('ACQU:LOWT %f' % (latency * 1000))
        self.query('TRIG:MODE AUTOTRIGGER')
        self.query('ACQU:NTRI %u' % n)
        self.query('ACQU:STAR')

    def arm(self, acqtime=1., n=1, hw=False):
        """
        Prepare for hw- or sw-triggered acquisition, n x acqtime.

        :param acqtime: acquisition time per trigger in s.
        :param n: number of triggers to expect.
        :param hw: hardware triggers if true, else software triggers.
        """
        acqtime = acqtime * 1000  # ms
        acqtime = max(acqtime, 0.320)
        self.query('ACQU:TIME %f' % acqtime)
        self.query('TRIG:MODE %s' % ('HARDWARE' if hw else 'SOFTWARE'))
        self.query('TRIG:INPU %s' % self._trig_source)
        self.query('TRIG:DELA 0.0')  # no delay
        self.query('ACQU:NTRI %u' % n)
        self.query('ACQU:STAR')

    def soft_trigger(self):
        """
        Issue a software trigger and wait until the device has
        registered it.

        :raises AssertionError: if the trigger is not acknowledged.
        """
        old = int(self.query('ACQU:NDAT?'))
        ret = self.query('TRIG:SWSE True')
        # Check the reply before waiting: if the trigger was rejected,
        # waiting for NDAT to change could go on forever and the
        # failure would never be reported.
        if ret != 'ACK':
            raise AssertionError(
                'Soft trigger not acknowledged, got %r' % (ret,))
        # there's a problem with missing soft triggers in fast buffer
        # mode, but this seems to do it. NDAT cannot be used to check
        # if an acquisition is ready, but waiting for the number to
        # go up seems to ensure that the device will not miss any more
        # commands.
        while int(self.query('ACQU:NDAT?')) == old:
            time.sleep(.001)

    @property
    def ndata(self):
        """
        ACQU:NDAT? increments before the integration time is over, so
        cannot be used. Look at the stream instead.

        Returns the number of points received on the stream, or None
        if streaming is not enabled.
        """
        if self.stream is not None:
            return len(self.stream.data)
        else:
            return None

    @property
    def version(self):
        """
        The version taken from the last comma-separated field of the
        '*IDN?' reply, as a tuple of (up to) three ints.
        """
        res = self.query('*IDN?')
        version = res.split(',')[-1].strip()
        return tuple(map(int, version.split('.')[:3]))

    def test_soft_triggers(self, N=1000):
        """
        The soft triggering wasn't reliable up until SW version 2.0.0,
        and seems to have started working from 2.0.04. This procedure
        typically halted after a few 100 points.

        Needs the data stream, as progress is followed via
        :attr:`ndata`.

        :param N: number of soft triggers to issue.
        :raises RuntimeError: if streaming is not enabled.
        """
        if self.stream is None:
            # fail before arming, not with a TypeError on None < int
            raise RuntimeError(
                'test_soft_triggers requires streaming=True')
        self.arm(n=N, acqtime=.001)
        for i in range(N):
            print('starting loop #%u' % (i + 1))
            n = self.ndata
            while n < i:
                print(' only have %u, trying again' % (n,))
                time.sleep(.01)
                n = self.ndata
            print(' have %u, issuing trigger #%u' % (n, i + 1))
            t0 = time.time()
            self.soft_trigger()
            print('soft trigger took %.1f ms'
                  % ((time.time() - t0) * 1000))
class AlbaEM(Detector, LiveDetector, TriggeredDetector, BurstDetector):
    """
    Contrast interface to the alba EM.

    The specifics of the EM enables these four cases, each of which
    causes a different triggering and readout behaviour below:

    1) HW triggered expecting one trigger per SW step
       -> arm at the top
    2) HW triggered expecting hw_trig_n triggers per SW step
       -> arm on every sw step
    3) Burst mode, burst_n > 1, uses a special EM command
    4) Software triggered mode,
       -> arm at the top

    Note that the electrometer itself (as of SW version 2.0.04) does
    not allow for triggered burst acquisition, as reflected in the
    code.
    """

    def __init__(self, name=None, debug=False, **kwargs):
        """
        :param name: detector name, passed on to Detector.
        :param debug: stored as ``self.do_debug``.
        :param kwargs: passed on to Electrometer in :meth:`initialize`.
        """
        self.kwargs = kwargs
        self.do_debug = debug
        Detector.__init__(self, name=name)
        LiveDetector.__init__(self)
        TriggeredDetector.__init__(self)
        BurstDetector.__init__(self)

    def initialize(self):
        """
        Connect to the electrometer (with streaming enabled) and set
        up the bookkeeping attributes.
        """
        self.em = Electrometer(stream_port=STREAM_PORT, streaming=True,
                               **self.kwargs)
        self.burst_latency = 320e-6
        self.n_started = 0
        self.channels = [ch + 1 for ch in range(NUM_CHAN)]

    def prepare(self, acqtime, dataid, n_starts):
        """
        Prepare for a series of ``n_starts`` software steps, and arm
        the EM right away in cases 1 and 4 (see the class docstring).

        :raises Exception: if the detector is busy.
        :raises AssertionError: for hardware-triggered bursts.
        """
        BurstDetector.prepare(self, acqtime, dataid, n_starts)
        self.n_started = 0
        self.n_starts = n_starts
        if self.busy():
            raise Exception('%s is busy!' % self.name)
        msg = (
            "The Alba EM cannot handle hardware-triggered burst "
            "acquisitions!")
        assert not (self.hw_trig and (self.burst_n > 1)), msg
        if self.global_arm:
            self.armed_so_far = 0
            self.em.arm(self.acqtime, self.n_starts, self.hw_trig)

    @property
    def global_arm(self):
        """
        Whether the EM is armed once for several SW starts, which
        corresponds to cases 1 and 4 of the class docstring.
        """
        return ((self.hw_trig and self.hw_trig_n == 1)
                or (not self.hw_trig and self.burst_n == 1))

    def start_live(self, acqtime=1.0):
        """Nothing to do, see the comment below."""
        # The Alba EM:s are always in live mode, exposing the "instant
        # current" values.
        pass

    def stop_live(self):
        """Nothing to do."""
        pass

    def arm(self):
        """
        Arm the EM for hw_trig_n hardware triggers (case 2). Does
        nothing in the other cases.
        """
        if (self.hw_trig and self.hw_trig_n > 1):
            self.em.arm(self.acqtime, self.hw_trig_n, True)

    def start(self):
        """
        Start one software step: nothing to do for hardware triggers,
        otherwise launch a burst (case 3) or issue a soft trigger.
        """
        self.n_started += 1
        if self.hw_trig:
            return
        elif self.burst_n > 1:
            period = self.acqtime + self.burst_latency
            self.em.burst(period, self.burst_n, self.burst_latency)
        else:
            self.em.soft_trigger()

    def stop(self):
        """Stop the acquisition and wait until no longer busy."""
        self.em.stop()
        while self.busy():
            time.sleep(.01)

    def busy(self):
        """
        Return True while the current step has not finished.
        """
        st = self.em.state()
        if st in IDLE_STATES:
            return False
        if self.global_arm:
            # case 1 or 4 (above): expect one point per start on the
            # stream
            return (self.em.ndata < self.n_started)
        # case 2 or 3 (see above): require an idle status
        return st in BUSY_STATES

    def read(self):
        """
        Return the data of the last step as a dict with the keys 't'
        and the channel numbers.

        In cases 1 and 4 each value is a single number taken from the
        last point on the stream. In cases 2 and 3 each value is an
        array of shape (1, chunk), with chunk being hw_trig_n or
        burst_n. If the stream holds fewer points than that, the data
        are padded with zeros, if it holds more, only the last chunk
        points are kept. Both situations print a warning.
        """
        keys = ['t', ] + self.channels
        if self.global_arm:
            # case 1 or 4, which means read a single point each time
            data = np.array(self.em.stream.data[-1])
            return {k: v for k, v in zip(keys, data)}

        # case 2 or 3, return the last burst_n or hw_trig_n points
        chunk = self.hw_trig_n if self.hw_trig else self.burst_n
        # work on a copy, the list belongs to the stream thread
        rows = list(self.em.stream.data)
        received = len(rows)
        if received > chunk:
            # a negative pad width would make np.pad raise, so keep
            # the most recent points instead
            print('*** WARNING! wrong number of data points received '
                  'from %s. Keeping only the last %u points ***'
                  % (self.name, chunk))
            rows = rows[received - chunk:]
        # the reshape keeps the array 2d even without any points
        data = np.array(rows).reshape((-1, len(keys)))
        if received < chunk:
            # data missing, don't do a hard assert, pad and warn loudly.
            missing = chunk - received
            data = np.pad(data, pad_width=((0, missing), (0, 0)))
            print('*** WARNING! wrong number of data points received '
                  'from %s. Padding with zeros, electrometer data will '
                  'be SCREWED UP ***' % self.name)
        return {key: data[:, i].reshape((1, -1))
                for i, key in enumerate(keys)}
if __name__ == '__main__':
    # Example usage of the bare Electrometer class. Currents in uA.
    # Streaming has to be enabled, as the loop below follows the
    # progress via em.ndata and em.stream, which are None otherwise.
    em = Electrometer(host='b-nanomax-em2-0', streaming=True)
    em.burst(period=.001, n=60000)  # 1 minute, 1 kHz
    while em.ndata < 60000:
        print('Now have %u points' % len(em.stream.data))
        time.sleep(.1)