I have experimented a bit using pygmo for financial portfolio optimization in the context of risk and return measured relative to a benchmark.
With regards to measuring risk, many methods can be used. The usual is minimizing the variance of the portfolio, but I have a preference for minimizing tail risk.
The "hello world" example is to find the portfolio that minimizes risk relative to the benchmark, given that weights are constraint to the [0, 1] range and must sum to 1. It is easy to realize that the solution is a portfolio with weights equal to the benchmark weights.
I have implemented a class I thought should be capable of solving the problem for both of the above mentioned methods for defining risk. However, only the "minimize variance" gives an appropriate result. Minimizing tail risk seems not to change x_init values that I start the optimization from at all.
I suspect, that a problem might be that the objective function contains a call to np.argsort. I have previously had success implementing this with Matlabs fmincon, so I know there should be a way through. But I am unsure how I should attack the problem using pygmo. What would be the right algorithm the use, what would be appropriate parameters for the chosen algo, etc. I have found inspiration in tutorials.
Below you'll find a self containing piece of code that replicates my problem. First call to the utility function "run_problem" uses the "minimize variance (cov)" objective. The resulting portfolio equals the benchmark as desired. Second call to the function "run_problem" seems to not alter the initial value x at all.
If any one has any idea to how I can get around my problem, I would be very gratefull!
import pygmo as pg
import numpy as np
import time
import matplotlib.pyplot as plt
from numba import jit, float64, int32
class optimize_risk_return:
def __init__(self, cov, market, weight_bm, objective='cov'):
self.cov = cov
self.market = market
self.weight_bm = weight_bm
self.n = len(self.weight_bm)
self.objective = objective
assert objective in ['cov','es'], 'Input "objective" can only be "cov" or "es".'
def get_nic(self):
return 0
def get_nec(self):
return 1
def get_bounds(self):
return ([0]*self.n,[1]*self.n)
def fitness(self, x):
if self.objective == 'cov':
return optimize_risk_return._fitness_std_cov(x,self.cov,self.weight_bm)
else:
return optimize_risk_return._fitness_es_market(x,self.market,self.weight_bm,1000)
@jit(float64[:](float64[:],float64[:,:],float64[:]),nopython=True)
def _fitness_std_cov(x,cov,bm_w):
ret_val = np.zeros((2,),dtype=float64)
w = x-bm_w
ret_val[0] = w.T@cov@w*1000
ret_val[1] = np.sum(x)-1
return ret_val
@jit(float64[:](float64[:],float64[:,:],float64[:], int32),nopython=True) # , parallel=True
def _fitness_es_market(x,market,bm_w,n_fractile):
ret_val = np.zeros((2,),dtype=float64)
w = x-bm_w
psi = market@w
indx = np.argsort(psi)
ret_val[0] = -1*np.mean(psi[indx[:n_fractile]])
ret_val[1] = np.sum(x)-1
return ret_val
def gradient(self, x):
if self.objective == 'cov':
return optimize_risk_return._gradient_std_cov(x,self.cov,self.weight_bm)
else:
return optimize_risk_return._gradient_es_market(x,self.market,self.weight_bm,1000)
@jit(float64[:](float64[:],float64[:,:],float64[:]),nopython=True) # , parallel=True
def _gradient_std_cov(x,cov,bm_w):
w = x-bm_w
return np.concatenate((w.T@cov*1000,np.ones((len(w),),dtype=float64)))
@jit(float64[:](float64[:],float64[:,:],float64[:],int32),nopython=True) # , parallel=True
def _gradient_es_market(x,market,bm_w, n_fractile):
w = x-bm_w
psi = market@w
indx = np.argsort(psi)
tmp = -1*market[indx[:n_fractile],:]*w
ret_val = np.empty(tmp.shape[1])
for i in range(len(ret_val)):
ret_val[i] = np.mean(tmp[:, i])
return np.concatenate((ret_val,np.ones((len(w),),dtype=float64)))
weight_bm = np.array([0.11582448, 0.35305939, 0.34733299, 0.10375922, 0.08002392])
cov_mat = np.array([[2.87275736e-04, 6.72493473e-05, 1.68465649e-04, 4.18551925e-04, 1.19171347e-04],
[6.72493473e-05, 3.20281710e-05, 5.42226697e-05, 1.17381173e-04, 4.30776541e-05],
[1.68465649e-04, 5.42226697e-05, 2.01671669e-04, 2.66489778e-04, 9.81955361e-05],
[4.18551925e-04, 1.17381173e-04, 2.66489778e-04, 1.08587282e-03, 3.10847907e-04],
[1.19171347e-04, 4.30776541e-05, 9.81955361e-05, 3.10847907e-04, 1.72575569e-04]])
market = np.random.multivariate_normal(np.zeros((5,)),cov_mat,20000)
def run_problem(opt_obj):
prob = pg.problem(opt_obj)
print(prob)
uda = pg.nlopt('auglag')
uda.ftol_rel = 1e-12
algo = pg.algorithm(uda = uda)
algo.extract(pg.nlopt).local_optimizer = pg.nlopt('var2')
algo.extract(pg.nlopt).local_optimizer.ftol_rel = 1e-20
algo.set_verbosity(100) # in this case this correspond to logs each 200 objevals
n = 100
pop = pg.population(prob = prob, size = n)
pop.problem.c_tol = [1E-12]
for i in range(n):
pop.set_x(i,np.ones((5,))/5)
start_time = time.time()
pop = algo.evolve(pop)
print(time.time()-start_time)
print('Fevals: {0}'.format(pop.problem.get_fevals()) )
print('Gevals: {0}'.format(pop.problem.get_gevals()) )
best_x = pop.get_x()[pop.best_idx()]
print('Sum of weights (should be 1) = {0}'.format(np.sum(best_x)))
plt.scatter(best_x,opt_obj.weight_bm,s=50)
plt.scatter(opt_obj.weight_bm,opt_obj.weight_bm,s=25)
plt.legend(['Actual Result','Desired Result'])
opt_obj = optimize_risk_return(cov_mat, market,weight_bm,'cov')
run_problem(opt_obj)
opt_obj = optimize_risk_return(cov_mat, market,weight_bm,'es')
run_problem(opt_obj)