Source code for spd_trading.historical_density

# ---------------------------------------------------------------------------------------------------------------- GARCH
import numpy as np
from arch import arch_model
import time
import copy
import pickle
import os
import logging

from .utils.density import density_estimation


[docs]class GARCH: """The GARCH model class, which fits a model to the historical data and simulates sample paths based on that model Args: data (np.array): The timeseries that should be fitted (usually log returns) data_name (str): Name of the dataset, will be used in filename. n_fits (int): How many sliding windows the data is devided into. garch_data_folder (str): Path to folder where to save/load GARCH model. overwrite_model (bool, optional): Whether to overwrite the GARCH model. Defaults to True. window_length (int, optional): Length of each sliding window. Defaults to 365. z_h (float, optional): Bandwidth *factor* for Kernel Density Estimation. Defaults to 0.1. Attributes: parameters (np.array): The parameters :math:`\\Theta = (\\omega, \\alpha, \\beta)` of GARCH model. z_dens (np.array): The distribution of innovations :math:`\\mathcal{Z} = \\left\\{z_0, z_1, ...., z_T \\right\\}` simulated_log_returns (np.array): simulated log returns at horizon (also: maturity) :math:`\\tau` simulated_tau_mu (np.array): product of :math:`\\tau \\cdot \\mu` for each simulation """ def __init__( self, data, data_name, n_fits, garch_data_folder, overwrite_model=True, window_length=365, z_h=0.1, ): self.data = data # timeseries (here: log_returns) self.z_h = z_h self.data_name = data_name self.window_length = window_length self.n_fits = n_fits self.overwrite_model = overwrite_model self.filename_model = os.path.join( garch_data_folder, "GARCH_Model_{}_window_length-{}_n-{}".format(self.data_name, self.window_length, self.n_fits), ) # parameters that are created during run self.parameters = None self.e_process = None self.z_process = None self.sigma2_process = None self.z_dens = None self.simulated_log_returns = None self.simulated_tau_mu = None def _load(self): """Loads a GARCH model.""" with open(self.filename_model, "rb") as f: tmp_dict = pickle.load(f) f.close() self.__dict__.clear() self.__dict__.update(tmp_dict) return def _save(self): """Saves a GARCH model.""" with open(self.filename_model, "wb") as f: pickle.dump(self.__dict__, f, 2) f.close() return
[docs] def fit(self): """Fits a GARCH(1,1) model to the data. This For *n_fits* sliding windows, the parameters :math:`\\Theta = (\\omega, \\alpha, \\beta)` and the distribution of innovations :math:`\\mathcal{Z} = \\left\\{z_0, z_1, ...., z_T \\right\\}` are estimated. The results are saved in *self*. """ if os.path.exists(self.filename_model) and (self.overwrite_model == False): logging.info(f" -------------- use existing GARCH model: {self.filename_model}") self._load() return start = self.window_length + self.n_fits end = self.n_fits parameters = np.zeros((self.n_fits, 4)) z_process = [] e_process = [] sigma2_process = [] for i in range(0, self.n_fits): window = self.data[end - i : start - i] data = window - np.mean(window) model = arch_model(data, q=1, p=1) GARCH_fit = model.fit(disp="off") mu, omega, alpha, beta = [ GARCH_fit.params["mu"], GARCH_fit.params["omega"], GARCH_fit.params["alpha[1]"], GARCH_fit.params["beta[1]"], ] parameters[i, :] = [mu, omega, alpha, beta] if i == 0: sigma2_tm1 = omega / (1 - alpha - beta) else: sigma2_tm1 = sigma2_process[-1] e_t = data.tolist()[-1] # last observed log-return, mean adjust. e_tm1 = data.tolist()[-2] # previous observed log-return sigma2_t = omega + alpha * e_tm1 ** 2 + beta * sigma2_tm1 z_t = e_t / np.sqrt(sigma2_t) e_process.append(e_t) z_process.append(z_t) sigma2_process.append(sigma2_t) self.parameters = parameters self.e_process = e_process self.z_process = z_process self.sigma2_process = sigma2_process # ------------------------------------------- kernel density estimation z_dens_x = np.linspace(min(self.z_process), max(self.z_process), 500) h_dyn = self.z_h * (np.max(z_process) - np.min(z_process)) z_dens_y = density_estimation(np.array(z_process), np.array(z_dens_x), h=h_dyn).tolist() self.z_dens = {"x": z_dens_x, "y": z_dens_y} logging.info(f"------------- save GARCH model: {self.filename_model}") self._save() return
def _GARCH_simulate(self, pars, horizon): """stepwise GARCH simulation until burnin + horizon Args: pars (tuple): (mu, omega, alpha, beta) Returns: tuple of lists: simulated sigma2- and e-process """ mu, omega, alpha, beta = pars burnin = horizon * 2 sigma2 = [omega / (1 - alpha - beta)] e = [self.data.tolist()[-1] - mu] # last observed log-return mean adj. weights = self.z_dens["y"] / (np.sum(self.z_dens["y"])) for _ in range(horizon + burnin): sigma2_tp1 = omega + alpha * e[-1] ** 2 + beta * sigma2[-1] z_tp1 = np.random.choice(self.z_dens["x"], 1, p=weights)[0] e_tp1 = z_tp1 * np.sqrt(sigma2_tp1) sigma2.append(sigma2_tp1) e.append(e_tp1) return sigma2[-horizon:], e[-horizon:] def _variate_pars(self, pars, bounds): """Variation of parameters for fit of GARCH model. The GARCH fit (pars) was obtained from *n_fits* moving windows and therefore varies itself. Args: pars (np.array): *n_fits* parameters of GARCH model bounds (np.array): bounds to the parameters Returns: np.array: Slightly varied parameters. """ new_pars = [] i = 0 for par, bound in zip(pars, bounds): var = bound ** 2 / self.n_fits new_par = np.random.normal(par, var, 1)[0] if (new_par <= 0) and (i >= 1): logging.warning(f"new_par too small {new_par}") new_par = 0.01 new_pars.append(new_par) i += 1 return new_pars
[docs] def simulate_paths(self, horizon, simulations, variate_GARCH_parameters=True): """Monte Carlo Simulation - Simulate paths using the fitted GARCH(1,1) model. Args: horizon (int): How many steps of paths to simulate variate (bool, optional): Whether GARCH parameters should be variated. Defaults to True. Returns: tuple: simulated_log_returns, simulated_tau_mu """ logging.info(f" -------------- simulate paths for: {self.data_name}, {horizon}, {simulations}") if os.path.exists(self.filename_model) and (self.overwrite_model == False): logging.info(f" ----------- use existing GARCH model: {self.filename_model}") self._load() else: logging.info(" ----------- fit new GARCH model") self.fit() pars = np.mean(self.parameters, axis=0).tolist() # mean bounds = np.std(self.parameters, axis=0).tolist() # std of parameters logging.info(f"garch parameters : {pars}") np.random.seed(1) # for reproducability in _variate_pars() new_pars = copy.deepcopy(pars) # set pars for first round of simulation simulated_log_returns = np.zeros(simulations) simulated_tau_mu = np.zeros(simulations) tick = time.time() for i in range(simulations): if (i + 1) % (simulations * 0.1) == 0: logging.info(f"{i + 1}/{simulations} - runtime: {round((time.time() - tick) / 60)} min") if ((i + 1) % (simulations * 0.05) == 0) & variate_GARCH_parameters: new_pars = self._variate_pars(pars, bounds) sigma2, e = self._GARCH_simulate(new_pars, horizon) simulated_log_returns[i] = np.sum(e) simulated_tau_mu[i] = horizon * pars[0] self.simulated_log_returns = simulated_log_returns # summed because we have log-returns self.simulated_tau_mu = simulated_tau_mu return simulated_log_returns, simulated_tau_mu
# ----------------------------------------------------------------------------------------------------------- CALCULATOR import pandas as pd
[docs]class Calculator(GARCH): """The Calculator Class for the Historical Density. The Historical Density is estimated via a Monte Carlo simulation, where the sample paths are simulated by a GARCH(1,1) model. Args: data (np.array): Timeseries of the underlying. tau_day (int): Time to maturity in days, also: *horizon*. date (str): Evaluation date, the last day of timeseries. S0 (float): Price of underlying at :math:`t=0`, which is on evaluation date garch_data_folder (str): Path to folder where to save/load GARCH model. n_fits (int): How many sliding windows the data is devided into. overwrite_model (bool, optional): Whether to overwrite the GARCH model. Defaults to True. overwrite_simulations (bool, optional): Whether to overwrite the simulations. Defaults to True. target (str, optional): Column name of the timeseries. Defaults to "price". window_length (int, optional): Length of each sliding window in GARCH fit. Defaults to 365. h (float, optional): Bandwidth for Kernel Density Estimation. Defaults to 0.15. simulations (int, optional): How many paths to simulate. Defaults to 5000. Attributes: log_returns (np.array): The daily log returns of the price index timeseries. Timeseries for GARCH model. GARCH (spd_trading.historical_density.GARCH): Instance of class. ST (np.array): Simulated prices of underlying, according to GARCH model. q_M (np.array): Density in Moneyness domain M=S0/ST. """ def __init__( self, data, tau_day, date, S0, garch_data_folder, n_fits, cutoff=0.5, overwrite_model=True, overwrite_simulations=True, target="price", window_length=365, h=0.15, simulations=5000, ): self.data = data self.tau_day = tau_day self.date = date self.S0 = S0 self.garch_data_folder = garch_data_folder self.cutoff = cutoff self.overwrite_simulations = overwrite_simulations self.target = target self.simulations = simulations self.h = h # parameters that are created during run self.log_returns = self._get_log_returns() self.GARCH = GARCH( data=self.log_returns, window_length=window_length, data_name=self.date, n_fits=n_fits, overwrite_model=overwrite_model, garch_data_folder=self.garch_data_folder, z_h=0.1, ) self.ST = None self.q_M = None def _get_log_returns(self): """Calculate logarithmic 1-day returns from data timeseries.""" n = self.data.shape[0] data = self.data.reset_index() first = data.loc[: n - 2, self.target].reset_index() second = data.loc[1:, self.target].reset_index() historical_returns = (second / first)[self.target] return np.log(historical_returns) * 100 def _calculate_path(self, simulated_log_returns, simulated_tau_mu): """Calculates the underlyings' prices of maturity based on the simulations. Args: simulated_log_returns (np.array): simulated log returns at horizon (also: maturity) :math:`\\tau` simulated_tau_mu (np.array): product of :math:`\\tau \\cdot \\mu` for each simulation Returns: np.array: Simulated prices of underlying at maturity :math:`\\tau` """ S_T = self.S0 * np.exp(simulated_log_returns / 100 + simulated_tau_mu / 100) return S_T
[docs] def get_hd(self, variate_GARCH_parameters=True): """Estimates the Historical Density via a GARCH(1,1) model and Monte Carlo simulation. Args: variate_GARCH_parameters (bool, optional): Whether the GARCH parameters should be variated slightly during the Monte Carlo Simulation. Defaults to True. """ self.filename = "T-{}_{}_Ksim.csv".format(self.tau_day, self.date) if os.path.exists(os.path.join(self.garch_data_folder, self.filename)) and ( self.overwrite_simulations == False ): logging.info(f"-------------- use existing Simulations {self.filename}") self.GARCH._load() # load GARCH Model (parameters, z_dens) pass else: logging.info("-------------- create new Simulations") simulated_log_returns, simulated_tau_mu = self.GARCH.simulate_paths( self.tau_day, self.simulations, variate_GARCH_parameters ) self.ST = self._calculate_path(simulated_log_returns, simulated_tau_mu) pd.Series(self.ST).to_csv(os.path.join(self.garch_data_folder, self.filename), index=False) self.ST = pd.read_csv(os.path.join(self.garch_data_folder, self.filename)) M = np.linspace((1 - self.cutoff), (1 + self.cutoff), 100) simulated_paths_in_moneyness = np.array(self.S0 / self.ST) self.q_M = {"x": M, "y": density_estimation(simulated_paths_in_moneyness, M, h=self.h)}
from matplotlib import pyplot as plt
[docs]class Plot: """The Plotting class for Historical Density. Args: x (float, optional): The Moneyness interval for the plots. :math:`M = [1-x, 1+x]`. Defaults to 0.5. """ def __init__(self, x=0.5): self.x = x
[docs] def density(self, HD): """Visualization of the Historical Density. Args: HD (spd_trading.historical_density.Calculator): Instance of class ``spd_trading.historical_density.Calculator``. Returns: Figure: Matplotlib figure. """ fig, (ax0) = plt.subplots(1, 1, figsize=(6, 4)) # density q_m ax0.plot(HD.q_M["x"], HD.q_M["y"]) ax0.set_xlabel("Moneyness") ax0.set_ylabel("Probability") ax0.set_xlim(1 - self.x, 1 + self.x) ax0.set_ylim(0) plt.tight_layout() return fig