"""
Utility functions for simulating HRCCS data.
"""
import json
import os
import pickle
import astropy.units as u
import matplotlib.pyplot as plt
import numpy as np
from numba import njit
from scipy import signal
from scipy.interpolate import interp1d
from scipy.ndimage import gaussian_filter1d
from tqdm import tqdm
from scope.constants import *
abs_path = os.path.dirname(__file__)
np.random.seed(42)
start_clip = 200
end_clip = 100
[docs]
@njit
def doppler_shift_planet_star(
model_flux_cube,
n_exposure,
phases,
rv_planet,
rv_star,
wlgrid_order,
wl_model,
Fp_conv,
Rp_solar,
Fstar_conv,
Rstar,
u1,
u2,
a,
b,
LD,
scale,
star,
observation,
reprocessing=False,
):
for exposure in range(n_exposure):
_, flux_planet = calc_doppler_shift(
wlgrid_order, wl_model, Fp_conv, rv_planet[exposure]
)
flux_planet *= scale # apply scale factor
_, flux_star = calc_doppler_shift(
wlgrid_order, wl_model, Fstar_conv, rv_star[exposure]
)
if star and observation == "emission":
model_flux_cube[exposure,] = (flux_planet * Rp_solar**2) / (
flux_star * Rstar**2
) + 1.0
if not reprocessing:
model_flux_cube[exposure,] *= flux_star * Rstar**2
elif observation == "emission":
model_flux_cube[exposure,] = flux_planet * (Rp_solar * rsun) ** 2
elif (
observation == "transmission"
): # in transmission, after we "divide out" (with PCA) the star and tellurics, we're left with Fp.
I = calc_limb_darkening(u1, u2, a, b, Rstar, phases[exposure], LD)
model_flux_cube[exposure,] = 1.0 - flux_planet * I
if not reprocessing:
model_flux_cube[exposure,] *= flux_star
return model_flux_cube
[docs]
def save_results(outdir, run_name, lls, ccfs):
np.savetxt(
f"{outdir}/lls_{run_name}.txt",
lls,
)
np.savetxt(
f"{outdir}/ccfs_{run_name}.txt",
ccfs,
)
[docs]
def make_outdir(outdir):
try:
os.makedirs(outdir)
except FileExistsError:
print("Directory already exists. Continuing!")
[docs]
def get_instrument_kernel(instrument, model_resolution=250000, kernel_size=41):
"""
Creates a Gaussian kernel for instrument response using an alternative implementation.
Parameters
----------
resolution_ratio : float
Ratio of resolutions (default: 250000/45000)
kernel_size : int
Size of the kernel (must be odd, default: 41)
Returns
-------
numpy.ndarray
Normalized Gaussian kernel
"""
instrument_resolution_dict = {
"IGRINS": 45000,
"CRIRES+": 145000,
}
instrument_resolution = instrument_resolution_dict[instrument]
resolution_ratio = instrument_resolution / model_resolution
# Ensure kernel size is odd
if kernel_size % 2 == 0:
raise ValueError("Kernel size must be odd")
# Convert resolution ratio to standard deviation
std = resolution_ratio / (2.0 * np.sqrt(2.0 * np.log(2.0)))
# Create the Gaussian window
gaussian_window = signal.windows.gaussian(
kernel_size, std=std, sym=True # Ensure symmetry
)
# Normalize to sum to 1
normalized_kernel = gaussian_window / gaussian_window.sum()
return normalized_kernel
[docs]
def save_data(outdir, run_name, flux_cube, flux_cube_nopca, A_noplanet, just_tellurics):
"""
Saves data to a pickle file.
Inputs
------
:outdir: (str) output directory
:run_name: (str) name of the run
:data: (dict) data to save
"""
with open(
f"{outdir}/simdata_{run_name}.txt",
"wb",
) as f:
pickle.dump(flux_cube, f)
with open(
f"{outdir}/nopca_simdata_{run_name}.txt",
"wb",
) as f:
pickle.dump(flux_cube_nopca, f)
with open(
f"{outdir}/A_noplanet_{run_name}.txt",
"wb",
) as f:
pickle.dump(A_noplanet, f)
with open(f"{outdir}/just_tellurics_vary_airmass.txt", "wb") as f:
pickle.dump(just_tellurics, f)
[docs]
@njit
def calc_limb_darkening(u1, u2, a, b, Rstar, ph, LD):
"""
calculates limb darkening as a function of phase
:u1: (float) linear limb darkening coefficient
:u2: (float) quadratic limb darkening coefficient
:a: (float) semi-major axis in meters
:b: (float) impact parameter
:Rstar: (float) stellar radius in solar radii
:ph: (array) phase
:LD: (bool) whether to apply limb darkening
"""
if LD: # apply limb darkening. 1D style!
x = (a * np.sin(2 * np.pi * ph)) / (Rstar * rsun)
if x**2 <= 1 - b**2:
mu = np.sqrt(1 - x**2 - b**2)
I = 1 - u1 * (1 - mu) - u2 * (1 - mu) ** 2
else:
I = 0.0
else:
I = 1.0
return I
# todo: download atran scripts
# todo: fit wavelength solution stuff
# todo: plot the maps
[docs]
@njit
def calc_doppler_shift(eval_wave, template_wave, template_flux, v):
"""
Doppler shifts a spectrum. Evaluates the flux at a different grid.
convention: positive v is redshift.
Inputs
------
:wl: wavelength grid
:flux: flux grid
:v: velocity. Must be in m/s.
Outputs
-------
:flux_shifted: shifted flux grid
"""
beta = v / const_c
delta_lam = eval_wave * beta
shifted_wave = eval_wave - delta_lam
shifted_flux = np.interp(shifted_wave, template_wave, template_flux)
return shifted_wave, shifted_flux
[docs]
@njit
def calc_rvs(v_sys, v_sys_measured, Kp, Kstar, phases):
"""
calculate radial velocities of planet and star.
Inputs
------
v_sys: float
Systemic velocity of the system. Measured in km/s
v_sys_measured: float
Measured systemic velocity of the system. Measured in km/s
Kp: float
Planet semi-amplitude. Measured in km/s
Kstar: float
Star semi-amplitude. Measured in km/s
phases: array
Orbital phases of the system. Measured in radians.
Returns
-------
rv_planet: array
Radial velocities of the planet. Measured in m/s
rv_star: array
Radial velocities of the star. Measured in m/s
"""
v_sys_tot = v_sys + v_sys_measured # total offset
rv_planet = v_sys_tot + Kp * np.sin(
2.0 * np.pi * phases
) # input in km/s, convert to m/s
rv_star = v_sys_measured - Kstar * np.sin(
2.0 * np.pi * phases
) # measured in m/s. note opposite sign!
return rv_planet * 1e3, rv_star * 1e3
[docs]
def get_star_spline(star_wave, star_flux, planet_wave, yker, smooth=True):
"""
Calculates the stellar spline using an alternative implementation with linear interpolation
instead of B-splines.
Inputs
------
star_wave: array
Wavelengths of the star. Measured in microns.
star_flux: array
Fluxes of the star. Measured in W/m^2/micron.
planet_wave: array
Wavelengths of the planet. Measured in microns.
yker: array
Convolution kernel.
smooth: bool
Whether or not to smooth the star spectrum. Default is True.
Returns
-------
star_flux: array
Interpolated and processed stellar fluxes. Measured in W/m^2/micron.
"""
# Create interpolation function using scipy's interp1d
# Using cubic interpolation for smoother results
interpolator = interp1d(
star_wave,
star_flux,
kind="cubic",
bounds_error=False,
fill_value=(star_flux[0], star_flux[-1]), # Extrapolate with edge values
)
# Interpolate onto planet wavelength grid
interpolated_flux = interpolator(planet_wave)
# Perform convolution
convolved_flux = np.convolve(interpolated_flux, yker, mode="same")
# Apply Gaussian smoothing if requested
if smooth:
final_flux = gaussian_filter1d(convolved_flux, sigma=200)
else:
final_flux = convolved_flux
return final_flux
[docs]
def change_wavelength_solution(wl_cube_model, flux_cube_model, doppler_shifts):
"""
Takes a finalized wavelength cube and makes the wavelength solution for each exposure just slightly wrong.
for now, it just shifts things. doesn't stretch.
this basically interpolates the flux array onto a different wavelength grid. don't worry about the edge, it'll be
trimmed?
for now: uses prescribed pixel shifts!
Inputs
------
:wl_cube_model: (array) wavelength grid of simulated data!
:flux_cube_model: (array) flux cube for simulated data!
:pixel_shifts: (array) N_exp long array of number of wavelength pixels to shift. Convention: positive number is
redshift (shift to larger pixels). shifts must be in km/s.
"""
n_order = flux_cube_model.shape[0]
# iterate through each exposure
for exp, doppler_shift in enumerate(doppler_shifts):
doppler_shift *= 1e3 # convert to m/s
# iterate through each exposure
for order in range(n_order):
wl_grid = wl_cube_model[order]
flux = flux_cube_model[order][exp]
flux_cube_model[order][exp] = calc_doppler_shift(
wl_grid, wl_grid, flux, doppler_shift
)
return flux_cube_model
[docs]
def add_blaze_function(
wl_cube_model, flux_cube_model, n_order, n_exp, instrument="IGRINS"
):
"""
Adds the blaze function to the model.
Inputs
------
:wl_cube_model: (array) wavelength cube model
:flux_cube_model: (array) flux cube model
:n_order: (int) number of orders
:n_exp: (int) number of exposures
Outputs
-------
:flux_cube_model: (array) flux cube model with blaze function included.
"""
# read in...have to somehow match the telluric spectra
if instrument != "IGRINS":
raise NotImplementedError(
"Only the IGRINS blaze function is currently supported."
)
with open(abs_path + "/data/K_blaze_spectra.pic", "rb") as f:
K_blaze_cube = pickle.load(f)
with open(abs_path + "/data/H_blaze_spectra.pic", "rb") as f:
H_blaze_cube = pickle.load(f)
n_orders_k = K_blaze_cube.shape[0]
n_orders_h = H_blaze_cube.shape[0]
# K first, then H
K_blaze_cube = detrend_cube(K_blaze_cube, n_orders_k, n_exp, blaze=True)
H_blaze_cube = detrend_cube(H_blaze_cube, n_orders_h, n_exp, blaze=True)
for order in tqdm(range(n_order), desc="adding blaze function"):
flux_cube_model_slice = flux_cube_model[order, :, :]
if order >= n_orders_k:
blaze_cube = H_blaze_cube
order_used = order - 19
else:
blaze_cube = K_blaze_cube
order_used = order
flux_cube_model_slice = flux_cube_model_slice * blaze_cube[order_used][100:-100]
flux_cube_model[order, :, :] = flux_cube_model_slice
return flux_cube_model
[docs]
def detrend_cube(cube, n_order, n_exp, blaze=False):
"""
Detrends the cube by dividing each order by its maximum value.
Inputs
------
:cube: (array) flux cube
:n_order: (int) number of orders
:n_exp: (int) number of exposures
Outputs
-------
:cube: (array) detrended flux cube
"""
def detrend_array(array):
max_val = np.nanmax(array)
return array / max_val
progress = tqdm(range(n_order), desc="detrending cube")
if not blaze:
for order in progress:
for exp in range(n_exp):
cube[order, exp] = detrend_array(cube[order, exp])
else:
for order in progress:
cube[order] = detrend_array(cube[order])
return cube
[docs]
def unpack_grid(grid_ind, parameter_list):
param_dict = parameter_list[grid_ind]
(
blaze,
n_princ_comp,
star,
SNR,
telluric,
tell_type,
time_dep_tell,
wav_error,
order_dep_throughput,
) = (
param_dict["blaze"],
param_dict["n_princ_comp"],
param_dict["star"],
param_dict["SNR"],
param_dict["telluric"],
param_dict["telluric_type"],
param_dict["time_dep_telluric"],
param_dict["wav_error"],
param_dict["order_dep_throughput"],
)
return (
blaze,
n_princ_comp,
star,
SNR,
telluric,
tell_type,
time_dep_tell,
wav_error,
order_dep_throughput,
)