import os
from glob import glob
import matplotlib.pyplot as plt
import numpy as np
from scipy.stats import binned_statistic
from astropy.io import fits
from astropy.time import Time
from astropy.stats import SigmaClip, mad_std
from .linalg import linreg, RegressionResult
__all__ = ['CheopsLightCurve', 'JointLightCurve']
attrs = [ # These vectors are from DRP
"background",
"bjd_time",
"centroid_x",
"centroid_y",
"conta_lc",
"conta_lc_err",
"dark",
"event",
"flux",
"fluxerr",
"location_x",
"location_y",
"mjd_time",
"roll_angle",
"smearing_lc",
"smearing_lc_err",
"status",
"utc_time"
] + [ # These vectors are from PIPE
"u0",
"u1",
"u2"
]
def normalize(vector):
"""
Normalize a vector such that its contents range from [-0.5, 0.5]
"""
return (vector - vector.min()) / vector.ptp() - 0.5
[docs]class CheopsLightCurve(object):
"""
Data handling class for CHEOPS light curves.
"""
def __init__(self, record_array={}, extra_basis_vectors=None,
time=None, mask=None, norm=True):
"""
Parameters
----------
record_array : `~numpy.recarray`
Record array of column vectors and their labels (names). Often
this record array comes straight from a FITS file.
hk_record_array : `~numpy.recarray`
Record array of column vectors and their labels from the housekeeping
FITS file which often ends in "SCI_CAL_SubArray_*.fits". Often
this record array comes straight from a FITS file.
norm : bool
Normalize the fluxes such that the median flux is unity. Default is
True.
"""
self.recs = record_array
self.extra_basis_vectors = extra_basis_vectors
for key in attrs:
if (hasattr(self.recs, 'columns') and
key in [i.lower() for i in self.recs.names]):
setattr(self, key, self.recs[key.upper()])
# Catch case for renamed roll angle key in the PIPE outputs
elif (hasattr(self.recs, 'columns') and
'roll' in [i.lower() for i in self.recs.names]):
setattr(self, 'roll_angle', self.recs['roll'])
self.time = (Time(self.bjd_time, format='jd')
if hasattr(self, 'bjd_time') else time)
if hasattr(self, 'status') and hasattr(self, 'event'):
self.mask = (np.isnan(self.flux) | self.status.astype(bool) |
self.event.astype(bool)) if hasattr(self, 'flux') else mask
else:
self.mask = np.isnan(self.flux) if hasattr(self, 'flux') else mask
if hasattr(self, 'flux') and norm:
self.fluxerr = self.fluxerr / np.nanmedian(self.flux)
self.flux = self.flux / np.nanmedian(self.flux)
[docs] @classmethod
def from_fits(cls, path, extra_basis_vectors=None, norm=True):
"""
Load a FITS file from DACE or the DRP.
Parameters
----------
path : str
Path to the FITS file containing the data to load.
extra_basis_vectors : `~numpy.ndarray`
Extra basis vectors to add to the design matrix.
norm : bool
Normalize the fluxes such that the median flux is unity. Default is
True.
"""
return cls(fits.getdata(path), extra_basis_vectors=extra_basis_vectors,
norm=norm)
[docs] @classmethod
def from_example(cls, norm=True):
"""
Load example 55 Cnc e light curve (**NOTE**: this is not real data).
Parameters
----------
norm : bool
Normalize the fluxes such that the median flux is unity. Default is
True.
"""
path = os.path.join(os.path.dirname(__file__), 'data',
'example_55Cnce.fits')
return cls.from_fits(path, norm=norm)
[docs] def plot(self, ax=None, **kwargs):
"""
Plot the light curve.
Parameters
----------
ax : `~matplotlib.axes.Axes`
Matplotlib axis instance on which to build the plot
kwargs : dict
Further keyword arguments to pass to `~matplotlib.pyplot.plot`.
Returns
-------
ax : `~matplotlib.axes.Axes`
Matplotlib axis instance with the light curve plotted on it.
"""
if ax is None:
ax = plt.gca()
ax.errorbar(self.bjd_time[~self.mask], self.flux[~self.mask],
self.fluxerr[~self.mask], **kwargs)
return ax
[docs] def design_matrix(self, norm=True):
"""
Generate the design matrix.
Parameters
----------
norm : bool
Normalize the column vectors within the design matrix such that they
have mean=zero and range=unity.
Returns
-------
X : `~numpy.ndarray`
Design matrix (concatenated column vectors of observables)
"""
if norm:
X = np.vstack([
normalize(np.cos(np.radians(self.roll_angle))),
normalize(np.sin(np.radians(self.roll_angle))),
np.ones(len(self.bjd_time)),
]).T
else:
X = np.vstack([
np.cos(np.radians(self.roll_angle)),
np.sin(np.radians(self.roll_angle)),
np.ones(len(self.bjd_time)),
]).T
if self.extra_basis_vectors is not None:
X = np.vstack([
X.T, self.extra_basis_vectors,
]).T
return X[~self.mask]
[docs] def sigma_clip_centroid(self, sigma=3.5, plot=False):
"""
Sigma-clip the light curve on centroid position (update mask).
Parameters
----------
sigma : float
Factor of standard deviations away from the median centroid position
to clip on.
plot : bool
Plot the accepted centroids (in black) and the centroids of the
rejected fluxes (in red).
"""
x_mean = np.median(self.centroid_x)
y_mean = np.median(self.centroid_y)
x_std = mad_std(self.centroid_x)
y_std = mad_std(self.centroid_y)
outliers = (sigma * min([x_std, y_std]) <
np.hypot(self.centroid_x - x_mean,
self.centroid_y - y_mean))
if plot:
plt.scatter(self.centroid_x[~outliers], self.centroid_y[~outliers],
marker=',', color='k')
plt.scatter(self.centroid_x[outliers], self.centroid_y[outliers],
marker='.', color='r')
plt.xlabel('BJD')
plt.ylabel('Flux')
self.mask |= outliers
[docs] def sigma_clip_flux(self, sigma_upper=4, sigma_lower=4, maxiters=None,
plot=False):
"""
Sigma-clip the light curve on fluxes (update mask).
Parameters
----------
sigma_upper : float
Factor of standard deviations above the median centroid position
to clip on.
sigma_lower : float
Factor of standard deviations below the median centroid position
to clip on.
maxiters : float or None
Number of sigma-clipping iterations. Default is None, which repeats
until there are no outliers left.
plot : float
Plot the accepted fluxes (in black) and the rejected fluxes (in red)
"""
sc = SigmaClip(sigma_upper=sigma_upper, sigma_lower=sigma_lower,
stdfunc=mad_std, maxiters=maxiters)
self.mask[~self.mask] |= sc(self.flux[~self.mask]).mask
if plot:
plt.plot(self.bjd_time[self.mask], self.flux[self.mask], 'r.')
plt.plot(self.bjd_time[~self.mask], self.flux[~self.mask], 'k.')
plt.xlabel('BJD')
plt.ylabel('Flux')
[docs] def regress(self, design_matrix):
r"""
Regress the design matrix against the fluxes.
Parameters
----------
design_matrix : `~numpy.ndarray`
Design matrix (concatenated column vectors of observables)
Returns
-------
betas : `~numpy.ndarray`
Least squares estimators :math:`\hat{\beta}`
cov : `~numpy.ndarray`
Covariance matrix for the least squares estimators
:math:`\sigma_{\hat{\beta}}^2`
"""
b, c = linreg(design_matrix,
self.flux[~self.mask],
self.fluxerr[~self.mask])
return RegressionResult(design_matrix, b, c)
[docs] def plot_phase_curve(self, r, params, t_fine, transit_fine, sinusoid_fine,
t0_offset=0, n_regressors=2, bins=15):
"""
Plot the best-fit phase curve.
Parameters
----------
r : `~linea.RegressionResult`
Result of the linear regression
params : `~linea.Planet`
Transiting exoplanet parameters
t_fine : `~numpy.ndarray`
Times computed on a grid finer than the original observations
transit_fine : `~numpy.ndarray`
Transit model computed at times ``t_fine``
sinusoid_fine : `~numpy.ndarray`
Sinusoidal phase curve model computed at times ``t_fine``
t0_offset : float, optional
Time offset between the mid-transit time defined by ``params`` and
the true mid-transit time [days]. Default is zero.
n_regressors : int, optional
Number of regressors used to parameterize the phase curve.
Default is two.
bins : int, optional
Number of bins to break the light curve into when plotting (black),
default is 15.
Returns
-------
fig, ax : `~matplotlib.figure.Figure`, `~matplotlib.axes.Axes`
Figure and axis objects containing the phase curve plot.
"""
transit = r.X[:, 0]
sinusoid = (r.X[:, 1:n_regressors+1] @ r.betas[1:n_regressors+1] /
r.best_fit)
phases = ((self.bjd_time[~self.mask] - params.t0 - t0_offset) %
params.per) / params.per
phases[phases > 0.95] -= 1
phases_fine = (((t_fine - params.t0 - t0_offset) % params.per) /
params.per)
phases_fine[phases_fine > 0.95] -= 1
fig, ax = plt.subplots(2, 1, figsize=(4.5, 8), sharex=True)
ax[0].plot(phases, (transit + 1) * (
self.flux[~self.mask] / r.best_fit + sinusoid), '.',
color='silver')
bs = binned_statistic(phases,
(transit + 1) * (self.flux[~self.mask] /
r.best_fit + sinusoid),
bins=bins, statistic='median')
bincenters = 0.5 * (bs.bin_edges[1:] + bs.bin_edges[:-1])
ax[0].plot(bincenters, bs.statistic, 's', color='k')
ax[0].plot(phases_fine[np.argsort(phases_fine)],
(transit_fine + sinusoid_fine)[np.argsort(phases_fine)], 'r')
ax[0].set(ylabel='Phase Curve')
bs_resid = binned_statistic(phases,
self.flux[~self.mask] / r.best_fit - 1,
bins=bins, statistic='median')
ax[1].plot(phases, self.flux[~self.mask] / r.best_fit - 1, '.',
color='silver')
ax[1].plot(bincenters, bs_resid.statistic, 's', color='k')
ax[1].set(ylabel='Residuals', xlabel='Phase')
ax[1].set_xticks(np.arange(-0, 1, 0.1), minor=True)
for axis in ax:
for sp in ['right', 'top']:
axis.spines[sp].set_visible(False)
ax[0].ticklabel_format(useOffset=False)
return fig, ax
[docs] def phase(self, planet_params):
"""
Orbital phase of planet at times ``lc.bjd_time``.
Parameters
----------
planet_params : `~linea.Planet`
Planet parameter object.
Returns
-------
phases : `~numpy.ndarray`
Orbital phases at times ``lc.bjd_time``
"""
return (((self.bjd_time - planet_params.t0) % planet_params.per) /
planet_params.per)
[docs]class JointLightCurve(object):
"""
Joint analysis object for multiple CHEOPS light curves.
"""
def __init__(self, light_curves):
"""
Parameters
----------
light_curves : list
List of `~linea.CheopsLightCurve` objects.
"""
self.light_curves = light_curves
self.attrs = [attr.lower() for attr in
light_curves[0].recs.dtype.fields]
for attr in self.attrs:
if hasattr(self, attr):
setattr(self, attr, [getattr(lc, attr) for lc in light_curves])
[docs] @classmethod
def from_example(cls, norm=True):
"""
Load example WASP-189 b light curves (**NOTE**: this is not real data).
Parameters
----------
norm : bool
Normalize the fluxes such that the median flux is unity. Default is
True.
"""
path = os.path.join(os.path.dirname(__file__), 'data',
'example_wasp189_*.fits')
wasp189_light_curves = [CheopsLightCurve.from_fits(p, norm=norm)
for p in glob(path)]
return cls(wasp189_light_curves)
[docs] def concatenate(self):
"""
Concatenate light curves into a single ``ConcatenatedLightCurve``.
Returns
-------
c : `~collections.namedtuple`
Named tuple containing the concatenated contents of the
JointLightCurve object.
"""
c = CheopsLightCurve(time=Time(np.concatenate([lc.time.jd
for lc in self]),
format='jd'),
mask=np.concatenate([lc.mask for lc in self]))
for attr in self.attrs:
attr_to_concat = [getattr(lc, attr)
for lc in self
if hasattr(lc, attr)]
setattr(c, attr, np.concatenate(attr_to_concat)
if len(attr_to_concat) > 0 else None)
return c
def _pad_shapes(self):
shapes = []
for lc in self:
shapes.append(np.count_nonzero(~lc.mask))
return shapes
[docs] def combined_design_matrix(self, design_matrices=None, norm=True):
"""
Generate the combined design matrix, from a list of design matrices, one
per visit.
Parameters
----------
design_matrices : list of `~numpy.ndarray` (optional)
List of design matrices, one per visit. If None is supplied, fetch
the design matrices from each of the `~linea.CheopsLightCurve`
objects used to initialize the `~linea.JointLightCurve`.
Returns
-------
X : `~numpy.ndarray`
Design matrix (concatenated column vectors of observables)
"""
if design_matrices is None:
design_matrices = [lc.design_matrix(norm=norm) for lc in self]
shapes = self._pad_shapes()
ndim = design_matrices[0].shape[1]
Xs_padded = []
for i in range(len(design_matrices)):
before = shapes[:i]
after = shapes[i+1:]
prepad = np.zeros((sum(before), ndim)) if len(before) > 0 else None
postpad = np.zeros((sum(after), ndim)) if len(after) > 0 else None
segments = []
for j in [prepad, design_matrices[i], postpad]:
if j is not None:
segments.append(j)
Xs_padded.append(np.vstack(segments))
return np.hstack(Xs_padded)
def __iter__(self):
"""
When iterating over ``JointLightCurve`` objects, iterate over items in
the ``self.light_curves`` list used to initialize the object.
"""
yield from self.light_curves
def __len__(self):
return len(self.light_curves)
def __getitem__(self, item):
return self.light_curves[item]
[docs] def regress(self, design_matrix):
r"""
Regress the design matrix against the fluxes.
Parameters
----------
design_matrix : `~numpy.ndarray`
Design matrix (concatenated column vectors of observables)
Returns
-------
betas : `~numpy.ndarray`
Least squares estimators :math:`\hat{\beta}`
cov : `~numpy.ndarray`
Covariance matrix for the least squares estimators
:math:`\sigma_{\hat{\beta}}^2`
"""
flux = np.concatenate([lc.flux for lc in self])
fluxerr = np.concatenate([lc.fluxerr for lc in self])
mask = np.concatenate([lc.mask for lc in self])
b, c = linreg(design_matrix,
flux[~mask],
fluxerr[~mask])
return RegressionResult(design_matrix, b, c)
[docs] def plot(self, ax=None, **kwargs):
"""
Plot the light curve.
Parameters
----------
ax : `~matplotlib.axes.Axes`
Matplotlib axis instance on which to build the plot
kwargs : dict
Further keyword arguments to pass to `~matplotlib.pyplot.plot`.
Returns
-------
ax : `~matplotlib.axes.Axes`
Matplotlib axis instance with the light curve plotted on it.
"""
if ax is None:
ax = plt.gca()
for lc in self:
ax.errorbar(lc.bjd_time[~lc.mask], lc.flux[~lc.mask],
lc.fluxerr[~lc.mask], **kwargs)
return ax