# preprocess.py
import logging

import numpy as np
class Preprocessor:
    """Cut a track into sliding windows and normalize each window.

    Each data row holds 5 points as 10 interleaved columns
    ``x0, y0, ..., x4, y4`` (see the column slicing in `body_length` and
    `normalize`).  Instances are callable; calling one is the same as
    calling `preprocess`.

    Args:
        configured: Object exposing a ``config`` mapping.  Keys read here:
            ``len_traj`` (window length, required), ``swap_head_tail``
            (optional, defaults to True), and the optional
            ``spine_interpolation`` / ``frame_interval`` keys that are
            forwarded to ``interpolate``.
        average_body_length: Divisor applied to the coordinates in
            `normalize`.
    """

    def __init__(self, configured, average_body_length=1.0):
        self.configured = configured
        # usually set later
        self.average_body_length = average_body_length

    @property
    def config(self):
        """Configuration mapping, read from ``self.configured.config``."""
        return self.configured.config

    @property
    def swap_head_tail(self):
        """Whether `preprocess` reverses the order of the 5 points.

        Stored under the ``'swap_head_tail'`` config key; True when absent.
        """
        return self.config.get('swap_head_tail', True)

    @swap_head_tail.setter
    def swap_head_tail(self, b):
        self.config['swap_head_tail'] = b

    def window(self, t, data):
        """Yield ``(time, window)`` pairs for a sliding window over `data`.

        Without interpolation settings in the config, every run of
        ``len_traj`` consecutive rows is yielded together with the
        timestamp at the middle index of the run.  With interpolation
        settings, one window is requested from ``interpolate`` per row and
        rows for which it returns None are skipped.

        Args:
            t: Timestamps, indexable by row number.
            data: Array with one row per timestamp.
        """
        interpolation_args = {
            key: self.config[key]
            for key in ('spine_interpolation', 'frame_interval')
            if key in self.config
        }
        winlen = self.config["len_traj"]
        nrows = data.shape[0]
        if interpolation_args:
            # NOTE(review): `interpolate` is neither defined nor imported in
            # the visible part of this file; it has to be provided elsewhere.
            for m in range(nrows):
                win = interpolate(t, data, m, winlen, **interpolation_args)
                if win is not None:
                    assert win.shape[0] == winlen
                    yield t[m], win
        else:
            for m in range(nrows + 1 - winlen):
                n = m + winlen
                yield t[(m + n) // 2], data[m:n]

    def pad(self, target_t, defined_t, data):
        """Extend `data` along axis 0 so that it has one entry per target time.

        Target times before the first defined time get a copy of the first
        entry, target times after the last defined time get a copy of the
        last entry.  A single entry is simply repeated for every target
        time.

        Args:
            target_t: Sorted timestamps the output must cover.
            defined_t: Sorted timestamps of the entries of `data`.
            data: Array whose axis 0 matches `defined_t`.

        Returns:
            Array with ``len(target_t)`` entries along axis 0.

        Raises:
            RuntimeError: If the padded length differs from
                ``len(target_t)``, i.e. time steps are missing in between.
        """
        if data.shape[0] == 1:
            return np.repeat(data, len(target_t), axis=0)
        head = searchsortedfirst(target_t, defined_t[0])
        tail = len(target_t) - (searchsortedlast(target_t, defined_t[-1]) + 1)
        # The tail repeats the last entry along axis 0, hence shape[0]
        # (the original used shape[1], which indexes the wrong axis).
        last = data.shape[0] - 1
        ind = np.r_[
            np.zeros(head, dtype=int),
            np.arange(data.shape[0]),
            last * np.ones(tail, dtype=int),
        ]
        if len(ind) != len(target_t):
            raise RuntimeError('missing time steps')
        return data[ind]

    def body_length(self, data):
        """Return, per row, the summed length of the segments joining the points.

        Even columns are read as x coordinates and odd columns as y
        coordinates.
        """
        dx = np.diff(data[:, 0::2], axis=1)
        dy = np.diff(data[:, 1::2], axis=1)
        return np.sum(np.sqrt(dx * dx + dy * dy), axis=1)

    def normalize(self, w):
        """Center, rotate and scale one window.

        The window is translated so that the time-averaged middle point
        (columns 4:6) is at the origin, rotated so that the time-averaged
        vector from the first point (columns 0:2) to the last point
        (columns 8:10) lies along the positive x axis, and divided by
        ``self.average_body_length``.

        If that mean vector is null, a warning is logged and the returned
        coordinates are all zero, because the rotation matrix is built from
        the null vector.

        Args:
            w: Array of shape ``(T, 10)``.

        Returns:
            Array of shape ``(2, 5, T)``: coordinate (x, y), point, time.
        """
        # center coordinates
        wc = np.mean(w[:, 4:6], axis=0, keepdims=True)
        w = w - np.tile(wc, (1, 5))
        # rotate
        v = np.mean(w[:, 8:10] - w[:, 0:2], axis=0)
        vnorm = np.sqrt(np.dot(v, v))
        if vnorm == 0:
            logging.warning('null distance between head and tail')
        else:
            v = v / vnorm
        c, s = v / self.average_body_length  # scale using the rotation matrix
        rot = np.array([[c, s],
                        [-s, c]])  # clockwise rotation
        # Fortran-order reshape of the (10, T) transpose de-interleaves the
        # columns into (coordinate, point, time).
        coords = np.reshape(w.T, (2, 5, -1), order='F')
        return np.einsum("ij,jkl", rot, coords)

    def preprocess(self, t, data):
        """Preprocess a single track.

        This includes running a sliding window, resampling the track in each
        window, normalizing the spines, etc.

        Args:
            t: Timestamps of the track.
            data: Array with one row of 10 columns per timestamp.

        Returns:
            Array with one normalized window per timestamp in `t` (axis 0),
            with the point order (axis 2) reversed when `swap_head_tail` is
            true; None if no window could be extracted.
        """
        defined_t = []
        ws = []
        for t_, w in self.window(t, data):
            defined_t.append(t_)
            ws.append(self.normalize(w))
        if not ws:
            return None
        ret = self.pad(t, defined_t, np.stack(ws))
        if self.swap_head_tail:
            ret = ret[:, :, ::-1, :]
        return ret

    def __call__(self, *args):
        """Call `preprocess` with the same arguments."""
        return self.preprocess(*args)

    # Former (misspelt) name of `__call__`, kept so that explicit lookups of
    # the old attribute still resolve.
    __callable__ = __call__
# Python counterparts of Julia's `searchsortedfirst` and `searchsortedlast`
# (indices are 0-based here)
def searchsortedfirst(xs, x):
    """Return the 0-based index of the first element of `xs` that is >= `x`.

    `xs` is scanned from the start and is expected to be sorted in ascending
    order.  If every element is smaller than `x` (or `xs` is empty),
    ``len(xs)`` is returned, mirroring Julia's ``searchsortedfirst`` which
    returns ``lastindex + 1`` in that case.
    """
    for i, x_ in enumerate(xs):
        if x <= x_:
            return i
    # Not found: return the one-past-the-end index, not an implicit None.
    return len(xs)
def searchsortedlast(xs, x):
    """Return the 0-based index of the last element of `xs` that is <= `x`.

    `xs` is scanned from the end and is expected to be sorted in ascending
    order and to support ``len`` and integer indexing.  If every element is
    greater than `x` (or `xs` is empty), ``-1`` is returned, mirroring
    Julia's ``searchsortedlast`` which returns ``firstindex - 1`` in that
    case.
    """
    for i in reversed(range(len(xs))):
        if xs[i] <= x:
            return i
    # Not found: return the one-before-the-start index, not an implicit None.
    return -1