Source code for astrodb_utils.photometry

"""Photometry functions"""

import logging
from io import BytesIO
from typing import Optional

import astropy.units as u
import numpy as np
import requests
import sqlalchemy.exc
from astropy.io.votable import parse

from astrodb_utils import (
    AstroDBError,
    internet_connection,
)
from astrodb_utils.instruments import ingest_instrument
from astrodb_utils.publications import find_publication
from astrodb_utils.sources import find_source_in_db

logger = logging.getLogger(__name__)

__all__ = ["ingest_photometry", "ingest_photometry_filter", "fetch_svo", "assign_ucd"]


[docs] def ingest_photometry( db, *, source: str = None, band: str = None, regime: str = None, magnitude: float = None, magnitude_error: float = None, reference: str = None, telescope: Optional[str] = None, epoch: Optional[float] = None, comments: Optional[str] = None, raise_error: bool = True, ): """ Add a photometry measurement to the database Parameters ---------- db: astrodbkit.astrodb.Database source: str band: str regime: str magnitude: float magnitude_error: float reference: str telescope: str, optional epoch: float, optional comments: str, optional raise_error: bool, optional True (default): Raise an error if a source cannot be ingested False: Logs a warning but does not raise an error Returns ------- flags: dict added: bool True if the measurement was added to the database False if the measurement was not added to the database """ flags = {"added": False} # Make sure required fields are provided if source is None or band is None or magnitude is None or reference is None: msg = ( "source, band, magnitude, and reference are required. \n" f"Provided: source={source}, band={band}, " f"magnitude={magnitude}, " f"reference={reference}" ) if raise_error: logger.error(msg) raise AstroDBError(msg) else: logger.warning(msg) return flags # Make sure source exists in the database db_name = find_source_in_db(db, source) if len(db_name) != 1: msg = f"No unique source match for {source} in the database" if raise_error: logger.error(msg) raise AstroDBError(msg) else: logger.warning(msg) return flags else: db_name = db_name[0] # Make sure the reference exists in the Publications table pub_check = find_publication(db, reference=reference) if pub_check[0]: msg = f"Reference found: {pub_check[1]}." logger.info(msg) if not pub_check[0]: msg = f"Reference {reference} not found in Publications table." if raise_error: logger.error(msg) raise AstroDBError(msg) else: logger.warning(msg) return flags # TODO: Make sure band exists in the PhotometryFilters table band_match = ( db.query(db.PhotometryFilters) .filter(db.PhotometryFilters.c.band == band) .table() ) if len(band_match) == 0: msg = f"Band {band} not found in PhotometryFilters table." if raise_error: logger.error(msg) raise AstroDBError(msg) else: logger.warning(msg) return flags # If telescope is provided, make sure it exists in the Telescopes table if telescope is not None: telescope_match = ( db.query(db.Telescopes) .filter(db.Telescopes.c.telescope == telescope) .table() ) if len(telescope_match) == 0: msg = f"Telescope {telescope} not found in Telescopes table." if raise_error: logger.error(msg) raise AstroDBError(msg) else: logger.warning(msg) return flags # if the uncertainty is masked, set it to None, # otherwise convert to a string if isinstance(magnitude_error, np.ma.core.MaskedConstant): mag_error = None elif magnitude_error is None: mag_error = None else: mag_error = str(magnitude_error) # Construct data to be added photometry_data = [ { "source": db_name, "band": band, "magnitude": str( magnitude ), # Convert to string to maintain significant digits "magnitude_error": mag_error, "telescope": telescope, "epoch": epoch, "comments": comments, "reference": reference, } ] # In case regime column is not present if regime is not None: photometry_data[0]["regime"] = regime logger.debug(f"Photometry data: {photometry_data}") try: with db.engine.connect() as conn: conn.execute(db.Photometry.insert().values(photometry_data)) conn.commit() flags["added"] = True logger.info(f"Photometry measurement added: \n{photometry_data}") except sqlalchemy.exc.IntegrityError as e: if "UNIQUE constraint failed:" in str(e): msg = "The measurement may be a duplicate." if raise_error: logger.error(msg) raise AstroDBError(msg) else: logger.warning(msg) else: msg = ( "The source may not exist in Sources table.\n" "The band may not exist in the PhotometryFilters table.\n" "The reference may not exist in the Publications table. " "Add it with add_publication function." ) if raise_error: logger.error(msg + str(e)) raise AstroDBError(msg + str(e)) else: logger.warning(f"{msg}/n{e}") return flags
[docs] def ingest_photometry_filter( db, *, telescope=None, instrument=None, filter_name=None, ucd=None, wavelength_col_name: str = "effective_wavelength_angstroms", width_col_name: str = "width_angstroms", ): """ Add a new photometry filter to the database """ # Fetch existing telescopes, add if missing existing = ( db.query(db.Telescopes).filter(db.Telescopes.c.telescope == telescope).table() ) if len(existing) == 0: with db.engine.connect() as conn: conn.execute(db.Telescopes.insert().values({"telescope": telescope})) conn.commit() logger.info(f"Added telescope {telescope}.") else: logger.info(f"Telescope {telescope} already exists.") # Fetch existing instruments, add if missing existing = ( db.query(db.Instruments) .filter(db.Instruments.c.instrument == instrument) .table() ) if len(existing) == 0: ingest_instrument( db, telescope=telescope, instrument=instrument, mode="Imaging" ) logger.info(f"Added instrument {instrument}.") else: logger.info(f"Instrument {instrument} already exists.") # Get data from SVO try: filter_id, wave_eff, fwhm, width_effective = fetch_svo( telescope, instrument, filter_name ) logger.info( f"From SVO: Filter {filter_id} has effective wavelength {wave_eff} " f"and FWHM {fwhm} and width_effective {width_effective}." ) except AstroDBError as e: msg = f"Error fetching filter data from SVO: {e}" logger.error(msg) raise AstroDBError(msg) if ucd is None: ucd = assign_ucd(wave_eff) logger.info(f"UCD for filter {filter_id} is {ucd}") # Add the filter try: with db.engine.connect() as conn: conn.execute( db.PhotometryFilters.insert().values( { "band": filter_id, "ucd": ucd, wavelength_col_name: wave_eff.to(u.Angstrom).value, width_col_name: width_effective.to(u.Angstrom).value, } ) ) conn.commit() logger.info( f"Added filter {filter_id} with effective wavelength {wave_eff}, width {width_effective}, and UCD {ucd}." ) except sqlalchemy.exc.IntegrityError as e: if "UNIQUE constraint failed:" in str(e): msg = str(e) + f"Filter {filter_id} already exists in the database." raise AstroDBError(msg) else: msg = str(e) + f"Error adding filter {filter_id}." raise AstroDBError(msg) except Exception as e: msg = str(e) raise AstroDBError(msg)
[docs] def fetch_svo(telescope: str = None, instrument: str = None, filter_name: str = None): """ Fetch photometry filter information from the SVO Filter Profile Service http://svo2.cab.inta-csic.es/theory/fps/ Could use better error handling when instrument name or filter name is not found Parameters ---------- telescope: str Telescope name instrument: str Instrument name filter_name: str Filter name Returns ------- filter_id: str Filter ID wave_eff: Quantity Effective wavelength fwhm: Quantity Full width at half maximum (FWHM) width_effective: Quantity Effective width of the filter Raises ------ AstroDBError If the SVO URL is not reachable or the filter information is not found KeyError If the filter information is not found in the VOTable """ if internet_connection() is False: msg = "No internet connection. Cannot fetch photometry filter information from the SVO website." logger.error(msg) raise AstroDBError(msg) url = f"http://svo2.cab.inta-csic.es/svo/theory/fps3/fps.php?ID={telescope}/{instrument}.{filter_name}" try: r = requests.get(url) except requests.exceptions.ConnectTimeout as e: msg = f"Connection timed out while trying to reach {url}. {e}" logger.error(msg) raise AstroDBError(msg) if r.status_code != 200: msg = f"Error retrieving {url}. Status code: {r.status_code}" logger.error(msg) raise AstroDBError(msg) # Parse VOTable contents content = BytesIO(r.content) votable = parse(content) # Get Filter ID try: filter_id = votable.get_field_by_id("filterID").value except KeyError: msg = f"Filter {telescope}, {instrument}, {filter_name} not found in SVO." raise AstroDBError(msg) # Get effective wavelength and FWHM wave_eff = votable.get_field_by_id("WavelengthEff") fwhm = votable.get_field_by_id("FWHM") width_effective = votable.get_field_by_id("WidthEff") if wave_eff.unit == "AA" and fwhm.unit == "AA" and width_effective.unit == "AA": wave_eff = wave_eff.value * u.Angstrom fwhm = fwhm.value * u.Angstrom width_effective = width_effective.value * u.Angstrom else: msg = ( f"Wavelengths from SVO may not be Angstroms as expected: {wave_eff.unit}," f"{fwhm.unit}, {width_effective.unit}." ) raise AstroDBError(msg) logger.debug( f"Found in SVO: " f"Filter {filter_id} has effective wavelength {wave_eff} and " f"FWHM {fwhm} and effective width {width_effective}." ) return filter_id, wave_eff, fwhm, width_effective
[docs] def assign_ucd(wave_eff_quantity: u.Quantity): """ Assign a Unified Content Descriptors (UCD) to a photometry filter based on its effective wavelength UCDs are from the UCD1+ controlled vocabulary https://www.ivoa.net/documents/UCD1+/20200212/PEN-UCDlist-1.4-20200212.html#tth_sEcB Parameters ---------- wave_eff: Quantity Effective wavelength Returns ------- ucd: str UCD string """ wave_eff_quantity.to(u.Angstrom) wave_eff = wave_eff_quantity.value ucd_dict = { (3000, 4000): "em.opt.U", (4000, 5000): "em.opt.B", (5000, 6000): "em.opt.V", (6000, 7500): "em.opt.R", (7500, 10000): "em.opt.I", (10000, 15000): "em.IR.J", (15000, 20000): "em.IR.H", (20000, 30000): "em.IR.K", (30000, 40000): "em.IR.3-4um", (40000, 80000): "em.IR.4-8um", (80000, 150000): "em.IR.8-15um", (150000, 300000): "em.IR.15-30um", } for key, value in ucd_dict.items(): if key[0] < wave_eff <= key[1]: ucd = value break else: ucd = None return ucd