""" rotator module for dazle """
"""
High level commands are:
rotator.rotator.where()
"""
import ConfigParserRT
import datetime
import logging
import math
import slalib
import serial
import threading
import time
debug=True
verbose=True
class Error(Exception):
"""Base class for exceptions in this module."""
pass
class UnitsError(Error):
"""
Raised where invalid units are specified.
"""
def __init__(self, function, unit):
self.function = function
self.unit = unit
def __repr__():
return "rotator.UnitsError('%s', '%s')" % (self.function, self.unit)
def __str__():
return "Invalid units '%s' specified in call to %s" % \
(self.unit, self.function)
class ParseError(Error):
"""
Raised when problems occur when parsing strings with slalib routines.
"""
def __init__(self, function, status, startPos, endPos):
self.function = function
self.status = status
self.startPos = startPos
self.endPos = endPos
class SlalibError(Error):
"""
Raised when an error code is returned by a slalib routine.
"""
def __init__(function, status):
self.function = function
self.status = status
class tracker(threading.Thread):
"""
Worker thread intended to be used by an instance of rotator.rotator in
order to perform the operations required for rotator tracking in the
background.
"""
def __init__(self, tele, tgt, dt, low, high, port, stop, lockedOn, logger, \
logData=False):
"""
Arguments:
tele: A rotator.telescope object describing the observatory at
which the instrument is situated.
tgt: A rotator.target object representing the target field
dt: The interval at which tracking adjustments should be made,
in seconds.
low: The lower limit of allowed motion, a rotator.angle object.
high: The upper limit of allowed motion, a rotator.angle object.
port: A serial.Serial object for communication with the hardware.
stop: A threading.Event object used to terminate the tracking
loop.
lockedOn: A threading.Event object used to indicate tracking within
tolerances.
logger: A logging.Logger instance used to output information.
logData: Boolean to indicate whether tracking status information
should be logged to the file 'dicstrack.log'. Default False.
"""
if isinstance(tele, telescope):
self.__telescope = tele
else:
raise TypeError('rotator.tracker() expected rotator.telescope,' \
+ ' got %s' % type(tele))
if isinstance(tgt, target):
self.__target = tgt
else:
raise TypeError('rotator.tracker() expected rotator.target,' \
+ ' got %s' % type(tgt))
if hasattr(stop, 'clear'):
self.__stop = stop
else:
raise TypeError('rotator.tracker() expected threading.Event,' \
+ ' got %s' % type(stop))
if hasattr(lockedOn, 'clear'):
self.lockedOn = lockedOn
else:
raise TypeError('rotator.tracker() expected threading.Event,' \
+ ' got %s.' % type(stop))
if isinstance(logger, logging.Logger):
self.logger = logger
else:
raise TypeError('rotator.tracker() expected logging.Logger,' \
+ ' got %s' % type(logger))
self.__dt = float(dt)
if isinstance(low, angle):
self.__lower = low
else:
raise TypeError('rotator.tracker() expected rotator.angle,' \
+ ' got %s' % type(low))
if isinstance(high, angle):
self.__upper = high
else:
raise TypeError('rotator.tracker() expected rotator.angle,' \
+ ' got %s' % type(high))
if isinstance(port, serial.Serial):
self.__port = port
else:
raise TypeError('rotator.tracker() expected serial.Serial,' \
+ ' got %s' % type(port))
self.__logData = logData
if logData:
try:
self.__trackLogFile = file('dicstrack.log', 'w')
except IOError:
self.logger.warning(\
"Couldn't open track log file 'dicstrack.log")
self.__logData = False
self.__tolerance = angle(2E-4)
threading.Thread.__init__(self)
def run(self):
"""
Don't call this directly, use start().
"""
self.logger.info('Beginning to track target %s' % self.__target.name)
self.__telescope.resetField()
max_time = datetime.datetime.fromtimestamp(0.0)
for i in range(0, 4):
lt = self.limit_time()
if lt and lt > max_time:
max_time = lt
index = i
self.__target.unwind(1)
self.__target.unwind(index)
self.logger.info('Using position angle of %f' % \
self.__target.getPA().getAngle('d'))
self.logger.info('Unwind needed by %s UTC.' % \
max_time.strftime('%H:%M:%S'))
warned10mins = False
warned5mins = False
lockOnCount = 0
self.__port.timeout = 5
self.__port.flushInput()
count = 0
start_time = time.time()
utc = datetime.datetime.utcfromtimestamp(start_time + self.__dt)
firstpos, az, zd, pa, HAObs, decObs, RAObs = self.calcRotPos(utc)
self.logger.info('Initial telescope pointing: Az %f, zd %f.' % \
(az.getAngle('d'), zd.getAngle('d')))
self.logger.info('Initial parallactic angle: %f.' % pa.getAngle('d'))
self.logger.info('Initial observed hour angle: %f.' % \
HAObs.getAngle('h'))
self.logger.info('Target observed place: RA %s, dec %s.' % \
(RAObs.getString('h'), decObs.getString('d')))
if zd > angle(75.0, 'd'):
self.logger.critical('Target too low to observe! Not tracking.')
return
self.logger.info('Initial rotator position: %f.' % \
firstpos.getAngle('d'))
self.__port.write('T')
self.__port.write('%e\n' % self.__dt)
self.__port.write('%e\n' % firstpos.getAngle())
while not self.__stop.isSet():
if self.__logData:
self.__trackLogFile.write(utc.isoformat() + \
'\t%f\t%f\t%f\t%f\t%f\t%f\t' % \
(az.getAngle('d'), zd.getAngle('d'),\
pa.getAngle('d'), \
HAObs.getAngle('h'), \
decObs.getAngle('d'), \
RAObs.getAngle('h')))
count += 1
next_time = start_time + self.__dt * count
utc = datetime.datetime.utcfromtimestamp(next_time + self.__dt)
if not warned10mins and \
max_time - utc < datetime.timedelta(minutes=10):
self.logger.warning('10 minutes to rotator limit.')
warned10mins = True
if not warned5mins and \
max_time - utc < datetime.timedelta(minutes=5):
self.logger.warning('5 minutes to rotator limit.')
warned5mins = True
if self.__logData:
next_pos, az, zd, pa, HAObs, decObs, RAObs = \
self.calcRotPos(utc)
else:
next_pos = self.calcRotPos(utc)[0]
if next_pos <= self.__lower:
self.logger.critical('Reached lower limit of rotator travel!')
break
elif next_pos >= self.__upper:
self.logger.critical('Reached upper limit of rotator travel!')
break
self.__port.write('%e\n' % next_pos.getAngle())
delay = next_time - time.time()
if delay > 0:
time.sleep(delay)
self.__port.write('M')
response = self.__port.readline()
if response == 'O\n':
position = angle(float(self.__port.readline()))
error = angle(float(self.__port.readline()))
if self.__logData:
self.__trackLogFile.write('%f\t%f\n' % \
(position.getAngle('d'),
error.getAngle('d')))
elif response == 'U\n':
self.logger.critical('Position rejected as outside upper' + \
'limit by Taranis!')
self.logger.critical('Position sent was %f.' % \
next_pos.getAngle('d'))
self.logger.critical('Upper limit is %f.' % \
self.__upper.getAngle('d'))
break
elif response == 'L\n':
self.logger.critical('Position rejected as outside lower' + \
'limit by Taranis!')
self.logger.critical('Position sent was %f.' % \
next_pos.getAngle('d'))
self.logger.critical('Lower limit is %f.' % \
self.__lower.getAngle('d'))
break
elif response == 'u\n':
self.logger.critical('Rotator moved outside upper limit!')
break
elif response == 'l\n':
self.logger.critical('Rotator moved outside lower limit!')
break
elif response == '1\n':
self.logger.critical('Rotator hit lower limit switch!')
break
elif response == '2\n':
self.logger.critical('Rotator hit upper limit switch!')
break
else:
self.logger.critical("Unexpected response '%s' from Taranis!"\
% response)
break
if not self.lockedOn.isSet() and abs(error) < self.__tolerance:
if lockOnCount < 10:
lockOnCount += 1
else:
self.lockedOn.set()
self.logger.info('Tracking within tolerance.')
if self.lockedOn.isSet() and abs(error) > self.__tolerance:
lockOnCount = 0
self.lockedOn.clear()
self.logger.warning('Tracking tolerance exceeded.')
self.__port.write('0.0\n')
self.__port.write('C')
self.lockedOn.clear()
if self.__logData:
self.__trackLogFile.close()
self.logger.info('Tracking stopped')
az, zd = self.__telescope.getPointing(self.__target, utc)[:2]
self.logger.info('Final telescope pointing: Az %f, zd %f.' % \
(az.getAngle('d'), zd.getAngle('d')))
self.logger.info('Final rotator position: %f.' % \
next_pos.getAngle('d'))
def calcRotPos(self, utc):
"""
Based on the currently set target and telescope calculates the
required rotator position for the UTC given as a datetime.datetime
object.
DAzLE will mount on VLT UT3 Nasmyth platform 'A', which is on the
LEFT hand side when looking in the direction in which the telescope
if pointing. For the left hand Nasmyth platform the position angle
of the sky image on the detector with the rotator at zero degrees is
equal to parallactic angle + zenith distance + 180 degrees. The
required rotator position is therefore requested PA - parallactic angle
- zenith distance - 180degrees, appropriately normalised into -180 to
+180.
"""
az, zd, pa, HAobs, decObs, RAobs = \
self.__telescope.getPointing(self.__target, utc)
rotpos = self.__target.getPA() - pa - zd - angle(180, 'd')
rotpos.normPi()
return rotpos, az, zd, pa, HAobs, decObs, RAobs
def limit_time(self):
"""
Calculates the time (UTC) at which the rotator will reach the limits
of its range of movement for the set telescope and target. Returns
None if the the rotator position is currently outside the limits,
otherwise a datetime.datetime object.
"""
step = datetime.timedelta(seconds=300)
accuracy = datetime.timedelta(seconds=1)
utc1 = datetime.datetime.utcnow()
pos1 = self.calcRotPos(utc1)[0]
if pos1 > self.__lower and pos1 < self.__upper:
while pos1 > self.__lower and pos1 < self.__upper:
utc0 = utc1
pos0 = pos1
utc1 = utc1 + step
pos1 = self.calcRotPos(utc1)[0]
if pos1 < self.__lower:
limit = self.__lower
elif pos1 > self.__upper:
limit = self.__upper
delta_t = step
while abs(delta_t) > accuracy:
factor = (pos1-limit).getAngle() / (pos1 - pos0).getAngle()
delta_t = - datetime.timedelta(delta_t.days * factor, \
delta_t.seconds * factor, \
delta_t.microseconds * factor)
utc = utc1 + delta_t
pos = self.calcRotPos(utc)[0]
if pos0 < pos < limit or pos0 > pos > limit:
pos0 = pos1
utc0 = utc0
pos1 = pos
utc1 = utc
elif limit < pos < pos1 or limit > pos > pos1:
pos1 = pos
utc1 = utc
return utc1
class rotator:
"""
Class representing the camera rotation system used by DAZLE for field
rotation comepensation.
"""
def __init__(self, portname, telescope, target, idle, run, boost, \
name='rotator'):
"""
Initialiser for rotator class.
Arguments:
portname: The device name of the serial port which the
Taranis stepper motor controller is connected to.
telescope: A rotator.telescope object containing observatory
specific information.
target: A rotator.target object contain details of the target
field for observations.
idle: The peak motor current when the motor is stationary,
in amps. Must be >=0 and <= run.
run: The peak motor current when the motor is moving
continously, in amps. Must be >= idle and <= boost.
boost: The peak motor current when the motor is accelerating/
decelerating, in amps. Must be >=run and <= 4.5.
dt: The interval between tracking corrections, in seconds.
name: label used for logging purposes.
"""
self.__portname = portname
self.__telescope = telescope
self.__target = target
self.__idle = float(idle)
self.__run = float(run)
self.__boost = float(boost)
self.__dt = float(dt)
self.name = name
self.logger = logging.getLogger(name)
try:
self.__port = serial.Serial(portname, timeout=5)
self.__port.open()
self.__port.flushInput()
except serial.SerialException:
self.logger.critical('Failed to open serial port %s!' % portname)
return
self.logger.info('Connected to serial port %s' % portname)
self.where()
self.setCurrents(self.__idle, self.__run, self.__boost)
self.__stop = threading.Event()
self.__stop.set()
self.lockedOn = threading.Event()
self.lockedOn.clear()
def updateConfig(self, config):
"""
Should be called by the parent instrument object's destructor
to enable status information to be saved to file on exit.
Takes a ConfigParserRT.SafeConfigParser object as argument.
"""
self.logger.debug("updateConfig() called")
self.stopTracking()
if isinstance(config, ConfigParserRT.SafeConfigParser):
config.set(self.name, 'target', self.__target.name)
config.set(self.name, 'telescope', self.__telescope.name)
config.set(self.name, 'idle', str(self.__idle))
config.set(self.name, 'run', str(self.__run))
config.set(self.name, 'boost', str(self.__boost))
config.set(self.name, 'dt', str(self.__dt))
else:
self.logger.critical("Invalid argument to updateConfig(%s)" \
% repr(config))
self.logger.critical("Unable to save status data")
def setCurrents(self, idle, run, boost):
"""
Sets the motor currents to be used by the rotator.
Arguments:
idle: The peak motor current when the motor is stationary, in amps.
Must be >=0 and <= run.
run: The peak motor current when the motor is moving continously,
in amps. Must be >= idle and <= boost.
boost: The peak motor current when the motor is accelerating/
decelerating, in amps. Must be >=run and <= 4.5.
"""
self.logger.debug('setCurrents(%f, %f, %f) called.' % \
(idle, run, boost))
if self.isTracking():
self.logger.critical("Currently tracking, can't setCurrents().")
return
self.__port.flushInput()
self.__port.timeout = 5
if boost <= 4.5 and boost >= run and run >= idle and idle >=0:
self.__port.write('S')
self.__port.write('%f\n' % (idle * 1000))
self.__port.write('%f\n' % (run * 1000))
self.__port.write('%f\n' % (boost * 1000))
response = self.__port.readline()
if response == 'O\n':
self.__idle = idle
self.__run = run
self.__boost = boost
self.logger.debug('Motor currents set.')
elif response == 'F\n':
self.logger.critical('Motor currents rejected by Taranis!')
self.logger.critical(\
'Call setCurrents() again before using rotator.')
else:
self.logger.critical('Unexpected response %s from Taranis!' % \
repr(response))
self.logger.critical(\
'Call setCurrents() again before using rotator.')
else:
self.logger.critical('Attempt to set illegal motor currents, ' + \
'%fA, %fA, %fA.' % (idle, run, boost))
self.logger.critical(\
'Call setCurrents() again before using rotator.')
def setupRotator(self):
"""
Instructs the rotator to perform its initialisation routine, which
determines the encoder angular scale and zero point, and sets up
the upper and lower motion limits.
"""
self.logger.debug('setupRotator() called.')
if self.isTracking():
self.logger.critical("Currently tracking, can't setupRotator().")
return
self.__port.timeout = 1200
self.__port.flushInput()
self.__port.write('I')
response = self.__port.readline()
if response == "O\n":
self.logger.info('Rotator initialisation complete.')
self.where()
elif response == "F\n":
self.logger.critical('Rotator initialisation failed!')
self.logger.critical('Check limit switch and motor function')
self.logger.critical('then run again.')
elif not response:
self.logger.critical('No response from Taranis in setupRotator()!')
self.logger.critical('Check serial cable and run ' + \
'setupRotator again.')
else:
self.logger.critical("Unexpected response '%s' from Taranis in" \
% response + 'setupRotator()')
def setTarget(self, field):
"""
Changes to current target field to the rotator.target instance
given as the argument.
"""
if self.isTracking():
self.logger.critical("Currently tracking, can't setTarget().")
return
if isinstance(field, target):
self.__target = field
else:
self.logger.warning(\
'setTarget() expected rotator.target, got %s.' % type(field))
def setTelescope(self, observatory):
"""
Changes the current telescope/observatory for the instrument to the
rotator.telescope instance passed as the srgument
"""
if self.isTracking():
self.logger.critical("Currently tracking, can't setTelescope().")
return
if isinstance(observatory, telescope):
self.__telescope = observatory
else:
self.logger.warning(\
'setTelescope() expected rotator.telescope, for %s' \
% type(telescope))
def startTracking(self, logData=False):
"""
Creates an instance of rotator.tracker thread and sets it running
for the currently set telescope and target.
Arguments:
logData: Boolean to indicate whether tracking status information
should be logged to the file 'dicstrack.log'. Default False.
"""
if self.isTracking():
self.logger.critical('Rotator already tracking, ' + \
'ignoring startTracking()')
return
self.__stop.clear()
self.__tracker = tracker(self.__telescope, self.__target, \
self.__dt, self.__lower, self.__upper, \
self.__port, self.__stop, self.lockedOn, \
self.logger, logData)
self.__tracker.start()
def stopTracking(self):
self.__stop.set()
def isTracking(self):
try:
return self.__tracker.isAlive()
except AttributeError:
return False
def isLockedOn(self):
if self.lockedOn.isSet():
return True
else:
return False
def where(self):
"""
Request status information from the Taranis.
"""
self.logger.debug('rotator.where() called.')
if self.isTracking():
self.logger.critical('Currently tracking, where() ignored.')
return
self.__port.timeout = 5
self.__port.flushInput()
self.__port.write('W')
pos = self.__port.readline()
upper = self.__port.readline()
lower = self.__port.readline()
if pos and upper and lower:
self.__position = angle(float(pos))
self.__upper = angle(float(upper))
self.__lower = angle(float(lower))
self.logger.info('Received %f, %f, %f.' % \
(self.__position.getAngle('d'), \
self.__upper.getAngle('d'), \
self.__lower.getAngle('d')))
else:
self.logger.critical('No response from Taranis during ' + \
'where()!')
self.logger.critical('Check serial cable then run ' + \
'where() again.')
class telescope:
"""
Class representing the telescope/observatory. Stores information relating
to the position and meteorological conditions (for refraction
calculations) and performs calculations of observed azimuth and elevation.
"""
def __init__(self, longitude, latitude, altitude, \
polar_motion_x, polar_motion_y, delta_ut, \
temperature, pressure, humidity, tlr, \
wavelength, name):
"""
Initialiser for a telescope object. Arguments representing angles
should be an object acceptable as an argument to the rotator.angle
constructor, or a sequence type containing a list of arguments
acceptible to rotator.angle().
Arguments:
longitude: Mean longitude of the telescope, east positive.
latitude: Mean geodetic latitude of the telescope.
altitude: Telescope's height above mean sea level (metres).
polar_motion_x: Polar motion x-coordinate for today, from IERS
polar_motion_y: Polar motion y-coordinate for today, from IERS
delta_ut: UT1-UTC from IERS, in seconds, as a float
temperature: Local ambient temperature (Kelvin)
pressure: Local atmospheric pressure (millibars)
humidity: Local relative humidity (in the range 0.0-1.0)
tlr: Tropospheric lapse rate (Kelvin/metre)
wavelength: Effective wavelength of the observations (microns)
name: An identification string for logging purposes.
"""
for varname, arg in \
('_telescope__longitude', longitude), \
('_telescope__latitude', latitude), \
('_telescope__polar_motion_x', polar_motion_x), \
('_telescope__polar_motion_y', polar_motion_y):
try:
setattr(self, varname, angle(*arg))
except TypeError:
setattr(self, varname, angle(arg))
self.__altitude = float(altitude)
self.__delta_UT = float(delta_ut)
self.__temperature = float(temperature)
self.__pressure = float(pressure)
self.__humidity = float(humidity)
self.__tlr = float(tlr)
self.__wavelength = float(wavelength)
self.name = name
self.logger = logging.getLogger(name)
self.__aoprms = slalib.doubleArray(14)
def updateConfig(self, config):
"""
Should be called by the parent instrument object's destructor
to enable status information to be saved to file on exit.
Takes a ConfigParserRT.SafeConfigParser object as argument.
"""
self.logger.debug("updateConfig() called.")
if isinstance(config, ConfigParserRT.SafeConfigParser):
config.set(self.name, 'longitude', self.__longitude.getString('d'))
config.set(self.name, 'latitude', self.__latitude.getString('d'))
config.set(self.name, 'altitude', str(self.__altitude))
config.set(self.name, 'polar_motion_x', \
self.__polar_motion_x.getString('d', 5))
config.set(self.name, 'polar_motion_y', \
self.__polar_motion_y.getString('d', 5))
config.set(self.name, 'delta_ut', str(self.__delta_UT))
config.set(self.name, 'temperature', str(self.__temperature))
config.set(self.name, 'pressure', str(self.__pressure))
config.set(self.name, 'humidity', str(self.__humidity))
config.set(self.name, 'tlr', str(self.__tlr))
config.set(self.name, 'wavelength', str(self.__wavelength))
else:
self.logger.critical("Invalid argument to updateConfig(%s)" \
% repr(config))
self.logger.critical("Unable to save status data")
def getIERS(self):
"""
Returns the stored earth rotation parameters, as provided by the
International Earth Rotation Service, UT1-UTC (seconds, float),
and the x and y components of polar motion (rotator.angles).
"""
return self.__delta_UT, angle(self.__polar_motion_x), \
angle(self.__polar_motion_y)
def setIERS(self, delta_UT, polar_motion_x, polar_motion_y):
"""
Sets the earth rotation parameters. The information to do this
should be obtained from the International Earth Rotation Service
bulletin covering the day in question.
Arguments:
delta_UT: UT1-UTC, in seconds.
polar_motion_x: Polar motion x-coordinate, as either an argument or
sequence containing an argument list for the
rotator.angle() constructor.
polar_motion_y: Polar motion y-coordinate, as either an argument or
sequence containing an argument list for the
rotator.angle() constructor.
"""
self.__delta_UT = float(delta_UT)
for name, arg in \
('_telescope__polar_motion_x', polar_motion_x), \
('_telescope__polar_motion_y', polar_motion_y):
try:
setattr(self, name, angle(*arg))
except TypeError:
setattr(self, name, angle(arg))
def getLocation(self):
"""
Returns the telescope location information, longitude, latitude
(both as rotator.angles) and altitude (metres).
"""
return angle(self.__longitude), angle(self.__latitude), self.__altitude
def setLocation(self, longitude, latitude, altitude):
"""
Sets the telescope location information.
Arguments:
longitude: Mean longitude of the telescope, east positive, as either
an argument or sequence containing an argument list for the
rotator.angle() constructor.
latitude: Mean geodetic latitude of the telescope, as either an
argument or sequence containing an argument list for the
rotator.angle() constructor.
altitude: The telescope's height above mean sea level, in metres.
"""
for name, arg in \
('_telescope__longitude', longitude), \
('_telescope__latitude', latitude):
try:
setattr(self, name, angle(*arg))
except TypeError:
setattr(self, name, angle(arg))
self.__altitude = float(altitude)
def resetField(self):
"""
Deletes any stored field reference. The effect of this is to force
a recalculation of all the observed-to-apparent-place parameters
in the next call to getPointing(), rather than just updating the
sidereal time.
"""
try:
del(self.__field)
except AttributeError:
pass
def getPointing(self, field, utc):
"""
Calculates the observed azimuth and zenith distance for a given field
and time. Note that these calculations assume a 'perfect' telescope,
no pointing model is used.
Arguments:
field: the target field, expressed as a rotator.field object.
utc: the time of observation, UTC, expressed as a datetime.datetime
object
Returned:
az: the observed azimuth expressed as a rotator.angle, East = 90deg
zd: the observed zenith distance expressed as a rotator.angle.
pa: the parallactic angle at the target field.
"""
if not isinstance(field, target):
raise TypeError("telescope.getPointing() expected rotator.target" \
+ ", got %s." % type(field))
elif not isinstance(utc, datetime.datetime):
raise TypeError("telescope.getPointing() expected " + \
"datetime.datetime, got %s." % type(utc))
else:
if not field.getApparentPlace():
field.updateApparentPlace(utc)
RAApp, DecApp = field.getApparentPlace()
imjd, status = slalib.slaCldj(utc.year, utc.month, utc.day)
if status != 0:
raise SlalibError('slaCldj', status)
status = 0
fmjd, status = slalib.slaDtf2d(utc.hour, utc.minute, \
utc.second + \
utc.microsecond/1000000.0)
if not hasattr(self, 'self.__field') or self.__field != field:
slalib.slaAoppa(\
imjd + fmjd, \
self.__delta_UT, \
self.__longitude.getAngle(), \
self.__latitude.getAngle(), \
self.__altitude, \
self.__polar_motion_x.getAngle(), \
self.__polar_motion_y.getAngle(), \
self.__temperature, \
self.__pressure, \
self.__humidity, \
self.__wavelength, \
self.__tlr, \
self.__aoprms)
self.__field = field
else:
slalib.salAoppat(imjd + fmjd, self.__aoprms)
az, zd, HAObs, decObs, RAObs = slalib.slaAopqk(\
RAApp.getAngle(), DecApp.getAngle(), \
self.__aoprms)
pa = slalib.slaPa(\
self.__aoprms[13] - RAApp.getAngle(), \
DecApp.getAngle(), \
self.__latitude.getAngle())
return angle(az), angle(zd), angle(pa), angle(HAObs), \
angle(decObs), angle(RAObs)
def getWavelength(self):
"""
Returns the stored effective wavelength of observations, in microns.
"""
return self.__wavelength
def setWavelength(self, wavelength):
"""
Sets the effective wavelength of observations.
Arguments:
wavelength: Effective wavelength of observations, in microns.
"""
self.__wavelength = float(wavelength)
def getWeather(self):
"""
Returns the stored meteorological parameters, temperature (in Kelvin),
pressure (in millibars), relative humidity (in the range 0-1.0) and
tropospheric lapse rate (Kelvin/metre).
"""
return self.__temperature, self.__pressure, self.__humidity, \
self.__tlr
def setWeather(self, temperature, pressure, humidity, tlr):
"""
Sets the meteorogical parameters used in the refraction calculations.
Arguments:
temperature: Local ambient temperature (Kelvin)
pressure: Local atmospheric pressure (millibars)
humidity: Local relative humidity (in the range 0.0-1.0)
tlr: Tropospheric lapse rate (Kelvin/metre)
"""
self.__temperature = float(temperature)
self.__pressure = float(pressure)
self.__humidity = float(humidity)
self.__tlr = float(tlr)
class target:
"""
Class representing a target field. Stores the position of the field
in mean place FK5 J2000 coordinates, and calculates the geocentric
apparent coordinates when given the epoch. Also have a number of methods
for setting and retrieving coordinates.
"""
def __init__(self, ra, dec, pa, name):
"""
Initialises the target object. Takes four arguments, the target's
RA, Dec, desired position angle, and name (an identification string
primarily for logging purposes. RA and Dec should be mean place
FK5 J2000 coordinates. Position angle is here defined as the angle
Northpole-field centre-'up', where 'up' is the upwards direction on the
detector. Each of the arguments should be either a single value
acceptable to the rotator.angle constructor (e.g. a rotator.angle
object, a numeric type representing an angle, or a string containing a
sexagesimal/decimal representation of the angle) or a sequence object
containing an argument list for the constructor. If a single value
is given for RA it is assumed to be in units of hours, whereas Dec and
PA are assumed to be in degrees.
"""
self.name = name
for label, arg, unit in ('_target__RA', ra, 'h'), \
('_target__Dec', dec, 'd'), ('_target__PA', pa, 'd'):
try:
setattr(self, label, angle(*arg))
except TypeError:
setattr(self, label, angle(arg, unit))
self.logger = logging.getLogger(name)
def updateConfig(self, config):
"""
Should be called by the parent instrument object's destructor
to enable status information to be saved to file on exit.
Takes a ConfigParserRT.SafeConfigParser object as argument.
"""
self.logger.debug("updateConfig() called.")
if isinstance(config, ConfigParserRT.SafeConfigParser):
config.set(self.name, 'ra', self.__RA.getString('h'))
config.set(self.name, 'dec', self.__Dec.getString('d'))
config.set(self.name, 'pa', str(self.__PA.getAngle('d')))
else:
self.logger.critical("Invalid argument to updateConfig(%s)" \
% repr(config))
self.logger.critical("Unable to save status data")
def getApparentPlace(self):
"""
If apparent place RA and Dec have been calculated (with
updateApparentPlace()) then returns them, else returns None.
"""
try:
return angle(self.__RAApp), angle(self.__DecApp)
except AttributeError:
pass
def getCoords(self):
"""
Returns the mean place FK5 J2000 RA and Dec, and the position angle
for the target, as rotator.angle objects.
"""
return angle(self.__RA), angle(self.__Dec), angle(self.__PA)
def getPA(self):
"""
Returns the position angle for the target as a rotator.angle object
"""
return angle(self.__PA)
def setPA(self, pa):
"""
Sets the requested position angle.
"""
try:
self.__PA = angle(*pa)
except TypeError:
self.__PA = angle(pa)
def setCoords(RA, Dec, PA):
"""
Sets the mean place FK5 J2000 RA and Dec, and the requested position
angle for the target. Arguments are as for __init__().
"""
for name, arg in ('_target__RA', RA), ('_target__Dec', Dec), \
('_target__PA', PA):
try:
setattr(self, name, angle(*arg))
except TypeError:
setattr(self, name, angle(arg))
try:
del self.__RAApp
except AttributeError:
pass
try:
del self.__DecApp
except AttributeError:
pass
def updateApparentPlace(self, tdb):
"""
Performs the precession-nutation, light deflection and annual
aberration calculations required to obtain the apparent place
RA and dec of the target, and both returns and stores the result.
A single argument is required, the Barycentric Dynamical Time of
observation as a datetime.datetime object. According to the called
slalib.slaMap routine's notes, 'The distinction between the required
TDB and TT is always negligible. Moreover, for all but the most
critical applications UTC is adequate'.
"""
if isinstance(tdb, datetime.datetime):
imjd, status = slalib.slaCldj(tdb.year, tdb.month, tdb.day)
if status != 0:
raise SlalibError('slaCldj', status)
status = 0
fmjd, status = slalib.slaDtf2d(tdb.hour, tdb.minute, \
tdb.second + \
tdb.microsecond/1000000.0)
if status != 0:
raise SlalibError('slaDtf2d', status)
RAApp, DecApp = slalib.slaMap(self.__RA.getAngle(), \
self.__Dec.getAngle(), \
0, 0, 0, 0, 2000, \
imjd + fmjd)
self.__RAApp = angle(RAApp)
self.__DecApp = angle(DecApp)
return angle(self.__RAApp), angle(self.__DecApp)
else:
raise TypeError(\
'target.updateApparentPlace() requires a datetime.datetime' + \
' object argument, got %s' % type(tdb))
def unwind(self, shiftnumber):
"""
Changes the position angle by the argument, shiftnumber, times
90 degrees. This will be done by a rotator object in order to
keep its required position within its range of motion. The
argument must be a type acceptible to the builtin int() function.
"""
self.__PA += int(shiftnumber) * math.pi / 2.0
self.__PA.normPi()
try:
del(self.__RAApp)
except AttributeError:
pass
try:
del(self.__DecApp)
except AttributeError:
pass
class angle:
"""
Class for storing angles. Internally stores an angle as a float in radians
and provides methods for setting and retrieving the angle in a variety of
units, both via floats or various string formats.
"""
def __init__(self, value = 0.0, units=False):
"""
Standard initialiser, takes up to two arguments. The first argument
(which defaults to zero) is the angle to store, the second argument
(which defaults to 'r') indicates the units used, 'r' for radians,
'd' for degrees and 'h' for hours. The angle may be in any numeric
type acceptable to the builtin function float(). Alternatively, if
value is itself an angle object, this becomes a C++ style 'copy
constructor' and units is ignored. Thirdly, value may be a string
representation of an angle, in which case it will be parsed by
angle.setString(). For string values only units of 'd' or 'h' are
acceptable, and units will default to 'd'.
"""
self.__radians = 0.0
if isinstance(value, str):
self.setString(value, units or 'd')
else:
self.setAngle(value, units or 'r')
def __eq__(self, other):
try:
return self.__radians == other.getAngle()
except AttributeError:
return self.__radians == other
def __ne__(self, other):
try:
return self.__radians != other.getAngle()
except AttributeError:
return self.__radians != other
def __ge__(self, other):
try:
return self.__radians >= other.getAngle()
except AttributeError:
return self.__radians >= other
def __gt__(self, other):
try:
return self.__radians > other.getAngle()
except AttributeError:
return self.__radians > other
def __le__(self, other):
try:
return self.__radians <= other.getAngle()
except AttributeError:
return self.__radians <= other
def __lt__(self, other):
try:
return self.__radians < other.getAngle()
except AttributeError:
return self.__radians < other
def __add__(self, other):
try:
return angle(self.__radians + other.getAngle())
except AttributeError:
return angle(self.__radians + other)
def __radd__(self, other):
return angle(self.__radians + other)
def __iadd__(self, other):
try:
self.__radians += other.getAngle()
except AttributeError:
self.__radians += other
return self
def __sub__(self, other):
try:
return angle(self.__radians - other.getAngle())
except AttributeError:
return angle(self.__radians - other)
def __rsub__(self, other):
return angle(self.__radians - other)
def __isub__(self, other):
try:
self.__radians -= other.getAngle()
except AttributeError:
self.__radians -= other
return self
def __mul__(self, other):
return angle(self.__radians * other)
def __rmul__(self, other):
return angle(self.__radians * other)
def __imul__(self, other):
self.__radians *= other
return self
def __div__(self, other):
return angle(self.__radians / other)
def __idiv__(self, other):
self.__radians /= other
return self
def __truediv__(self, other):
return angle(self.__radians / other)
def __itruediv__(self, other):
self.__radians /= other
return self
def __neg__(self):
return angle(-self.__radians)
def __pos__(self):
return angle(self)
def __abs__(self):
return angle(abs(self.__radians))
def __repr__(self):
return "rotator.angle(%s)" % self.__radians
def __str__(self):
return str(self.__radians)
def getAngle(self, units='r'):
"""
Returns the value of the angle as a float in the specified units,
'r' - radians, 'd' - degrees or 'h' - hours.
"""
if units == 'r':
return self.__radians
elif units == 'd':
return math.degrees(self.__radians)
elif units == 'h':
return math.degrees(self.__radians) / 15.0
else:
raise UnitsError('angle.getAngle()', units)
def setAngle(self, value=0.0, units='r'):
"""
Sets the angle. Takes up to two arguments. The first argument
(which defaults to zero) is the angle to store, the second argument
(which defaults to 'r') indicates the units used, 'r' for radians,
'd' for degrees and 'h' for hours. The angle may be in any form
acceptable to the builtin function float(). Alternatively, if value
is itself an angle object, this makes the angles equal and the units
argument is ignored.
"""
try:
self.__radians = value.getAngle()
except AttributeError:
if units == 'r':
self.__radians = float(value)
elif units == 'd':
self.__radians = math.radians(float(value))
elif units == 'h':
self.__radians = math.radians(15 * float(value))
else:
raise UnitsError('angle.setAngle()', units)
def getString(self, units='d', ndp=3):
"""
Returns a string representation of the angle in sexagesimal or
'time' formats. The optional first parameter units can be 'd', 'h' or
't' for degrees, hours (sexagesimal) or hours (time format), default
is 'd'. The optional second parameter, 'ndp', specified the number of
decimal places in the seconds field, default 3.
For decimal string representations getAngle() can be used together with
standard string formatting commands.
"""
ints = slalib.intArray(4)
if units == 'd':
sign = slalib.slaDr2af(ndp, self.__radians, ints)
return "%c%d %02d %02d.%0*d" % \
(sign[0], ints[0], ints[1], ints[2], ndp, ints[3])
elif units == 'h':
sign = slalib.slaDr2tf(ndp, self.__radians, ints)
return "%c%d %02d %02d.%0*d" % \
(sign[0], ints[0], ints[1], ints[2], ndp, ints[3])
elif units == 't':
sign = slalib.slaDr2tf(ndp, self.__radians, ints)
return "%c%d:%02d:%02d.%0*d" % \
(sign[0], ints[0], ints[1], ints[2], ndp, ints[3])
else:
raise UnitsError('angle.getString()', units)
def setString(self, angleString, units='d', startPos=1):
"""
Sets the angle from a string representation. The first argument
contains a free format string representing the angle as either
a decimal or sexagesimal number (or a combination of the two) and
the second is a character specifying the units used ('d' for degrees,
'h' for hours). Uses slaDafin from the slalib module to parse the
string. An optional third argument specifies where in the string
to start parsing (in a 1 based, not zero based, count).
Returns the position in the string at which encoding finished.
"""
(endPos, value, status) = slalib.slaDafin(angleString, startPos)
if status == 0:
if units == 'd':
self.__radians = value
elif units == 'h':
self.__radians = value * 15
else:
raise UnitsError('angle.setString()', units)
return endPos
else:
raise ParseError('slalib.slaDafin()', status, startPos, endPos)
def normPi(self):
"""
Normalises the angle into the range +/-Pi.
"""
self.__radians = slalib.slaDrange(self.__radians)
def norm2Pi(self):
"""
Normalises the angle into the range 0-2Pi.
"""
self.__radians = slalib.slaDranrm(self.__radians)