import logging
import astropy.units as u
import sqlalchemy.exc
from astropy.coordinates import SkyCoord
from astropy.table import Table
from astroquery.simbad import Simbad
from astrodb_utils import AstroDBError, exit_function
from astrodb_utils.publications import find_publication
from astrodb_utils.utils import internet_connection
__all__ = [
"find_source_in_db",
"ingest_name",
"ingest_source",
]
logger = logging.getLogger(__name__)
[docs]
def find_source_in_db(
db,
source: str,
*,
ra: float = None,
dec: float = None,
search_radius: u.Quantity = 60.0 * u.arcsec,
ra_col_name: str = "ra_deg",
dec_col_name: str = "dec_deg",
use_simbad: bool = True,
fuzzy: bool = False,
):
"""
Find a source in the database given a source name and optional coordinates.
Uses astrodbkit .search_object and .query_region methods to search the Sources and Names table.
Parameters
----------
db
source: str
Source name
ra: float
Right ascensions of sources. Decimal degrees.
dec: float
Declinations of sources. Decimal degrees.
search_radius: u.Quantity
Search radius. Default is 60 arcseconds.
ra_col_name: str
Name of the column in the database table that contains the right ascension
dec_col_name: str
Name of the column in the database table that contains the declination
use_simbad: bool
Use Simbad to resolve the source name if it is not found in the database. Default is True.
Set to False if no internet connection.
fuzzy: bool
Use fuzzy search to find source name in database. Default is False.
Returns
-------
List of strings.
one match: Single element list with one database source name
multiple matches: List of possible database names
no matches: Empty list
"""
source = source.strip()
logger.debug(f"{source}: Searching for match in database. Use Simbad: {use_simbad}")
db_name_matches = db.search_object(
source,
output_table="Sources",
fuzzy_search=False,
verbose=False,
resolve_simbad=use_simbad,
)
# NO MATCHES
# If no matches, try fuzzy search
if len(db_name_matches) == 0 and fuzzy:
logger.debug(f"{source}: No name matches, trying fuzzy search")
db_name_matches = db.search_object(
source,
output_table="Sources",
fuzzy_search=True,
verbose=False,
resolve_simbad=use_simbad,
)
# If no internet, then do not use Simbad
if use_simbad and not internet_connection():
logger.warning(
f"{source}: No internet connection, not using Simbad to resolve source name."
)
use_simbad = False
# If still no matches, try to resolve the name with Simbad
if len(db_name_matches) == 0 and use_simbad:
logger.debug(
f"{source}: No name matches, trying Simbad search. use_simbad: {use_simbad}"
)
db_name_matches = db.search_object(
source, resolve_simbad=True, fuzzy_search=False, verbose=False
)
# if still no matches, try spatial search using coordinates, if provided
if len(db_name_matches) == 0 and ra and dec:
location = SkyCoord(ra, dec, frame="icrs", unit="deg")
logger.debug(
f"{source}: Trying coord in database search around "
f"{location.ra.degree}, {location.dec}"
)
db_name_matches = db.query_region(
location, radius=search_radius, ra_col=ra_col_name, dec_col=dec_col_name
)
# If still no matches, try to get the coords from SIMBAD using source name
if len(db_name_matches) == 0 and use_simbad:
simbad_skycoord = coords_from_simbad(source)
# Search database around that coordinate
if simbad_skycoord is not None:
msg2 = (
f"Finding sources around {simbad_skycoord} with radius {search_radius} "
f"using ra_col_name: {ra_col_name}, dec_col_name: {dec_col_name}"
)
logger.debug(msg2)
try:
db_name_matches = db.query_region(
simbad_skycoord, radius=search_radius, ra_col=ra_col_name, dec_col=dec_col_name
)
except KeyError as e:
msg = (
f"Error querying database with coordinates from SIMBAD: {e}\n"
f"Using skycoord: {simbad_skycoord} and ra_col_name: {ra_col_name}, dec_col_name: {dec_col_name}.\n"
"Try setting `ra_col_name` and `dec_col_name` arguments to the "
"column names used in the Sources table.\n"
)
logger.error(msg)
raise AstroDBError(msg)
if len(db_name_matches) == 1:
db_names = db_name_matches["source"].tolist()
logger.debug(f"One match found for {source}: {db_names[0]}")
# check coords of the match make sense
coord_check = (
ra
and dec
and u.isclose(ra*u.degree, db_name_matches[ra_col_name][0]*u.degree,atol=search_radius)
and u.isclose(dec*u.degree, db_name_matches[dec_col_name][0]*u.degree, atol=search_radius)
)
if ra and dec and not coord_check:
msg = (
f"Coordinates do not match for {source} and {db_names[0]} within {search_radius}.\n"
f"Coordinates provided: {ra}, {dec}\n"
f"Coordinates in database: {db_name_matches[ra_col_name][0]}, {db_name_matches[dec_col_name][0]}\n"
f"Separation RA: {(ra*u.degree - db_name_matches[ra_col_name][0]*u.degree).to(u.arcsec)}\n"
f"Separation dec: {(dec*u.degree - db_name_matches[dec_col_name][0]*u.degree).to(u.arcsec)}\n"
)
logger.info(msg)
return [] # return empty list if coordinates do not match
elif len(db_name_matches) > 1:
db_names = db_name_matches["source"].tolist()
logger.debug(f"More than one match found for {source}: {db_names}")
# TODO: Find way for user to choose correct match
elif len(db_name_matches) == 0:
db_names = []
logger.debug(f" {source}: No match found")
else:
raise AstroDBError(f"Unexpected condition searching for {source}")
return db_names
def coords_from_simbad(source):
"""
Get coordinates from SIMBAD using a source name
Parameters
----------
source: str
Name of the source
Returns
-------
SkyCoord object
"""
simbad_result_table = Simbad.query_object(source)
if simbad_result_table is None:
logger.debug(f"SIMBAD returned no results for {source}")
simbad_skycoord = None
elif len(simbad_result_table) == 1:
logger.debug(
f"simbad colnames: {simbad_result_table.colnames} \n simbad results \n {simbad_result_table}"
)
if "ra" in simbad_result_table.colnames: # for astroquery >=0.4.8
ra_col_name_simbad = "ra"
dec_col_name_simbad = "dec"
else:
msg = (
f"Unexpected column names in SIMBAD result table. "
f"You may need to upgrade astroquery >=0.4.8. "
f"Expecting 'ra'. Recieved: \n {simbad_result_table.colnames}"
)
raise AstroDBError(msg)
simbad_coords = f"{simbad_result_table[ra_col_name_simbad][0]} {simbad_result_table[dec_col_name_simbad][0]}"
logger.debug(f"SIMBAD coord string: {simbad_coords}")
# TODO: Get units from SIMBAD result table?
simbad_skycoord = SkyCoord(simbad_coords, unit=(u.deg, u.deg))
msg = f"Coordinates retrieved from SIMBAD {simbad_skycoord.ra.deg}, {simbad_skycoord.dec.deg}"
logger.debug(msg)
else:
simbad_skycoord = None
msg = f"More than one match found in SIMBAD for {source}"
logger.warning(msg)
return simbad_skycoord
# NAMES
[docs]
def ingest_name(
db, source: str = None, other_name: str = None, raise_error: bool = None
):
"""
This function ingests an other name into the Names table
Parameters
----------
db: astrodbkit.astrodb.Database
Database object created by astrodbkit
source: str
Name of source as it appears in sources table
other_name: str
Name of the source different than that found in source table
raise_error: bool
Raise an error if name was not ingested
Returns
-------
other_name: str
Name of the source as it appears in the Names table
or None if name was not ingested
"""
source = strip_unicode_dashes(source)
other_name = strip_unicode_dashes(other_name)
name_data = [{"source": source, "other_name": other_name}]
try:
with db.engine.connect() as conn:
conn.execute(db.Names.insert().values(name_data))
conn.commit()
logger.info(f"Name added to database: {name_data}\n")
return other_name
except sqlalchemy.exc.IntegrityError as e:
msg = f"Could not add {name_data} to Names."
if "UNIQUE constraint failed: " in str(e):
msg += "Other name is already present."
exit_function(msg, raise_error)
# SOURCES
[docs]
def ingest_source(
db,
source,
reference: str,
*,
ra: float = None,
dec: float = None,
epoch: str = None,
equinox: str = None,
other_reference: str = None,
comment: str = None,
raise_error: bool = True,
search_db: bool = True,
ra_col_name: str = "ra_deg",
dec_col_name: str = "dec_deg",
epoch_col_name: str = "epoch_year",
use_simbad: bool = True,
):
"""
Parameters
----------
db: astrodbkit.astrodb.Database
Database object created by astrodbkit
source: str
Names of sources
reference: str
Discovery references of sources
ra: float, optional
Right ascensions of sources. Decimal degrees.
dec: float, optional
Declinations of sources. Decimal degrees.
comment: string, optional
Comments
epoch: str, optional
Epochs of coordinates
equinoxe: str, optional
Equinoxes of coordinates
other_references: str
raise_error: bool, optional
True (default): Raise an error if a source cannot be ingested
False: Log a warning but skip sources which cannot be ingested
search_db: bool, optional
True (default): Search database to see if source is already ingested
False: Ingest source without searching the database
ra_col_name: str
Name of the column in the database table that contains the right ascension
dec_col_name: str
Name of the column in the database table that contains the declination
use_simbad: bool
True (default): Use Simbad to resolve the source name if it is not found in the database
False: Do not use Simbad to resolve the source name.
Returns
-------
None
"""
logger.debug(f"Trying to ingest source: {source}")
# change unicode dashes characters to `-`
source = strip_unicode_dashes(source)
# Make sure reference is provided and in References table
ref_check = find_publication(db, reference=reference)
logger.debug(f"ref_check: {ref_check}")
if ref_check[0] is False:
msg = (
f"Skipping: {source}."
f"Discovery reference {reference} is either missing or "
" is not in Publications table. \n"
f"(Add it with ingest_publication function.)"
)
exit_function(msg, raise_error)
return
# Find out if source is already in database or not
if search_db:
logger.debug(f"Checking database for: {source} at ra: {ra}, dec: {dec}")
logger.debug(f" colnames: {ra_col_name}, {dec_col_name}")
name_matches = find_source_in_db(
db,
source,
ra=ra,
dec=dec,
ra_col_name=ra_col_name,
dec_col_name=dec_col_name,
use_simbad=use_simbad,
)
else:
name_matches = []
logger.debug(f" Source matches in database: {name_matches}")
# NOT INGESTING.
if len(name_matches) != 0:
msg1 = f" Not ingesting {source}. \n"
# One source match in the database, ingesting possible alt name
if len(name_matches) == 1:
ingest_name(db, name_matches[0], source)
msg2 = f" Already in database as {name_matches[0]}. \n "
# Multiple source matches in the database, unable to ingest source
elif len(name_matches) > 1:
msg2 = f" More than one match for {source}\n {name_matches}\n"
exit_function(msg1 + msg2, raise_error)
return
# Try to get coordinates from SIMBAD if they were not provided
if (ra is None or dec is None) and use_simbad:
# Try to get coordinates from SIMBAD
simbad_skycoord = coords_from_simbad(source)
if simbad_skycoord is None:
msg = f"Not ingesting {source}. Coordinates are needed and could not be retrieved from SIMBAD. \n"
exit_function(msg, raise_error)
# Using those coordinates for source.
ra = simbad_skycoord.to_string(style="decimal").split()[0]
dec = simbad_skycoord.to_string(style="decimal").split()[1]
epoch = "2000" # Default coordinates from SIMBAD are epoch 2000.
equinox = "J2000" # Default frame from SIMBAD is IRCS and J2000.
msg = f"Coordinates retrieved from SIMBAD {ra}, {dec}"
logger.debug(msg)
logger.debug(f" Ingesting {source}....")
# Construct data to be added
source_data = [
{
"source": source,
ra_col_name: ra,
dec_col_name: dec,
"reference": reference,
epoch_col_name: epoch,
"equinox": equinox,
"other_references": other_reference,
"comments": comment,
}
]
logger.debug(f" Data: {source_data}.")
# Try to add the source to the database
try:
with db.engine.connect() as conn:
conn.execute(db.Sources.insert().values(source_data))
conn.commit()
msg = f"Added {source_data}"
logger.info(f"Added {source}")
logger.debug(msg)
except sqlalchemy.exc.IntegrityError:
msg = f"Not ingesting {source}. Not sure why. \n"
msg2 = f" {source_data} "
logger.warning(msg)
logger.debug(msg2)
exit_function(msg + msg2, raise_error)
# Add the source name to the Names table
ingest_name(db, source=source, other_name=source, raise_error=raise_error)
return
# SURVEY DATA
def find_survey_name_in_simbad(sources, desig_prefix, source_id_index=None):
"""
Function to extract source designations from SIMBAD
Parameters
----------
sources: astropy.table.Table
Sources names to search for in SIMBAD
desig_prefix
prefix to search for in list of identifiers
source_id_index
After a designation is split, this index indicates source id suffix.
For example, source_id_index = 2 to extract suffix from "Gaia DR2" designations.
source_id_index = 1 to exctract suffix from "2MASS" designations.
Returns
-------
Astropy table
"""
n_sources = len(sources)
Simbad.reset_votable_fields()
Simbad.add_votable_fields("typed_id") # keep search term in result table
Simbad.add_votable_fields("ids") # add all SIMBAD identifiers as an output column
logger.info("simbad query started")
result_table = Simbad.query_objects(sources["source"])
logger.info("simbad query ended")
ind = result_table["SCRIPT_NUMBER_ID"] > 0 # find indexes which contain results
simbad_ids = result_table["TYPED_ID", "IDS"][ind]
db_names = []
simbad_designations = []
source_ids = []
for row in simbad_ids:
db_name = row["TYPED_ID"]
ids = row["IDS"].split("|")
designation = [i for i in ids if desig_prefix in i]
if designation:
logger.debug(f"{db_name}, {designation[0]}")
db_names.append(db_name)
if len(designation) == 1:
simbad_designations.append(designation[0])
else:
simbad_designations.append(designation[0])
logger.warning(f"more than one designation matched, {designation}")
if source_id_index is not None:
source_id = designation[0].split()[source_id_index]
source_ids.append(int(source_id)) # convert to int since long in Gaia
n_matches = len(db_names)
logger.info(
f"Found, {n_matches}, {desig_prefix}, sources for, {n_sources}, sources"
)
if source_id_index is not None:
result_table = Table(
[db_names, simbad_designations, source_ids],
names=("db_names", "designation", "source_id"),
)
else:
result_table = Table(
[db_names, simbad_designations], names=("db_names", "designation")
)
return result_table
def strip_unicode_dashes(source):
"""
Function to remove unicode dashes from source names and replace with `-`
"""
unicode_list = [
("\u2013", "en dash"),
("\u2014", "em dash"),
("\u2212", "minus sign"),
("\u2012", "figure dash"),
]
for char, char_name in unicode_list:
if char in source:
source = source.replace(char, "-")
logger.info(f"replaced {char_name}({char}) with - in {source}")
return source