"""Package teneva, module anova: ANOVA decomposition in the TT-format.
This module contains the function "anova" which computes the TT-approximation
for the tensor, using given random samples.
"""
import numpy as np
import pickle
import teneva
class ANOVA:
# TODO: add docstring and add into demo
def __init__(self, I_trn=None, y_trn=None, order=1, seed=None, fpath=None):
self.rand = teneva._rand(seed)
if not order in [1, 2]:
raise ValueError('Invalid value for ANOVA order (should be 1 or 2')
self.order = order
if fpath is None:
if I_trn is None or y_trn is None:
raise ValueError('"(I_trn, y_trn)" or "fpath" should be set')
self.build(I_trn, y_trn)
else:
if I_trn is not None or y_trn is not None:
raise ValueError('Can`t use both "(I_trn, y_trn)" and "fpath"')
self.load(fpath)
def __call__(self, I):
I = np.asanyarray(I, dtype=self.dtype)
if len(I.shape) == 1:
return self.calc(I)
elif len(I.shape) == 2:
return np.array([self.calc(i) for i in I])
else:
raise ValueError('Invalid input multi-indices for ANOVA')
def __getitem__(self, I):
return self(I)
@property
def f1_arr(self):
try:
return self._f1_arr
except AttributeError:
pass
f1_arr = self._f1_arr = [np.array([f1_curr[x] for x in dm])
for dm, f1_curr in zip(self.domain, self.f1)]
return f1_arr
@property
def f2_arr(self):
try:
return self._f2_arr
except AttributeError:
pass
f2_arr = self._f2_arr = [
np.array([f2_curr[x1, x2] for x1 in dm1 for x2 in dm2 ])
for (dm1, dm2), f2_curr in zip([(dm1, dm2)
for k1, dm1 in enumerate(self.domain[:-1])
for dm2 in self.domain[k1+1:]], self.f2)]
return f2_arr
def build(self, I_trn, y_trn):
I_trn = np.asanyarray(I_trn)
self.dtype = I_trn.dtype
y_trn = np.asanyarray(y_trn, dtype=float)
self.y_max = np.max(y_trn)
self.y_min = np.min(y_trn)
self.d = I_trn.shape[1]
self.domain = []
self.shapes = np.zeros(self.d, dtype=int)
for k in range(self.d):
points = np.unique(I_trn[:, k])
self.domain.append(points)
self.shapes[k] = len(points)
self.build_0(I_trn, y_trn)
if self.order >= 1:
self.build_1(I_trn, y_trn)
else:
self.f1 = []
if self.order >= 2:
self.build_2(I_trn, y_trn)
else:
self.f2 = []
def build_0(self, I_trn, y_trn):
self.f0 = np.mean(y_trn)
def build_1(self, I_trn, y_trn):
self.f1 = []
for k, dm in enumerate(self.domain):
f1_curr = {}
for x in dm:
idx = I_trn[:, k] == x
value = np.mean(y_trn[idx]) - self.f0
f1_curr[x] = value
self.f1.append(f1_curr)
def build_2(self, I_trn, y_trn):
cache = dict()
self.f2 = []
for k1, dm1 in enumerate(self.domain[:-1]):
for k2, dm2 in enumerate(self.domain[k1+1:], start=k1+1):
f2_curr = {}
for x1 in dm1:
for x2 in dm2:
try:
idx1 = cache[k1, x1]
except KeyError:
idx1 = I_trn[:, k1] == x1
cache[k1, x1] = idx1
try:
idx2 = cache[k2, x2]
except KeyError:
idx2 = I_trn[:, k2] == x2
cache[k2, x2] = idx2
idx = idx1 & idx2
if idx.sum() == 0:
value = 0.
else:
value = np.mean(y_trn[idx]) - self.f0
value = value - self.f1[k1][x1] - self.f1[k2][x2]
f2_curr[x1, x2] = value
self.f2.append(f2_curr)
def calc(self, i):
res = self.calc_0()
if self.order >= 1:
res += self.calc_1(i)
if self.order >= 2:
res += self.calc_2(i)
return res
def calc_0(self):
return self.f0
def calc_1(self, x):
res = 0.
for num, x1 in enumerate(x):
res += self.f1[num][x1]
return res
def calc_2(self, x):
# We allow x to be of smaller length
res = 0.
for i1, x1 in enumerate(x[:-1]):
for i2, x2 in enumerate(x[i1+1:], start=i1+1):
num = self.pair_num_to_num(i1, i2)
res += self.f2[num][x1, x2]
return res
def cores(self, r=2, noise=1.E-10, only_near=False, rel_noise=None):
if self.order < 1:
raise ValueError('TT-cores may be constructed only if order >= 1')
if rel_noise is not None:
noise = rel_noise * max(abs(self.y_max), abs(self.y_min))
cores = self.cores_1(r, noise)
if self.order >= 2:
cores2_many = self.cores_2(r, only_near)
cores = teneva.add_many([cores] + cores2_many, r=r)
return cores
def cores_1(self, r=2, noise=1.E-10):
cores = []
core = noise * self.rand.normal(size=(1, self.shapes[0], r))
core[0, :, 0] = 1.
core[0, :, 1] = self.f1_arr[0]
cores.append(core)
for i in range(1, self.d-1):
core = noise * self.rand.normal(size=(r, self.shapes[i], r))
core[0, :, 0] = 1.
core[1, :, 1] = 1.
core[0, :, 1] = self.f1_arr[i]
cores.append(core)
core = noise * self.rand.normal(size=(r, self.shapes[self.d-1], 1))
core[0, :, 0] = self.f1_arr[self.d-1] + self.f0
core[1, :, 0] = 1.
cores.append(core)
return cores
def cores_2(self, r=2, only_near=False):
mats = []
num = 0
for i1 in range(self.d-1):
for i2 in ([i1+1] if only_near else range(i1+1, self.d)):
shape = (self.shapes[i1], self.shapes[i2])
mat = self.f2_arr[num].reshape(shape, order='C')
mats.append(mat)
num += 1
cores = []
num = 0
for i1 in range(self.d-1):
for i2 in ([i1+1] if only_near else range(i1+1, self.d)):
cores.append(_second_order_2_tt(mats[num], i1, i2, self.shapes))
num += 1
return cores
def load(self, fpath):
if not '.' in fpath:
fpath += '.pickle'
with open(fpath, 'rb') as f:
data = pickle.load(f)
self.order = data['order']
self.dtype = data['dtype']
self.y_max = data['y_max']
self.y_min = data['y_min']
self.domain = data['domain']
self.shapes = data['shapes']
self.d = data['d']
self.f0 = data['f0']
self.f1 = data['f1']
self.f2 = data['f2']
def max(self, minmax=max):
"""Get min or max value of tensor from 1th order ANOVA.
Args:
minmax (function): what to find (min or max).
Returns:
(value, np.ndarray): value of the found optimum and multi-index.
"""
max_np = {min: np.argmin, max: np.argmax}[minmax]
val = self.f0
ind = [None] * self.d
for i, fi in enumerate(self.f1):
xx = list(fi)
xx_max = xx[max_np([fi[x] for x in xx])]
ind[i] = xx_max
val += fi[xx_max]
return val, ind
def pair_num_to_num(self, x1, x2):
assert x1 != x2
if x1 > x2:
x1, x2 = x2, x1
return x2 - 1 + ((-3 + 2*self.d - x1)*x1) // 2
def sample(self, xi=None, eps=1.E-10, with_square=False):
if xi is None:
xi = self.d - 1
if xi > 0:
prev_vals = self.sample(xi-1, eps, with_square)
else:
prev_vals = []
dm = self.domain[xi]
p = np.full(len(dm), self(prev_vals))
if self.order >= 1:
f1 = self.f1[xi]
for i, x in enumerate(dm):
p[i] += f1[x]
if self.order >= 2:
for x_prev_num, x_prev_val in enumerate(prev_vals):
f2 = self.f2[self.pair_num_to_num(x_prev_num, xi)]
for i, x in enumerate(dm):
p[i] += f2[x_prev_val, x]
p = p**2 if with_square else np.maximum(p, 0)
if p.sum() < eps * len(p):
print('Warning (ANOVA): probabilities are zeros') # TODO
p += eps
p /= p.sum()
p_sample = self.rand.choice(len(p), p=p)
cur_sample = dm[p_sample]
prev_vals.append(cur_sample)
return prev_vals
def save(self, fpath):
if not '.' in fpath:
fpath += '.pickle'
with open(fpath, 'wb') as f:
pickle.dump({
'order': self.order,
'dtype': self.dtype,
'y_max': self.y_max,
'y_min': self.y_min,
'domain': self.domain,
'shapes': self.shapes,
'd': self.d,
'f0': self.f0,
'f1': self.f1,
'f2': self.f2
}, f, protocol=pickle.HIGHEST_PROTOCOL)
[docs]def anova(I_trn, y_trn, r=2, order=1, noise=1.E-10, seed=None, fpath=None):
"""Build TT-tensor by TT-ANOVA from the given random tensor samples.
Args:
I_trn (np.ndarray): multi-indices for the tensor in the form of array
of the shape [samples, d].
y_trn (np.ndarray): values of the tensor for multi-indices I in the
form of array of the shape [samples].
r (int): rank of the constructed TT-tensor.
order (int): order of the ANOVA decomposition (may be only 1 or 2).
noise (float): noise added to formally zero elements of TT-cores.
seed (int): random seed. It should be an integer number or a numpy
Generator class instance.
fpath (str): optional path for train data (I_trn, y_trn).
Returns:
list: TT-tensor, which represents the TT-approximation for the tensor.
Note:
A class "ANOVA" that represents a wider set of methods for working with
this decomposition is also available. See "anova.py" for more details
(detailed documentation for this class will be prepared later). This
function is just a wrapper for "ANOVA" class. Maybe later this class
will be replaced by the function.
"""
return ANOVA(I_trn, y_trn, order, seed, fpath).cores(r, noise)
def _core_one(n, r):
return np.kron(np.ones([1, n, 1]), np.eye(r)[:, None, :])
def _second_order_2_tt(A, i, j, shapes):
if i > j:
j, i = i, j
A = A.T
U, V = teneva.matrix_skeleton(A)
r = U.shape[1]
core1 = U.reshape(1, U.shape[0], r)
core2 = V.reshape(r, V.shape[1], 1)
cores = []
for num, n in enumerate(shapes):
if num < i or num > j:
cores.append(np.ones([1, n, 1]))
if num == i:
cores.append(core1)
if num == j:
cores.append(core2)
if i < num < j:
cores.append(_core_one(n, r))
return cores