Source code for astrodb_utils.spectra

import datetime
import importlib.util
import logging
import os
import sqlite3
import sys
from typing import Optional

import astropy.units as u
import numpy as np
import requests
import sqlalchemy.exc
from astrodbkit.astrodb import Database
from astropy.io import fits
from specutils import Spectrum
from sqlalchemy import and_

from astrodb_utils import AstroDBError, exit_function
from astrodb_utils.instruments import get_db_instrument
from astrodb_utils.publications import get_db_publication
from astrodb_utils.sources import find_source_in_db
from astrodb_utils.utils import check_obs_date, get_db_regime, internet_connection

matplotlib_check = importlib.util.find_spec("matplotlib")
if matplotlib_check is not None:
    import matplotlib.pyplot as plt


__all__ = ["check_spectrum_plottable", "ingest_spectrum", "find_spectra"]

logger = logging.getLogger(__name__)


[docs] def check_spectrum_plottable( spectrum_path: str | Spectrum, raise_error: bool = True, show_plot: bool = False, format: str = None ): """ Check if spectrum is readable and plottable with specutils. show_plot = True requires matplotlib to be installed. Parameters ---------- spectrum_path : str or Spectrum Path to spectrum file or Spectrum object raise_error : bool. Default=True True: Raise error if spectrum is not plottable False: Do not raise error if spectrum is not plottable. Log warning instead. show_plot : bool. Default=False True: Show plot of spectrum. Matplotlib must be installed. format : str, optional Format of the spectrum file. If not provided, the format will be inferred by specutils. Returns ------- bool True: Spectrum is plottable False: Spectrum is not plottable """ # check if spectrum is a Spectrum object or a file path # if it's a file path, check if it can be read as a Spectrum object if isinstance(spectrum_path, Spectrum): spectrum = spectrum_path elif isinstance(spectrum_path, str): try: spectrum = Spectrum.read(spectrum_path, format=format) except Exception as error_message: msg = f"Unable to load file as Spectrum object:{spectrum_path}:\n{error_message}" exit_function(msg, raise_error=raise_error) else: msg = f"Input is not a valid path or Spectrum object: {spectrum_path}" exit_function(msg, raise_error=raise_error) # checking spectrum has good units wave_unit_check = _check_spectrum_wave_units(spectrum, raise_error=raise_error) if not wave_unit_check: return False flux_unit_check = _check_spectrum_flux_units(spectrum, raise_error=raise_error) if not flux_unit_check: return False # check for NaNs nan_check = _check_spectrum_not_nans(spectrum, raise_error=raise_error) if not nan_check: return False if show_plot: _plot_spectrum(spectrum) return True
def _check_spectrum_not_nans(spectrum, raise_error=True): nan_check: np.ndarray = ~np.isnan(spectrum.flux) & ~np.isnan(spectrum.spectral_axis) wave = spectrum.spectral_axis[nan_check] if not len(wave): msg = "Spectrum is all NaNs" if raise_error: logger.error(msg) raise AstroDBError(msg) else: logger.warning(msg) return False else: return True def _check_spectrum_wave_units(spectrum, raise_error=True): try: spectrum.spectral_axis.to(u.micron).value return True except AttributeError as e: logger.debug(f"{e}") msg = f"Unable to parse spectral axis: {spectrum}" if raise_error: logger.error(msg) raise AstroDBError(msg) else: logger.warning(msg) return False except u.UnitConversionError as e: logger.debug(f"{e}") msg = f"Unable to convert spectral axis to microns: {spectrum}" if raise_error: logger.error(msg) raise AstroDBError(msg) else: logger.warning(msg) return False except ValueError as e: logger.debug(f"{e}") msg = f"Value error: {spectrum}:" if raise_error: logger.error(msg) raise AstroDBError(msg) else: logger.warning(msg) return False def _check_spectrum_flux_units(spectrum, raise_error=True): expected_units = [ u.get_physical_type(u.erg / u.s / u.cm**2 / u.AA), u.get_physical_type(u.Jy), ] unit_type = u.get_physical_type(spectrum.flux.unit) if unit_type in expected_units: return True else: msg = f"flux units are not expected: {spectrum.flux.unit}. Expecting {expected_units}." if raise_error: logger.error(msg) raise AstroDBError(msg) else: logger.warning(msg) return False def _plot_spectrum(spectrum): if "matplotlib" in sys.modules: plt.plot(spectrum.spectral_axis, spectrum.flux) plt.xlabel(f"Dispersion ({spectrum.spectral_axis.unit})") plt.ylabel(f"Flux ({spectrum.flux.unit})") plt.show() else: msg = "To display the spectrum, matplotlib most be installed." logger.warning(msg)
[docs] def ingest_spectrum( db: Database, *, source: str = None, spectrum: str = None, regime: str = None, telescope: str = None, instrument: str = None, mode: str = None, obs_date: str | datetime.datetime = None, reference: str = None, original_spectrum: Optional[str] = None, comments: Optional[str] = None, other_references: Optional[str] = None, local_spectrum: Optional[str] = None, raise_error: bool = True, format: Optional[str] = None, ): """ Parameters ---------- db: astrodbkit.astrodb.Database Database object created by astrodbkit source: str source name spectrum: str URL or path to spectrum file regime: str Regime of spectrum (optical, infrared, radio, etc.) controlled by Regimes table telescope: str Telescope used to obtain spectrum. Required to be in Telescopes table. instrument: str Instrument used to obtain spectrum. Instrument-Mode pair needs to be in Instruments table. mode: str Instrument mode used to obtain spectrum. Instrument-Mode pair needs to be in Instruments table. obs_date: str Observation date of spectrum. reference: str Reference for spectrum. Required to be in Publications table. original_spectrum: str Path to original spectrum file if different from spectrum. comments: str Comments about spectrum. other_references: str Other references for spectrum. local_spectrum: str Path to local spectrum file. raise_error: bool If True, raise an error if the spectrum cannot be added. If False, continue without raising an error. format: str Format of the spectrum file used by specutils to load the file. If not provided, the format will be determined by specutils. Options: "tabular-fits", Returns ------- flags: dict Status response with the following keys: - "added": True if it's added and False if it's skipped. - "content": the data that was attempted to add - "message": string which includes information about why skipped Raises ------ AstroDBError """ # Compile fields into a dictionary flags = {"added": False, "content": {}, "message": ""} # Make sure reference is provided and is in the Publications table if reference is None: msg = "Reference is required." flags["message"] = msg exit_function(msg, raise_error=raise_error, return_value=flags) return flags else: reference_db = get_db_publication(db, reference, raise_error=raise_error) if reference_db is None: flags["message"] = f"Reference not found in database: {reference}." return flags # If a date is provided as a string, convert it to datetime logger.debug(f"Parsing obs_date: {obs_date}") if obs_date is not None and isinstance(obs_date, str): parsed_date = check_obs_date(obs_date, raise_error=raise_error) elif isinstance(obs_date, datetime.datetime): parsed_date = obs_date if obs_date is None or (parsed_date is None and raise_error is False): msg = f"Observation date is not valid: {obs_date}" flags["message"] = msg if raise_error: raise AstroDBError(msg) else: logger.warning(msg) return flags # Get source name as it appears in the database db_name = find_source_in_db(db, source) logger.debug(f"Found db_name: {db_name} for source: {source}") if len(db_name) == 1: db_name = db_name[0] else: msg = f"Invalid source name. No unique source match for {source} in the database. Found {db_name}." flags["message"] = msg if raise_error: raise AstroDBError(msg) else: logger.warning(msg) return flags # Check if regime is provided and is in the Regimes table regime = get_db_regime(db, regime, raise_error=raise_error) if regime is None: msg = f"Regime not found in database: {regime}." flags["message"] = msg if raise_error: raise AstroDBError(msg) else: logger.warning(msg) return flags # Find the right instrument-mode-telescope in the database if instrument is None: msg = "Instrument is required." flags["message"] = msg if raise_error: raise AstroDBError(msg) else: logger.warning(msg) return flags instrument, mode, telescope = get_db_instrument( db, instrument=instrument, mode=mode, telescope=telescope, ) # Check if spectrum is a duplicate matches = find_spectra( db, db_name, reference=reference, obs_date=parsed_date, telescope=telescope, instrument=instrument, mode=mode, regime=regime, ) if len(matches) > 0: msg = f"Skipping suspected duplicate measurement: {source}" msg2 = f"{matches} {instrument, mode, parsed_date, reference, spectrum}" logger.debug(msg2) flags["message"] = msg # exit_function(msg, raise_error=raise_error) if raise_error: raise AstroDBError(msg) else: logger.warning(msg) return flags # Check if spectrum file(s) are accessible check_spectrum_accessible(spectrum) if original_spectrum is not None: check_spectrum_accessible(original_spectrum) # Check if spectrum is plottable flags["plottable"] = check_spectrum_plottable(spectrum) # If it's a FITS file, verify the header if os.path.splitext(spectrum)[1] == ".fits": with fits.open(spectrum) as hdul: hdul.verify("warn") row_data = { "source": db_name, "access_url": spectrum, "original_spectrum": original_spectrum, "local_spectrum": local_spectrum, "regime": regime, "telescope": telescope, "instrument": instrument, "mode": mode, "observation_date": parsed_date, "reference": reference_db, "comments": comments, "other_references": other_references, } logger.debug(f"Trying to ingest: {row_data}") flags["content"] = row_data try: # Attempt to add spectrum to database with db.engine.connect() as conn: conn.execute(db.Spectra.insert().values(row_data)) conn.commit() flags["added"] = True logger.info( f"Added spectrum for {source}: {telescope}-{instrument}-{mode} from {reference} " f"on {parsed_date.strftime('%d %b %Y')}" ) except (sqlite3.IntegrityError, sqlalchemy.exc.IntegrityError) as e: msg = f"Integrity Error: {source} \n {e}" flags["message"] = msg if raise_error: raise AstroDBError(msg) else: logger.error(msg) return flags except Exception as e: msg = f"Spectrum for {source} could not be added to the database for unexpected reason: {e}" flags["message"] = msg if raise_error: raise AstroDBError(msg) else: logger.warning(msg) return flags return flags
[docs] def find_spectra( db: Database, source: str, *, reference: str = None, obs_date: str = None, telescope: str = None, instrument: str = None, mode: str = None, regime: str = None, ): """ Find what spectra already exists in database for this source Finds matches based on parameter provided. E.g., if only source is provided, all spectra for that source are returned. If Source and telescope are provided, only spectra for that source and telescope are provided. Parameters ---------- db: astrodbkit.astrodb.Database Database object created by astrodbkit source: str source name Returns ------- source_spec_data: astropy.table.Table Table of spectra for source """ source_spec_data = db.query(db.Spectra) # entire Spectra table filter_list = [db.Spectra.c.source == source] if reference is not None: filter_list.append(db.Spectra.c.reference == reference) if telescope is not None: filter_list.append(db.Spectra.c.telescope == telescope) if obs_date is not None: filter_list.append(db.Spectra.c.observation_date == obs_date) if instrument is not None: filter_list.append(db.Spectra.c.instrument == instrument) if mode is not None: filter_list.append(db.Spectra.c.mode == mode) if regime is not None: filter_list.append(db.Spectra.c.regime == regime) # Actually perform the query if len(filter_list) > 0: source_spec_data = source_spec_data.filter(and_(*filter_list)) else: source_spec_data = source_spec_data.filter(filter_list[0]) source_spec_data = source_spec_data.table() if len(source_spec_data) > 0: logger.debug(f"Found {len(source_spec_data)} spectra for source: {source}") logger.debug(f"Spectra data: {source_spec_data}") return source_spec_data
def check_spectrum_accessible(spectrum: str) -> bool: """ Check if the spectrum is accessible Parameters ---------- spectrum: str URL or path to spectrum file Returns ------- bool True if the spectrum is accessible, False otherwise """ logger.debug(f"Checking spectrum: {spectrum}") internet = internet_connection() if internet: request_response = requests.head(spectrum) status_code = request_response.status_code # The website is up if the status code is 200 if status_code != 200: msg = ( "The spectrum URL does not appear to be accessible: \n" f"spectrum: {spectrum} \n" f"status code: {status_code}" ) logger.error(msg) return False else: msg = "The URL for the spectrum is accessible (status code = 200)." logger.debug(msg) return True else: msg = "No internet connection. Internet is needed to check spectrum URLs." raise AstroDBError(msg)