from Ultracam import *
import logging


class dcs:
    """
    Class which provides an interface to the DAZLE detector control system.

    It wraps an Ultracam object (stored as self.uc) and reports the outcome
    of every operation through a logger obtained from the standard logging
    module.  Failures are logged rather than raised.
    """

    def __init__(self, camera_url, filesave_url, name='dcs'):
        """
        Connects to the detector control system and initialises it.

        Arguments:
        camera_url: URL of the camera task on the detector control
        computer e.g. 'optics26:7063'
        filesave_url: URL of the filesave task on the detector control
        computer e.g. 'optics26:5417'
        name: Identifier used for logging purposes.
        """
        self.name = name
        self.logger = logging.getLogger(name)

        # Create an Ultracam object and initialise it.
        self.logger.info('Connecting to DCS...')
        self.uc = Ultracam(camera_url, filesave_url)
        if self.uc.initialise():
            self.logger.info('DCS initialised.')
        else:
            # A failed initialisation is only logged; construction carries on.
            self.logger.critical('Failed to initialise DCS!')

        # Do we need to RCO and RST too?  Test scripts don't.
        self.RCO()
        self.RST()

        # Check that everything is OK.  The message is the same either way;
        # only the severity depends on the two status values.
        cstate, cstatus, fstate, fstatus = self.uc.getStatus()
        if cstatus == 'OK' and fstatus == 'OK':
            log = self.logger.debug
        else:
            log = self.logger.warning
        log('DCS status: %s %s %s %s', cstate, cstatus, fstate, fstatus)

    def getStatus(self):
        """
        Queries the state and status of the camera and filesave
        processes, and returns a tuple of (cstate, cstatus, fstate,
        fstatus).
        """
        return self.uc.getStatus()

    def powerOn(self):
        """
        Powers on the detector.
        """
        self.logger.debug('powerOn() called.')
        self.__runPowerApp('dazle_pon_app.xml', 'on')

    def powerOff(self):
        """
        Powers off the detector.
        """
        self.logger.debug('powerOff() called.')
        self.__runPowerApp('dazle_pof_app.xml', 'off')

    def __runPowerApp(self, appname, action):
        """
        Loads and executes a power application, waits for the DCS to go
        idle, and logs whether it succeeded.

        Arguments:
        appname: File name of the application to load.
        action: 'on' or 'off', used only to word the log messages.
        """
        # Load the relevant app.
        app = self.uc.getApplication(appname)
        # Execute it.
        state, status = self.uc.execute(app)
        # Verify it worked.
        # NOTE(review): state and status are the values returned by
        # execute(), i.e. from before wait_for_idle() - confirm that
        # execute() reports the final state, otherwise re-query here.
        self.uc.wait_for_idle()
        if state == 'IDLE' and status == 'OK':
            self.logger.debug('Detector powered %s.', action)
        else:
            self.logger.critical('Power %s failed, state: %s, status: %s',
                                 action, state, status)

    def RCO(self):
        """
        Executes the 'RCO' command on the DCS and logs the outcome.  What
        the command does is not documented here.
        """
        state, status = self.uc.RCO()
        if status == 'OK':
            self.logger.debug('RCO successful.')
        else:
            self.logger.critical('RCO failed, state: %s, status: %s',
                                 state, status)

    def RST(self):
        """
        Executes the 'RST' command on the DCS and logs the outcome.  What
        the command does is not documented here.
        """
        state, status = self.uc.RST()
        if status == 'OK':
            self.logger.debug('RST successful.')
        else:
            self.logger.critical('RST failed, state: %s, status: %s',
                                 state, status)

    def dummy(self, DWELL=1, NUM_EXPS=1):
        """
        Executes the dummy SRR application written by the ATC to generate
        a frame of fake data.
        """
        self.logger.debug('dummy(DWELL=%i, NUM_EXPS=%i) called.',
                          DWELL, NUM_EXPS)
        self.__doExposure('dazle_dummy_app.xml', DWELL, NUM_EXPS)

    def SRR(self, DWELL=1, NUM_EXPS=1):
        """
        Executes the SRR application to take a single raw read frame (or
        take multiple single raw reads and average them, if NUM_EXPS!=1).
        """
        self.logger.debug('SRR(DWELL=%i, NUM_EXPS=%i) called.',
                          DWELL, NUM_EXPS)
        self.__doExposure('dazle_srr_app.xml', DWELL, NUM_EXPS)

    def CDS(self, DWELL=1, NUM_EXPS=1):
        """
        Executes the CDS application to take a correlated double sampling
        exposure (or take multiple CDS exposures and average them, if
        NUM_EXPS!=1).
        """
        self.logger.debug('CDS(DWELL=%i, NUM_EXPS=%i) called.',
                          DWELL, NUM_EXPS)
        self.__doExposure('dazle_cds_app.xml', DWELL, NUM_EXPS)

    def NDR(self, DWELL, NUM_READ, NUM_EXPS):
        """
        Executes the 'sample up the ramp' non-destructive read application
        (or do this multiple times and average the result if NUM_EXPS!=1).
        """
        self.logger.debug('NDR(DWELL=%i, NUM_READ=%i, NUM_EXPS=%i) called.',
                          DWELL, NUM_READ, NUM_EXPS)
        self.__doExposure('dazle_ndr_app.xml', DWELL, NUM_EXPS, NUM_READ)

    def __doExposure(self, appname, DWELL, NUM_EXPS, NUM_READ=None):
        """
        Loads an exposure application, sets its parameters, executes it and
        logs the resulting camera and filesave status.

        Arguments:
        appname: File name of the application to load.
        DWELL: Value for the application's 'DWELL' parameter.
        NUM_EXPS: Value for the application's 'NUM_EXPS' parameter.
        NUM_READ: Value for the application's 'NUM_READ' parameter; the
        parameter is left untouched when this is None (or 0).
        """
        app = self.uc.getApplication(appname)

        # A parameter that cannot be set is only warned about; the exposure
        # is still attempted.
        if not app.setparam('DWELL', DWELL):
            self.logger.warning("Couldn't set DWELL!")
        if not app.setparam('NUM_EXPS', NUM_EXPS):
            self.logger.warning("Couldn't set NUM_EXPS!")
        # NOTE(review): truthiness test, so NUM_READ=0 is treated like
        # "not given" - confirm that is intended.
        if NUM_READ and not app.setparam('NUM_READ', NUM_READ):
            self.logger.warning("Couldn't set NUM_READ!")

        self.uc.wait_for_idle()
        cstate, cstatus = self.uc.execute(app)
        fstate, fstatus, filename = self.uc.getFStatus()

        if cstatus == 'OK' and fstatus == 'OK':
            self.logger.debug(
                'cstate: %s, cstatus: %s, fstate: %s, fstatus: %s.',
                cstate, cstatus, fstate, fstatus)
            self.logger.info('Wrote data to %s.*.', filename)
        else:
            # Report the file name returned by getFStatus(), not the
            # builtin 'file'.
            self.logger.critical(
                'Problem taking exposure, '
                'cstate: %s, cstatus: %s, fstate: %s, fstatus: %s, file: %s.',
                cstate, cstatus, fstate, fstatus, filename)