Import Packages

In [2]:
import sys
import time
import json
import warnings
import numpy as np
import scipy as sp
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from scipy.stats import norm
from mpl_toolkits import mplot3d
from urllib.request import urlopen
from scipy.optimize import minimize, basinhopping, NonlinearConstraint
from sklearn.neighbors import KernelDensity
from pandas.plotting import register_matplotlib_converters
register_matplotlib_converters()
warnings.filterwarnings('ignore')

Declare Functions

In [3]:
def yahoo_parser(url):
    response = urlopen(url)
    data = json.loads(response.read().decode("utf-8"))['chart']['result'][0]
    t = pd.to_datetime(data['timestamp'], unit='s').date
    c = data['indicators']['quote'][0]['close']
    return pd.DataFrame({'close': c}, index=t)

def default_parser(url):
    return pd.read_csv(url, index_col=0, parse_dates=True)

def retrieve(source, code, label, column=None):
    query = {
        'yahoo': 'https://query1.finance.yahoo.com/v8/finance/chart/{}?period1=0&period2=5000000000&interval=1d',
    }
    parser = {
        'yahoo': yahoo_parser,
    }
    url = query.get(source).format(code)
    df = parser.get(source, default_parser)(url)
    df.index.name = None
    df.columns = [col.lower() for col in df.columns] if df.shape[1] > 1 else [label]
    if not column: column = df.columns
    all_dates = pd.date_range(df.index.min(), df.index.max(), freq='D')
    df = df.replace('.', None).reindex(all_dates).fillna(method='ffill')
    return df.astype('float')[column].squeeze()

def sigma_func_linear(x, a, b):
    return a * x + b

def sigma_func_quad(x, a, b, c):
    return a * x**2 + b * x + c

def sigma_func_quad_diff(x, a, b, c, a2):
    return a * x**2 * (x >= 0) + a2 * x**2 * (x < 0) + b * x + c

def SVIvol(params, x):
    a, b, m, rho, sigma = params
    return np.sqrt(a + b * (rho * (x - m) + np.sqrt((x - m)**2 + sigma**2)))

def SVIfail(params, T):
    a, b, m, rho, sigma = params
    return (abs(rho)>1) | (b<0) | (sigma<0) | (a+b*np.sqrt(1-rho**2)*sigma<0) | (b*(1+abs(rho)) > np.minimum(2/T,2))

def calc_call_premium(K, T, sigma):
    sigma = np.where(sigma>0, sigma, 1e-4)
    d1 = (np.log(1/K) + sigma**2 / 2 * T) / (sigma * np.sqrt(T))
    d2 = (np.log(1/K) - sigma**2 / 2 * T) / (sigma * np.sqrt(T))
    c = norm.cdf(d1) - K * norm.cdf(d2)
    return c
In [4]:
def single_mle_resample(df, term_day, sample_size, max_trial, equidist=True, verbose=0):
    """
    Given a term_day, using different samples to solve for the optimal parameters, return all the results.
    
    :param df: pd.DataFrame, return of SPX
    :param term_day: int
    :param sample_size: int, how many points will each sample contain
    :param max_trial: how many different samples will be generated
    :param equidist: bool, whether a sample contains equidistant points
    :param verbose: 0, 1, or 2
        if verbose = 0, do not print any results of the optimizer
        if verbose = 1, print optimizer.success
        if verbose = 2, print the whole optimizer object
    """
    col_names = ['svi_a','svi_b','svi_m','svi_rho','svi_sigma']
    result = pd.DataFrame()
    full_series = df[str(term_day)+'D_ret'].dropna()
    avg, std = full_series.mean(), full_series.std()
    for n in range(max_trial):
        print(term_day,n)
        if equidist:
            rand = np.random.uniform(0.5, 1.2)
            r_series = np.array([elem for elem in np.linspace(avg-5*rand*std, avg+5*rand*std, sample_size)])
        else:
            r_series = np.random.choice(full_series, sample_size)

        coef_svi = minimize(obj_func_log_likelihood, list(svi_initial.loc[str(term_day)+'D']), args=(r_series, term_day, 'svi', 0),
                       method='SLSQP', options={"disp": 0})
        if coef_svi.success:
            coef_svi=coef_svi.x
        else:
            print(coef_svi)
            coef_svi=list(svi_initial.loc[str(term_day)+'D'])


        print(coef_svi)
        tmp = pd.DataFrame([np.hstack([coef_svi])], columns=col_names)
        result = pd.concat([result, tmp], axis=0, ignore_index=True)
    return result



def full_mle_resample(df, terms, sample_size, max_trial, equidist=True, save_to_file=False):
    """
    Resample and compute the avg, std for each term_day
    
    :param df: pd.DataFrame, period return of SPX
    :param terms: dict, {term_str: term_day}
    :sample_size: int, how many points will each sample contain
    :param max_trial: how many different samples will be generated
    :param equidist: bool, whether a sample contains equidistant points
    :param save_to_file: whether each result will be saved
    """
    col_names = ['svi_a','svi_b','svi_m','svi_rho','svi_sigma']
    params_avg = []
    params_std = []
    num_window = len(terms)
    for term_str, term_day in terms.items():
        result = single_mle_resample(df, term_day, sample_size, max_trial,equidist)
        if save_to_file:
            result.to_csv("MLE_opt_params/200_40/"+term_str+"_params.csv")
        tmp_params = result.mean().values.tolist()
        params_avg.append(tmp_params)
        params_std.append(result.std().values.tolist())
    avg_result = pd.DataFrame(params_avg)
    std_result = pd.DataFrame(params_std)
    avg_result.columns = std_result.columns = col_names
    avg_result.index = std_result.index = terms.keys()
    return avg_result, std_result

Format Data

In [5]:
# load yahoo data
raw_df = retrieve('yahoo', '^GSPC', 'SPX').to_frame()
In [6]:
# remove all weekends
df = raw_df.copy()
df = df.reindex(pd.date_range(start="1970-01-02", end="2018-12-31", freq='B'))

# take the log price
df['log_SPX'] = np.log(df["SPX"])
# compute daily log return
df['r'] = df['log_SPX'].diff()

term_days = [5, 10] + list(range(20, 521, 20))
term_name = [str(i) + "D" for i in term_days]
terms = dict(zip(term_name, term_days))

# calculate rolling period return
for k, v in terms.items():
    df[k + '_ret'] = df["r"].rolling(v).sum()
    df[k + '_ret'] = df[k + '_ret']- np.log(np.mean(np.exp(df[k + '_ret'])))
In [35]:
# ######  Grid Search Interval    ####

# para=[]
# for a in np.linspace(-0.3,0.5,8):
#     for b in np.linspace(0,1,10):
#         for m in np.linspace(-0.5,0.2,7):
#             for rho in np.linspace(-1,0,10):
#                 for sigma in np.linspace(0.1,0.8,8):
#                     if not (abs(rho)>1) | (b<0) | (sigma<0) | (a+b*np.sqrt(1-rho**2)*sigma<0) | (b*(1+abs(rho)) > 2):
#                         para.append([a, b, m, rho, sigma])

# # svi_initial=pd.read_csv('initials.csv',index_col=0)
In [36]:
len(para)
Out[36]:
34503

Fit Volatility Surface - Least Sequare Method

In [8]:
def calc_density(x, r, T, params, delta_x=0.001, method='linear'):
    """
    Approximate the density of x = log(K) using butterfly
    :param x: float / np.array(dtype=float), log return
    :param r: float
    :param params: array-like object, 
        linear: a, b
        quad: a, b, c
        quad_diff: a, b, c, a2
        svi: a, b, m, rho, sigma
    :delta_x: float, default 0.001
    :method: str, the analytical form of volatility function, can be 'linear'/'quad'/'quad_diff'/'svi'
    
    :return: float / np.array(dtype=float), a series of density
    """
    # params: a, b, c, a2
    if method == 'linear' and len(params) == 2:
        x1, x2, a, b = x + np.log(1 + delta_x), x + np.log(1 - delta_x), params[0], params[1]
        sigma1, sigma, sigma2 = a * x1 + b,\
                                a * x + b,\
                                a * x2 + b
    elif method == 'quad' and len(params) == 3:
        x1, x2, a, b, c = x + np.log(1 + delta_x), x + np.log(1 - delta_x), params[0], params[1], params[2]
        sigma1, sigma, sigma2 = a * x1**2 + b * x1 + c,\
                                a * x**2 + b * x + c,\
                                a * x2**2 + b * x2 + c
    elif method == 'quad_diff' and len(params) == 4:
        x1, x2, a, b, c, a2 = x + np.log(1 + delta_x), x + np.log(1 - delta_x), params[0], params[1], params[2], params[3]
        sigma1, sigma, sigma2 = a * x1**2 * (x1 >= 0) + a2 * x1**2 * (x1 < 0) + b * x1 + c,\
                                a * x**2 * (x >= 0) + a2 * x**2 * (x < 0) + b * x + c,\
                                a * x2**2 * (x2 >= 0) + a2 * x2**2 * (x2 < 0) + b * x2 + c
    elif method == 'svi' and len(params) == 5:
        x1, x2 = x + np.log(1 + delta_x), x + np.log(1 - delta_x)
        sigma1, sigma, sigma2 = SVIvol(params, x1), SVIvol(params, x), SVIvol(params, x2)
    else:
        print('params: ',params,'method: ',method)
        raise Exception("Wrong param dimension or fit method!")

    f = np.exp(r * T) * (calc_call_premium(np.exp(x1), T, sigma1)
                         - 2 * calc_call_premium(np.exp(x), T, sigma)
                         + calc_call_premium(np.exp(x2), T, sigma2)) / (np.exp(x) * delta_x**2)
    return f

def obj_func_least_square(params, r_series, r_density, term_day, method):
    """
    Compute mean of squared errors (avg(SSE)) given the true density
    :param params: parameters in volatility function
    :r_series: array-like, the input in the volatility function, used to predict density
    :r_density: array-like, true density
    :term_day: int, T
    :method: str, the analytical form of volatility function
    """
    density = calc_density(r_series, r=0, T=term_day/260, params=params, delta_x=0.001, method=method)
    return np.mean((r_density - density)**2)
In [9]:
def plot_result(terms, full_model_output):
    df = full_model_output
    # plot the vol curve seperately, x_axis: K
    col_names = full_model_output.columns.tolist()
    if 'linear_a' in col_names:
        fig = plt.figure(1, figsize=(18, 4))
        plt.subplot(1, 3, 1)
        plt.scatter(terms.values(), df.iloc[:, 1].values, s=10)
        plt.scatter(terms.values(), df.iloc[:, 4].values, s=10)
        plt.scatter(terms.values(), df.iloc[:, 7].values, s=10)
        plt.xlabel('term days')
        plt.ylabel('coefficient magnitude')
        plt.legend(['linear', 'quad', 'quad_diff']);
        plt.title('intercept')
        plt.subplot(1, 3, 2)
        plt.scatter(terms.values(), df.iloc[:, 0].values, s=10)
        plt.scatter(terms.values(), df.iloc[:, 3].values, s=10)
        plt.scatter(terms.values(), df.iloc[:, 6].values, s=10)
        plt.xlabel('term days')
        plt.ylabel('coefficient magnitude')
        plt.legend(['linear', 'quad', 'quad_diff']);
        plt.title('linear coefficient (skewness)')
        plt.subplot(1, 3, 3)
        plt.scatter(terms.values(), df.iloc[:, 2].values, s=10) # 
        plt.scatter(terms.values(), df.iloc[:, 5].values, s=10)
        plt.scatter(terms.values(), df.iloc[:, 8].values, s=10)
        plt.xlabel('term days')
        plt.ylabel('coefficient magnitude')
        plt.legend(['quad', 'quad_diff_+', 'quad_diff_-']);
        plt.title('quadratic coefficient (kurtosis)')
    elif 'svi_m' in col_names:
        pass
    fig = plt.figure(2, figsize=(18, len(terms) * 6))
    index = 1
    for term_str, term_day in terms.items():
        plt.subplot(len(terms), 3, index)
        x = np.linspace(-0.8, 0.7, 41)
        K = np.exp(x)
        params = df.loc[term_str, :].values.tolist()
        if 'linear_a' in col_names:
            a, b = params[0:2]
            plt.plot(x, a*x+b)
            a, b, c = params[2:5]
            plt.plot(x, a*x**2 + b*x + c)
            a, b, c, a2 = params[5:9]
            plt.plot(x, (a*(x>=0)+a2*(x<0))*x**2+b*x+c)
            plt.legend(['linear', 'quad', 'quad diff'])
            plt.xlabel('log moneyness(K/S)')
            plt.ylabel('volatility')
            plt.title(term_str+" volatility plot against log moneyness")
        elif 'svi_m' in col_names:
            plt.plot(K, SVIvol(params, x),color='salmon')
            plt.xlabel('moneyness(K/S)')
            plt.ylabel('volatility')
            plt.title(term_str+" volatility plot against log moneyness")
        index += 1
    return

OLS Fit Results

Fit Volatility Surface - Maximum Likelihood Method

SVI Model for Volatility

$Var(x) = a + b * (\rho * (x - m) + \sqrt{(x-m)^2 + \sigma^2})$

We would like to find optimal set of (a, b, $\rho$, m, $\sigma$) that could fit well to empirical return distribution.

Constriants to ensure we do not get negative volatility and probability density:

$a+b\sigma \sqrt{1-\rho ^2}\geq 0$

$b(1+\left | \rho \right |)\leq \frac{2}{T}$

$b(1+\left | \rho \right |)\leq 2$

In [55]:
def full_model(terms, df, plot=True, method='ols'):
    """
    Fit the vol curve for all terms
    :param terms: dict, {'term_str': term_day}
    :param df: pd.DataFrame, index: dates; columns: rolling period return, columns names ["20D_ret", ...]
    :param plot: bool, whether to plot the fitted density of x=logK
    :param method: 'ols' / 'mle'
    :return result: pd.DataFrame, index: terms.keys, values, optimal parameters in volatility function
    """
    coef_linear_list = []
    coef_quad_list = []
    coef_quad_diff_list = []
    coef_svi_list = []
    cm = {'g':'#60854B', 'r':'#C75B56', 'b':'#6C8DBF'}
    initials = [(0.0, 0.4,  0.0, -0.3,  0.1)]
    if plot:
        fig = plt.figure(figsize=(18, len(terms) * 6))
        index = 1
    for term_str, term_day in terms.items():
        print(term_day)
        raw_series = df[term_str+'_ret'].dropna()
        r_series = raw_series.copy()  # fit kernel density
        raw_series = np.array(sorted(raw_series))
        if method == 'mle':
            # local
#             coef_svi = minimize(obj_func_log_likelihood, list(svi_initial.loc[term_str]), args=(raw_series, term_day, 'svi', 0),
#                            method='SLSQP', options={"disp": 0})    
#             if coef_svi.success:
#                 coef_svi=coef_svi.x
#             else:
#                 coef_svi=list(svi_initial.loc[term_str])

            # grid search
            coef_svi = grid_search(para, raw_series, term_day)

            coef_svi_list.append(coef_svi)
            svi_fit = np.round(calc_density(raw_series, r=0, T=term_day/260, params=coef_svi, delta_x=0.001,
                                            method='svi'), 4)
            if plot:
                plt.subplot(len(terms), 2, index)
                plt.hist(raw_series, bins=300, density=True, color='grey')
                plt.plot(raw_series, svi_fit, linewidth=2,color=cm['b'])
                plt.legend(['svi fit', 'empirical density'], loc='upper left')
                plt.title(term_str+" fitted density")
                index += 2
    if method == 'mle':
        result = pd.DataFrame(coef_svi_list)
        result.columns=['svi_a', 'svi_b', 'svi_m', 'svi_rho', 'svi_sigma']
        result.index = list(terms.keys())

    # plot the vol curve seperately, x_axis: K
    col_names = result.columns.tolist()
    if 'linear_a' in col_names:
        fig = plt.figure(1, figsize=(18, 4))
        plt.subplot(1, 3, 1)
        plt.scatter(terms.values(), df.iloc[:, 1].values, s=10)
        plt.scatter(terms.values(), df.iloc[:, 4].values, s=10)
        plt.scatter(terms.values(), df.iloc[:, 7].values, s=10)
        plt.xlabel('term days')
        plt.ylabel('coefficient magnitude')
        plt.legend(['linear', 'quad', 'quad_diff']);
        plt.title('intercept')
        plt.subplot(1, 3, 2)
        plt.scatter(terms.values(), df.iloc[:, 0].values, s=10)
        plt.scatter(terms.values(), df.iloc[:, 3].values, s=10)
        plt.scatter(terms.values(), df.iloc[:, 6].values, s=10)
        plt.xlabel('term days')
        plt.ylabel('coefficient magnitude')
        plt.legend(['linear', 'quad', 'quad_diff']);
        plt.title('linear coefficient (skewness)')
        plt.subplot(1, 3, 3)
        plt.scatter(terms.values(), df.iloc[:, 2].values, s=10) # 
        plt.scatter(terms.values(), df.iloc[:, 5].values, s=10)
        plt.scatter(terms.values(), df.iloc[:, 8].values, s=10)
        plt.xlabel('term days')
        plt.ylabel('coefficient magnitude')
        plt.legend(['quad', 'quad_diff_+', 'quad_diff_-']);
        plt.title('quadratic coefficient (kurtosis)')
    elif 'svi_m' in col_names:
        pass
    index = 2
    for term_str, term_day in terms.items():
        plt.subplot(len(terms), 2, index)
        x = np.linspace(-0.5, 0.5, 41)
        K = np.exp(x)
        params = result.loc[term_str, :].values.tolist()
        if 'linear_a' in col_names:
            a, b = params[0:2]
            plt.plot(x, a*x+b)
            a, b, c = params[2:5]
            plt.plot(x, a*x**2 + b*x + c)
            a, b, c, a2 = params[5:9]
            plt.plot(x, (a*(x>=0)+a2*(x<0))*x**2+b*x+c)
            plt.legend(['linear', 'quad', 'quad diff'])
            plt.xlabel('log moneyness(K/S)')
            plt.ylabel('volatility')
            plt.title(term_str+" volatility plot against log moneyness")
        elif 'svi_m' in col_names:
            plt.plot(x, SVIvol(params, x),color=cm['b'])
            plt.xlabel('log moneyness(K/S)')
            plt.ylabel('volatility')
            plt.title(term_str+" volatility plot against log moneyness")
        index += 2
    return result
In [11]:
def obj_func_log_likelihood(params, r_series, term_day, method, neg_density_penalty=0):
    x, r, T=r_series,0, term_day/260
    delta_x=0.001
    if method == 'linear' and len(params) == 2:
        x1, x2, a, b = x + np.log(1 + delta_x), x + np.log(1 - delta_x), params[0], params[1]
        sigma1, sigma, sigma2 = a * x1 + b,\
                                a * x + b,\
                                a * x2 + b
    elif method == 'quad' and len(params) == 3:
        x1, x2, a, b, c = x + np.log(1 + delta_x), x + np.log(1 - delta_x), params[0], params[1], params[2]
        sigma1, sigma, sigma2 = a * x1**2 + b * x1 + c,\
                                a * x**2 + b * x + c,\
                                a * x2**2 + b * x2 + c
    elif method == 'quad_diff' and len(params) == 4:
        x1, x2, a, b, c, a2 = x + np.log(1 + delta_x), x + np.log(1 - delta_x), params[0], params[1], params[2], params[3]
        sigma1, sigma, sigma2 = a * x1**2 * (x1 >= 0) + a2 * x1**2 * (x1 < 0) + b * x1 + c,\
                                a * x**2 * (x >= 0) + a2 * x**2 * (x < 0) + b * x + c,\
                                a * x2**2 * (x2 >= 0) + a2 * x2**2 * (x2 < 0) + b * x2 + c
    elif method == 'svi' and len(params) == 5:
        if SVIfail(params, term_day/260):
            return np.inf
        x1, x2 = x + np.log(1 + delta_x), x + np.log(1 - delta_x)
        sigma1, sigma, sigma2 = SVIvol(params, x1), SVIvol(params, x), SVIvol(params, x2)
    else:
        print('params: ',params,'method: ',method)
        raise Exception("Wrong param dimension or fit method!")

    density = np.exp(r * T) * (calc_call_premium(np.exp(x1), T, sigma1)
                         - 2 * calc_call_premium(np.exp(x), T, sigma)
                         + calc_call_premium(np.exp(x2), T, sigma2)) / (np.exp(x) * delta_x**2)
    if np.sum(density <= 0) > 0:
        density = np.where(density > 0, np.log(density), -neg_density_penalty)
        density = np.log(np.fabs(density))
    else:
        density = np.log(density)
    return -np.sum(density)

def multiple_initialization(initials, r_series, term_day, sigma_method, neg_density_penalty=0):
    min_val = np.inf
    opt_params = initials[0]
    for initial in initials:
        opt_obj = minimize(obj_func_log_likelihood, initial, args=(r_series, term_day, sigma_method, neg_density_penalty),
                           method='SLSQP', options={"disp": 0})

#     kwargs = {"method": 'SLSQP','args': (r_series, term_day, sigma_method, neg_density_penalty)} 
#     for initial in initials:
#         opt_obj = basinhopping(obj_func_log_likelihood, initial, minimizer_kwargs=kwargs,disp=True)

        if opt_obj.fun < min_val:
            min_val = opt_obj.fun
            opt_params = opt_obj.x
    return opt_params

def grid_search(para, raw_series, term_day):
    T=term_day/260
    opt_params = para[0]
    min_val=obj_func_log_likelihood(para[0], raw_series, term_day, 'svi', neg_density_penalty=0)
    for params in para:
        l=obj_func_log_likelihood(params, raw_series, term_day, 'svi', neg_density_penalty=0)
        if l < min_val:
            min_val = l
            opt_params = params
    return opt_params
In [67]:
# term_days = [5, 10] + list(range(20, 521, 20))
# term_name = [str(i) + "D" for i in term_days]
# terms = dict(zip(term_name, term_days))
In [68]:
result = full_model(terms, df, method='mle')
5
10
20
40
60
80
100
120
140
160
180
200
220
240
260
280
300
320
340
360
380
400
420
440
460
480
500
520