Source code for scope.run_simulation

"""
Main module for running a simulation of the HRCCS data.
This module contains the main functions for simulating the data and
calculating the log likelihood and cross-correlation function of the data given the model parameters.
"""


import logging
import os

import numpy as np
from tqdm import tqdm

from scope.broadening import *
from scope.ccf import *
from scope.input_output import *
from scope.logger import *
from scope.noise import *
from scope.tellurics import *
from scope.utils import *

abs_path = os.path.dirname(__file__)


[docs] def make_data( scale, wlgrid, wl_model, Fp_conv, Fstar_conv, n_order, n_exposure, n_pixel, phases, Rp_solar, Rstar, seed=42, do_pca=True, blaze=False, tellurics=False, n_princ_comp=4, v_sys=0.0, Kp=192.06, star=False, SNR=0, rv_semiamp_orbit=0.3229, observation="emission", tell_type="ATRAN", time_dep_tell=False, wav_error=False, order_dep_throughput=False, a=1, # AU u1=0.3, u2=0.3, LD=True, b=0.0, # impact parameter divide_out_of_transit=False, out_of_transit_dur=0.1, v_sys_measured=0.0, vary_throughput=True, instrument="IGRINS", pca_removal="subtract", ): """ Creates a simulated HRCCS dataset. Main function. Inputs ------ :scale: (float) scaling factor for the data. :wlgrid: (array) wavelength grid for the data. :do_pca: (bool) if True, do PCA on the data. :blaze: (bool) if True, include the blaze function in the data. :tellurics: (bool) if True, include tellurics in the data. :n_princ_comp: (int) number of principal components to use. :v_sys: (float) systemic velocity of the planet. :Kp: (float) semi-amplitude of the planet. :star: (bool) if True, include the stellar spectrum in the data. :SNR: (float) photon signal-to-noise ratio (not of the planetary spectrum itself!) :observation: (str) 'emission' or 'transmission'. :tell_type: (str) 'ATRAN' or 'data-driven'. :time_dep_tell: (bool) if True, include time-dependent tellurics. not applicable for ATRAN. :wav_error: (bool) if True, include wavelength-dependent error. :order_dep_throughput: (bool) if True, include order-dependent throughput. Outputs ------- :pca_noise_matrix: (array) the data cube with only the larger-scale trends. :flux_cube: (array) the data cube with the larger-scale trends removed. :fTemp_nopca: (array) the data cube with all components. :just_tellurics: (array) the telluric model that's multiplied to the dataset. """ np.random.seed(seed) logger.info(f"Seed set to {seed}") rv_planet, rv_star = calc_rvs( v_sys, v_sys_measured, Kp, rv_semiamp_orbit, phases ) # measured in m/s now logger.debug(f"RV planet: {rv_planet}, RV star: {rv_star}") flux_cube = np.zeros( (n_order, n_exposure, n_pixel) ) # will store planet and star signal pca_noise_matrix = np.zeros((n_order, n_exposure, n_pixel)) for order in range(n_order): wlgrid_order = np.copy(wlgrid[order,]) # Cropped wavelengths flux_cube[order] = doppler_shift_planet_star( flux_cube[order], n_exposure, phases, rv_planet, rv_star, wlgrid_order, wl_model, Fp_conv, Rp_solar, Fstar_conv, Rstar, u1, u2, a, b, LD, scale, star, observation, ) throughput_baselines = np.loadtxt(abs_path + "/data/throughputs.txt") flux_cube = detrend_cube(flux_cube, n_order, n_exposure) if vary_throughput: for i in range(flux_cube.shape[1]): if i >= 79: j = 79 - i throughput_baseline = throughput_baselines[j] else: throughput_baseline = throughput_baselines[i] throughput_factor = throughput_baseline / np.nanpercentile( flux_cube[:, i, :], 90 ) flux_cube[:, i, :] *= throughput_factor if tellurics: if instrument != "IGRINS": raise NotImplementedError( "Only IGRINS data is currently supported for telluric simulations." "Please set tellurics=False, and at minimum ensure that your SNR input includes telluric losses." ) flux_cube = add_tellurics( wlgrid, flux_cube, n_order, n_exposure, vary_airmass=True, tell_type=tell_type, time_dep_tell=time_dep_tell, ) # these spline fits aren't perfect. we mask negative values to 0. just_tellurics = np.ones_like(flux_cube) just_tellurics = add_tellurics( wlgrid, just_tellurics, n_order, n_exposure, vary_airmass=True, tell_type=tell_type, time_dep_tell=time_dep_tell, ) flux_cube[flux_cube < 0.0] = 0.0 just_tellurics[just_tellurics < 0.0] = 0.0 flux_cube = detrend_cube(flux_cube, n_order, n_exposure) if blaze: flux_cube = add_blaze_function(wlgrid, flux_cube, n_order, n_exposure) flux_cube[flux_cube < 0.0] = 0.0 flux_cube[np.isnan(flux_cube)] = 0.0 flux_cube = detrend_cube(flux_cube, n_order, n_exposure) flux_cube[np.isnan(flux_cube)] = 0.0 if np.any(SNR > 0): # 0 means don't add noise! if order_dep_throughput: noise_model = instrument else: noise_model = "constant" print(f"Adding noise with model {noise_model}") flux_cube = add_noise_cube(flux_cube, wlgrid, SNR, noise_model=noise_model) flux_cube = detrend_cube(flux_cube, n_order, n_exposure) if wav_error: doppler_shifts = np.loadtxt( "data/doppler_shifts_w77ab.txt" ) # todo: create this! flux_cube = change_wavelength_solution(wlgrid, flux_cube, doppler_shifts) flux_cube = detrend_cube(flux_cube, n_order, n_exposure) flux_cube[np.isnan(flux_cube)] = 0.0 flux_cube_nopca = flux_cube.copy() if observation == "transmission" and divide_out_of_transit: # generate the out of transit baseline n_exposures_baseline = ( out_of_transit_dur * n_exposure ) # assuming n_exposure is fully just in transit out_of_transit_flux = np.ones_like( flux_cube ) # todo: fix shape for out of transit baseline # take star for exposure in range(n_exposures_baseline): flux_star = calc_doppler_shift( wlgrid_order, wl_model, Fstar_conv, rv_star[exposure] ) out_of_transit_flux[exposure,] *= flux_star out_of_transit_flux = detrend_cube(out_of_transit_flux, n_order, n_exposure) # add tellurics out_of_transit_flux = add_tellurics( wlgrid, out_of_transit_flux, n_order, n_exposures_baseline, vary_airmass=True, tell_type=tell_type, time_dep_tell=time_dep_tell, ) out_of_transit_flux = detrend_cube(out_of_transit_flux, n_order, n_exposure) # add blaze out_of_transit_flux = add_blaze_function( wlgrid, out_of_transit_flux, n_order, n_exposures_baseline, instrument=instrument, ) out_of_transit_flux[flux_cube < 0.0] = 0.0 out_of_transit_flux[np.isnan(flux_cube)] = 0.0 out_of_transit_flux = detrend_cube(out_of_transit_flux, n_order, n_exposure) # add noise out_of_transit_flux = add_noise_cube( out_of_transit_flux, wlgrid, SNR, noise_model=noise_model ) # take median median_out_of_transit = np.median(out_of_transit_flux, axis=1) # divide out the flux cube flux_cube /= median_out_of_transit # todo: check axes work out if do_pca: for j in range(n_order): flux_cube[j] -= np.mean(flux_cube[j]) flux_cube[j] /= np.std(flux_cube[j]) flux_cube[j], pca_noise_matrix[j] = perform_pca( flux_cube[j], n_princ_comp, pca_removal=pca_removal ) else: for j in range(n_order): for i in range(n_exposure): flux_cube[j][i] -= np.mean(flux_cube[j][i]) if np.all(pca_noise_matrix == 0): pca_noise_matrix = np.ones_like(pca_noise_matrix) if tellurics: return pca_noise_matrix, flux_cube, flux_cube_nopca, just_tellurics return ( pca_noise_matrix, flux_cube, flux_cube_nopca, np.ones_like(flux_cube), ) # returning CCF and logL values
# @njit
[docs] def calc_log_likelihood( v_sys, Kp, scale, wlgrid, wl_model, Fp_conv, Fstar_conv, flux_cube, n_order, n_exposure, n_pixel, phases, Rp_solar, Rstar, rv_semiamp_orbit, A_noplanet, do_pca=True, n_princ_comp=4, star=False, observation="emission", a=1, # AU u1=0.3, u2=0.3, LD=True, b=0.0, # impact parameter v_sys_measured=0.0, pca_removal="subtract", ): """ Calculates the log likelihood and cross-correlation function of the data given the model parameters. Inputs ------ :v_sys: float Systemic velocity of the system in m/s :Kp: float Planet semi-amplitude in m/s :scale: float Planet-to-star flux ratio :wlgrid: array Wavelength grid :Fp_conv: array Planet spectrum convolved with the instrumental profile :Fstar_conv: array Stellar spectrum convolved with the instrumental profile :flux_cube: array Data cube :n_order: int Number of orders :n_exposure: int Number of exposures :n_pixel: int Number of pixels :phases: array Orbital phases :Rp_solar: float Planet radius in solar radii :Rstar: float Stellar radius in solar radii :rv_semiamp_orbit: float Semi-amplitude of the orbit in km/s. This is the motion of the *star*. :A_noplanet: array Array of the non-planet component of the data cube :do_pca: bool Whether to perform PCA on the data cube :n_princ_comp: int Number of principal components to use in the PCA :star: bool Whether to include the stellar component in the simulation. :observation: str Type of observation. Currently supported: 'emission', 'transmission' Outputs ------- :logL: float Log likelihood of the data given the model parameters :ccf: array Cross-correlation function of the data given the model parameters """ rv_planet, rv_star = calc_rvs( v_sys, v_sys_measured, Kp, rv_semiamp_orbit, phases ) # measured in m/s CCF = 0.0 logL = 0.0 for order in range(n_order): # grab the wavelengths from each order wlgrid_order = np.copy(wlgrid[order,]) model_flux_cube = np.zeros((n_exposure, n_pixel)) model_flux_cube = doppler_shift_planet_star( model_flux_cube, n_exposure, phases, rv_planet, rv_star, wlgrid_order, wl_model, Fp_conv, Rp_solar, Fstar_conv, Rstar, u1, u2, a, b, LD, scale, star, observation, reprocessing=True, ) if do_pca: # this is the "model reprocessing" step. model_flux_cube *= A_noplanet[order] model_flux_cube, _ = perform_pca( model_flux_cube, n_princ_comp, pca_removal=pca_removal ) logl, ccf = calc_ccf(model_flux_cube, flux_cube[order], n_pixel) CCF += ccf logL += logl return logL, CCF # returning CCF and logL values
[docs] def simulate_observation( planet_spectrum_path=".", star_spectrum_path=".", data_cube_path=".", phase_start=0, phase_end=1, n_exposures=10, observation="emission", blaze=True, n_princ_comp=4, star=True, SNR=250, telluric=True, tell_type="data-driven", time_dep_tell=False, wav_error=False, rv_semiamp_orbit=0.3229, order_dep_throughput=True, Rp=1.21, # Jupiter radii, Rstar=0.955, # solar radii kp=192.02, # planetary orbital velocity, km/s v_rot=4.5, scale=1.0, v_sys=0.0, modelname="yourfirstsimulation", divide_out_of_transit=False, out_of_transit_dur=0.1, include_rm=False, v_rot_star=3.0, a=0.033, # lambda_misalign=0.0, inc=90.0, seed=42, LD=True, vary_throughput=True, instrument="IGRINS", snr_path=None, planet_name="yourfirstplanet", n_kp=200, n_vsys=200, pca_removal="subtract", **kwargs, ): """ Run a simulation of the data, given a grid index and some paths. Side effects: writes some files in the output directory. Inputs ------ grid_ind: int The index of the grid to use. planet_spectrum_path: str The path to the planet spectrum. star_spectrum_path: str The path to the star spectrum. phases: array-like The phases of the observations. observation: str Type of observation to simulate. Currently supported: emission, transmission. Outputs ------- None """ # make the output directory outdir = abs_path + f"/output/{modelname}" make_outdir(outdir) # and write the input file out output_args = locals() output_args.update(kwargs) write_input_file(output_args, output_file_path=f"{outdir}/input.txt") phases = np.linspace(phase_start, phase_end, n_exposures) Rp_solar = Rp * rjup_rsun # convert from jupiter radii to solar radii Kp_array = np.linspace(kp - 100, kp + 100, n_kp) v_sys_array = np.linspace(v_sys - 100, v_sys + 100, n_vsys) if instrument == "IGRINS": n_order, n_pixel = (44, 1848) mike_wave, mike_cube = pickle.load( open(data_cube_path, "rb"), encoding="latin1" ) wl_cube_model = mike_wave.copy().astype(np.float64) elif instrument == "CRIRES+": # need to read in the filepath n_order, n_pixel, wl_cube_model, SNR = read_crires_data(snr_path) wl_model, Fp, Fstar = np.load(planet_spectrum_path, allow_pickle=True) wl_model = wl_model.astype(np.float64) # Fp_conv_rot = broaden_spectrum(wl_model / 1e6, Fp, 0, vl=v_rot) rot_ker = get_rot_ker(v_rot, wl_model, observation) Fp_conv_rot = np.convolve(Fp, rot_ker, mode="same") # instrument profile convolution instrument_kernel = get_instrument_kernel(instrument) Fp_conv = np.convolve(Fp_conv_rot, instrument_kernel, mode="same") print("here spectrum") print(Fp_conv) star_wave, star_flux = np.loadtxt( star_spectrum_path ).T # Phoenix stellar model packing if include_rm: star_flux, _ = make_stellar_disk( star_flux, star_wave, v_rot_star, phases, Rstar, inc, lambda_misalign, a, Rp ) # todo: swap out Fstar_conv = get_star_spline( star_wave, star_flux, wl_model, instrument_kernel, smooth=False ) # fill with NaNs because we're checkpointing. 0 would be a valid value, NaNs indicate that we haven't calculated it yet. lls, ccfs = np.zeros((n_kp, n_vsys)) * np.nan, np.zeros((n_kp, n_vsys)) * np.nan # redoing the grid. how close does PCA get to a tellurics-free signal detection? A_noplanet, flux_cube, flux_cube_nopca, just_tellurics = make_data( scale, wl_cube_model, wl_model, Fp_conv, Fstar_conv, n_order, n_exposures, n_pixel, phases, Rp_solar, Rstar, seed=seed, do_pca=True, blaze=blaze, n_princ_comp=n_princ_comp, tellurics=telluric, v_sys=v_sys, star=star, Kp=kp, SNR=SNR, rv_semiamp_orbit=rv_semiamp_orbit, tell_type=tell_type, time_dep_tell=time_dep_tell, wav_error=wav_error, order_dep_throughput=order_dep_throughput, observation=observation, divide_out_of_transit=divide_out_of_transit, out_of_transit_dur=0.1, v_sys_measured=v_sys, LD=LD, instrument=instrument, pca_removal=pca_removal, ) run_name = f"{n_princ_comp}_NPC_{blaze}_blaze_{star}_star_{telluric}_telluric_{round(np.mean(SNR))}_SNR_{tell_type}_{time_dep_tell}_{wav_error}_{order_dep_throughput}_{seed}" save_data(outdir, run_name, flux_cube, flux_cube_nopca, A_noplanet, just_tellurics) for l, Kp in tqdm( enumerate(Kp_array), total=len(Kp_array), desc="looping PCA over Kp" ): for k, v_sys in enumerate(v_sys_array): res = calc_log_likelihood( v_sys, Kp, scale, wl_cube_model, wl_model, Fp_conv, Fstar_conv, flux_cube, n_order, n_exposures, n_pixel, phases, Rp_solar, Rstar, rv_semiamp_orbit, do_pca=True, n_princ_comp=n_princ_comp, A_noplanet=A_noplanet, star=star, observation=observation, v_sys_measured=v_sys, LD=LD, pca_removal=pca_removal, ) lls[l, k], ccfs[l, k] = res # "checkpoint" by saving the results after each iteration! save_results(outdir, run_name, lls, ccfs)
if __name__ == "__main__": args = parse_arguments() # First, parse the input file to get base parameters inputs = parse_input_file(args.input_file) # Then override with any command line arguments that were explicitly provided args_dict = vars(args) for key, value in args_dict.items(): # Skip the input_file parameter if key == "input_file": continue # Only override if the argument was explicitly provided (not None) if value is not None: inputs[key] = value logger = setup_logging(log_level=inputs["log_level"]) logger.debug(f"Parsed inputs: {inputs}") print(inputs) # Call the simulation function with the merged parameters try: simulate_observation(**inputs) except Exception as e: logger.error(f"An error occurred: {e}", exc_info=True)