import logging
import astropy.units as u
import dateparser
import numpy as np
from astropy.coordinates import SkyCoord
from astropy.io import fits
from astroquery.simbad import Simbad
from astrodb_utils.photometry import assign_ucd
__all__ = [
"add_missing_keywords",
"add_wavelength_keywords",
"add_observation_date",
"check_header",
]
logger = logging.getLogger(__name__)
[docs]
def add_missing_keywords(header=None, format="ivoa-spectrum-dm-1.2", keywords=None):
"""Finds the keywords that are missing from a header and adds them with blank values
Inputs
-------
header: fits.Header
a fits header object or dictionary of header values
format: string
header data models to enforce. Options, 'simple-spectrum' and 'ivoa-spectrum-dm-1.2`
keywords: list
a list of keywords to check for if format is not specified
Returns
-------
FITS header object
Examples
--------
Returns a header with keywords but blank values
>>> new_header = add_missing_keywords()
Adds missing keywords (with blank values) to an existing header
using the simple spectrum data model
>>> new_header = add_missing_keywords(old_header, format='simple-spectrum')
"""
# If no header was provided, start with a blank one
if header is None:
header = fits.Header()
if keywords is None and format is None:
format = "ivoa-spectrum-dm-1.2"
keywords = get_keywords(format)
missing_keywords = []
# Loop through the original header and add keywords with blank values to the new header
for keyword, comment in keywords:
value = header.get(keyword)
if value is None:
header.set(keyword, None, comment)
missing_keywords.append((keyword, comment))
if format == "ivoa-spectrum-dm-1.2":
header["VOCLASS"] = "Spectrum-1.2"
# Loop over missing keywords and print for copy and paste purposes
print(
"COPY AND PASTE THE FOLLOWING COMMANDS INTO YOUR SCRIPT \n"
"Replace <value> with the appropriate value for your dataset \n"
"If you're not sure of the correct value, use None \n"
"If you started with a header object not called `header`, \n"
"replace 'header' with the name of your header object \n"
"Use the `astrodb_utils.fits.add_wavelength_keywords` function \n"
"to add the SPEC_VAL, SPEC_BW, TDMID1, TDMAX1, and SPECBAND keywords \n"
)
for keyword, comment in missing_keywords:
print(f"header.set('{keyword}', \"<value>\")") # {comment}")
return header
[docs]
def add_wavelength_keywords(header=None, wavelength_data=None):
"""Uses wavelength array to generate header keywords
Inputs
-------
wavelength_data: astropy.units.Quantity
an array of wavelengths. should include units.
header_dict: header
a Header object
Returns
-------
None
Examples
--------
>>> wavelength = np.arange(5100, 5300)*u.AA
>>> add_wavelength_keywords(header=new_header, wavelength_data = wavelength)
"""
# Make new, blank header
if header is None:
header = fits.Header()
# Use wavelength data to calculate header values
w_min = min(wavelength_data).astype(np.single)
w_max = max(wavelength_data).astype(np.single)
width = (w_max - w_min).astype(np.single)
w_mid = ((w_max + w_min) / 2).astype(np.single)
bandpass = assign_ucd(w_mid)
header.set("SPECBAND", bandpass)
header.set("SPEC_VAL", w_mid.value, f"[{w_mid.unit}] Characteristic spec coord")
header.set("SPEC_BW", width.value, f"[{width.unit}] Width of spectrum")
header.set("TDMIN1", w_min.value, f"[{w_min.unit}] Starting wavelength")
header.set("TDMAX1", w_max.value, f"[{w_max.unit}] Ending wavelength")
header["HISTORY"] = (
"Wavelength keywords added by astrodb_utils.fits.add_wavelength_keywords"
)
# return header
[docs]
def add_observation_date(header=None, date=None):
"""Adds the observation date to the header
Inputs
-------
header: fits.Header
a fits header object or dictionary of header values
date: string
the date of the observation
Returns
-------
None
Examples
--------
>>> add_observation_date(header, date='2021-06-01')
"""
if header is None:
header = fits.Header()
if date is None:
raise ValueError("Date of observation is required")
try:
obs_date = dateparser.parse(date)
logger.debug(f"Date of observation: {date} parsed to {obs_date}")
if obs_date is not None:
obs_date_short = obs_date.strftime("%Y-%m-%d")
obs_date_long = obs_date.strftime("%b %d, %Y")
header.set("DATE-OBS", obs_date_short, "date of the observation")
print(f"Date of observation: {obs_date_long}")
print(f"DATE-OBS set to : {obs_date_short}.")
else:
raise ValueError(f"Date could not be parsed by dateparser.parse: {date}")
except Exception as e:
raise e
def get_keywords(format):
formats = ["simple-spectrum", "ivoa-spectrum-dm-1.2"]
if format not in formats:
msg = f"(Format must be one of these: {formats})"
raise ValueError(msg)
if format == "simple-spectrum":
keywords = [
("OBJECT", "Name of observed object"),
("RA_TARG", "[deg] target position"),
("DEC_TARG", "[deg] target position"),
("INSTRUME", "Instrument name"),
("TELESCOP", "Telescope name"),
("DATE-OBS", "Date of observation"),
("TELAPSE", "[s] Total elapsed time (s)"),
("AUTHOR", "Authors of original dataset"),
("VOREF", "URL, DOI, or bibcode of original publication"),
("VOPUB", "Publisher"), # SIMPLE
("CONTRIB1", "Contributor who generated this header"),
("SPEC_VAL", "[angstrom] Characteristic spectral coordinate"),
("SPEC_BW", "[angstrom] width of spectrum"),
("TDMIN1", "Start in spectral coordinate"),
("TDMAX1", "Stop in spectral coordinate"),
("SPECBAND", "SED.bandpass"),
("APERTURE", "[arcsec] slit width"),
]
elif format == "ivoa-spectrum-dm-1.2":
keywords = [
("OBJECT", "Name of observed object"),
("RA_TARG", "[deg] target position"),
("DEC_TARG", "[deg] target position"),
("INSTRUME", ""),
("TELESCOP", ""),
("OBSERVAT", ""),
("VOCLASS", "Data model name and version"),
("VOPUB", "Publisher"),
("VOREF", "URL, DOI, or bibcode of original publication"),
("TITLE", "Dataset title "),
("AUTHOR", "Authors of the original dataset"),
("CONTRIB1", "Contributor who generated this file"), # optional
("DATE-OBS", "Date of observation"), # optional
("TMID", "[d] MJD of exposure mid-point"),
("TELAPSE", "[s] Total elapsed time (s)"),
("SPEC_VAL", "[angstrom] Characteristic spectral coordinate"),
("SPEC_BW", "[angstrom] width of spectrum"),
("TDMIN1", "Start in spectral coordinate"),
("TDMAX1", "Stop in spectral coordinate"),
("SPECBAND", "SED.bandpass"),
("APERTURE", "[arcsec] slit width"),
]
return keywords
def make_skycoord(header):
"""
Make a SkyCoord object from the RA and Dec in the header
Look for coordinates in either RA_TARG and DEC_TARG or RA and DEC keywords
"""
if header.get("RA_TARG") is None and header.get("RA") is None:
print("RA_TARG or RA is required")
ra = None
return None
if header.get("RA_TARG") is not None:
ra = float(header.get("RA_TARG"))
elif header.get("RA") is not None:
ra = float(header.get("RA"))
if ra > 360:
print("RA_TARG does not appear to be in degrees")
print(f"RA_TARG: {ra}")
print("RA_TARG should be in degrees")
return None
logger.debug(f"RA: {ra}")
if header.get("DEC_TARG") is None and header.get("DEC") is None:
print("DEC_TARG or DEC is required")
dec = None
return None
if header.get("DEC_TARG") is not None:
dec = float(header.get("DEC_TARG"))
elif header.get("DEC") is not None:
dec = float(header.get("DEC"))
if dec > 90 or dec < -90:
print("DEC_TARG value is out of the expected range.")
print(f"DEC_TARG: {dec}")
print("DEC_TARG should be in degrees in the range -90 to 90")
return None
logger.debug(f"DEC: {dec}")
# Check if ra and dec could be read into SkyCoord object and converted to sexagesimal
try:
coord = SkyCoord(ra, dec, unit=(u.deg, u.deg))
coord_str = coord.to_string("hmsdms")
print(f"coordinates converted to sexagesimal: {coord_str}")
return coord
except Exception as e:
print(
f"coordinates ({ra},{dec}) could not be converted to Skycoord object: {e}"
)
return None
def check_simbad_name(header):
# search SIMBAD for object name
object_name = header.get("OBJECT")
if object_name is None:
print("OBJECT name is required")
return False
simbad_name_results = Simbad.query_object(object_name)
logger.debug(f"SIMBAD results for object name {object_name}: {simbad_name_results}")
print(f"SIMBAD results for object name {object_name}: {simbad_name_results}")
coord = make_skycoord(header)
if simbad_name_results is None and coord is None:
print(f"Object name {object_name} not found in SIMBAD")
return False
# search for object using RA/Dec coordinates
elif simbad_name_results is None and coord is not None:
coord_str = coord.to_string("hmsdms")
print(f'Object name {object_name} not found in SIMBAD, trying 60" coord search')
simbad_coord_results = Simbad.query_region(coord, radius="0d0m60s")
if simbad_coord_results is not None:
print(f'SIMBAD objects within 60" search of {coord_str}:')
print(simbad_coord_results)
return False
if len(simbad_name_results) == 1:
print(f"Object name {object_name} found in SIMBAD")
print(simbad_name_results)
return True
else:
print(f"Multile object name {object_name} matches found in SIMBAD")
print(simbad_name_results)
return False
def check_ra_dec_simbad(simbad_name_results):
# TODO: make this a skycoord object comparison and figure out correct tolerance
result = True
# check RA and Dec agree with SIMBAD
# ra_simbad = simbad_name_results['RA']
# dec_simbad = simbad_name_results['DEC']
# ra_check = np.isclose(ra, ra_simbad, atol=0.1) # check if ra is close to simbad ra
# dec_check = np.isclose(dec, dec_simbad, atol=0.1) # check if dec
# if not ra_check or not dec_check:
# print("RA_TARG and DEC_TARG do not match SIMBAD coordinates")
# print(f"RA_TARG: {ra}, DEC_TARG: {dec}")
# print(f"SIMBAD RA: {ra_simbad}, DEC: {dec_simbad}")
# result = False
return result
def check_date(header):
# check date can be turned to dateTime object
date = header.get("DATE-OBS")
if date is None:
print("DATE-OBS is not set in header")
result = False
else:
try:
obs_date = dateparser.parse(date)
obs_date_long = obs_date.strftime("%b %d, %Y")
print(f"DATE-OBS set to : {date}.")
print(f"Date of observation: {obs_date_long}")
result = True
except Exception as e:
print(
f"Date ({date})could not be converted to Python DateTime object \n {e}"
)
result = False
logger.debug(f"Date of observation: {date} returns {result}")
return result