# Source code for contrast.detectors.PandaBox
from .Detector import Detector, TriggeredDetector, BurstDetector
from ..environment import env
import time
import numpy as np
import socket
from threading import Thread
import select
# Maximum number of bytes requested from a socket in a single recv() call.
SOCK_RECV = 4096
# Timeout (seconds) handed to select.select() when polling the data socket
# for more bytes during an acquisition.
RECV_DELAY = 1e-4
class PandaBox(Detector, TriggeredDetector, BurstDetector):
    """
    Basic class for treating a PandaBox as a detector, capable
    of operating in burst mode (thereby acting as a time-based
    trigger source).

    The PandaBox is infinitely configurable, and this class assumes that:

    #. the PCAP block is used,
    #. the PULSE1 block is used to control the number of
       acquired points and their timing, and
    #. flickering the "A" bit causes a trigger.
    """

    def __init__(self, name=None, host='172.16.126.101',
                 ctrl_port=8888, data_port=8889, bitblock='BITS1',
                 debug=False):
        """
        :param name: Name handed to ``Detector.__init__``.
        :param host: Host name or IP address of the PandaBox.
        :param ctrl_port: TCP port that commands are sent to.
        :param data_port: TCP port that captured data is read from.
        :param bitblock: Name of the block whose "A" bit is toggled
            by :meth:`start`.
        :param debug: If true, :meth:`query` prints every command it
            sends and every reply it receives.
        """
        # The connection parameters are stored before the base-class
        # initializers run (presumably because one of them calls
        # initialize() -- confirm against Detector).
        self.host = host
        self.ctrl_port = ctrl_port
        self.data_port = data_port
        self.acqthread = None
        # Added to the acquisition time to form PULSE1.STEP in prepare().
        self.burst_latency = .003
        self.bitblock = bitblock
        self.debug = debug
        Detector.__init__(self, name=name)
        TriggeredDetector.__init__(self)
        BurstDetector.__init__(self)

    def initialize(self):
        """
        Open the control connection to the PandaBox.

        The socket gets a one-second timeout and has Nagle's algorithm
        disabled (TCP_NODELAY).
        """
        self.ctrl_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.ctrl_sock.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
        self.ctrl_sock.settimeout(1)
        self.ctrl_sock.connect((self.host, self.ctrl_port))

    def query(self, cmd):
        """
        Send one command on the control socket and return the reply.

        :param cmd: Command string without the terminating newline,
            which is appended here.
        :returns: The decoded result of a single ``recv`` on the
            control socket.
        """
        # NOTE(review): only one recv() is issued, so a reply that arrives
        # in several TCP segments would be returned truncated. This method
        # is also called from the acquisition thread (see _acquire) while
        # sharing ctrl_sock with the calling thread, without any locking.
        if self.debug:
            print('*** sending:', cmd)
        self.ctrl_sock.sendall(bytes(cmd + '\n', 'ascii'))
        ret = self.ctrl_sock.recv(SOCK_RECV).decode()
        if self.debug:
            print('### got:', ret)
        return ret

    def busy(self):
        """
        Tell whether an acquisition thread exists and is still running.

        :returns: ``True`` while the thread started by :meth:`arm` is
            alive, ``False`` otherwise.
        """
        thread = self.acqthread
        return thread is not None and thread.is_alive()

    def prepare(self, acqtime, dataid, n_starts):
        """
        Run ``BurstDetector.prepare`` and program the PULSE1 block.

        PULSE1.PULSES is set to ``self.burst_n``, PULSE1.WIDTH to
        ``self.acqtime`` and PULSE1.STEP to ``self.acqtime`` plus
        ``self.burst_latency``.
        """
        BurstDetector.prepare(self, acqtime, dataid, n_starts)
        self.query('PULSE1.PULSES=%d' % self.burst_n)
        self.query('PULSE1.WIDTH=%f' % self.acqtime)
        self.query('PULSE1.STEP=%f' % (self.burst_latency + self.acqtime))

    def arm(self):
        """
        Start the acquisition thread and arm the PCAP block.

        Blocks until the arm command is acknowledged with ``OK`` and a
        subsequent completion query reports ``Busy``. Neither wait has
        an upper bound on the number of attempts.
        """
        self.acqthread = Thread(target=self._acquire)
        self.acqthread.start()
        armed = False
        # since early 2022, we get stuck here because the panda doesn't
        # take the arm. never happened before, happens all the time now.
        while not armed:
            ret = self.query('*PCAP.ARM=')
            if 'OK' in ret:
                armed = True
            else:
                print('failed to arm %s (trying again): "%s"' % (self.name, ret[:-1]))
        ready = False
        while not ready:
            ret = self.query('*PCAP.COMPLETION?')
            if 'Busy' in ret:
                ready = True
            # Short pause after every poll, including the successful one.
            time.sleep(.005)

    @staticmethod
    def _parse_channels(header):
        """
        Build the list of data column names from the data-port header.

        Every line following the last ``fields:`` marker contributes one
        name of the form ``<first token>_<third token>``.

        :param header: Raw header bytes, without the terminating blank line.
        :returns: List of column names, in header order.
        :raises IndexError: If a field line has fewer than three tokens.
        """
        channels = []
        for line in header.split(b'fields:\n')[-1].split(b'\n'):
            tokens = line.split()
            channels.append(tokens[0].decode() + '_' + tokens[2].decode())
        return channels

    def _acquire(self):
        """
        Receive and parse one set of measurements.

        Runs on the thread started by :meth:`arm`. Opens a fresh
        connection to the data port, reads the header, then collects
        data lines until the expected number of points has been parsed,
        an ``END`` token is seen, or the peer closes the connection.
        The result is stored in ``self.data`` as a dict mapping column
        name to ``numpy`` array, after which PCAP is disarmed.

        :raises ConnectionError: If the connection is closed before the
            header is complete.
        """
        # The context manager guarantees that the data socket is closed,
        # also when parsing fails; one socket is opened per acquisition.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.connect((self.host, self.data_port))
            s.sendall(b'\n\n')

            # First wait for the header to be complete, and parse it.
            buff = b''
            while b'\n\n' not in buff:
                chunk = s.recv(SOCK_RECV)
                if not chunk:
                    # recv() returning b'' means the peer closed the
                    # connection; looping on would spin forever.
                    raise ConnectionError(
                        '%s: data connection closed before the header '
                        'was complete' % self.name)
                buff += chunk
            # Split on the first blank line only, so that data already
            # received after the header stays in the buffer untouched.
            header, buff = buff.split(b'\n\n', 1)
            channels = self._parse_channels(header)

            # Then put the rest of the data into the same buffer and continue
            data = {ch: [] for ch in channels}
            if self.hw_trig:
                num_points = self.hw_trig_n * self.burst_n
            else:
                num_points = self.burst_n

            n = 0
            closed = False
            while n < num_points:
                # anything more to read?
                if not closed and select.select([s], [], [], RECV_DELAY)[0]:
                    chunk = s.recv(SOCK_RECV)
                    if chunk:
                        buff += chunk
                    else:
                        closed = True

                # anything more to parse?
                if b'\n' in buff:
                    line, buff = buff.split(b'\n', 1)
                    for k, v in zip(channels, line.split()):
                        if b'END' in v:
                            # NOTE(review): only the channel aligned with
                            # the END token receives a None; the remaining
                            # channels get nothing for this line.
                            data[k].append(None)
                            n = num_points
                            break
                        data[k].append(float(v))
                    n += 1
                elif closed:
                    # Peer is gone and no complete line is left to parse.
                    print('%s: data connection closed after %d of %d points'
                          % (self.name, n, num_points))
                    break

            self.data = {k: np.array(v) for k, v in data.items()}
            self.query('*PCAP.DISARM=')

    def start(self):
        """
        Issue a software trigger unless hardware triggering is enabled.

        The trigger consists of setting the "A" bit of ``self.bitblock``
        to 1 and, one millisecond later, back to 0.
        """
        if not self.hw_trig:
            self.query('%s.A=1' % self.bitblock)
            time.sleep(0.001)
            self.query('%s.A=0' % self.bitblock)

    def stop(self):
        """Disarm the PCAP block."""
        self.query('*PCAP.DISARM=')

    def read(self):
        """
        Return the data stored by :meth:`_acquire`.

        :returns: ``self.data``, a dict mapping column name to array.
        """
        return self.data