Skip to content
Snippets Groups Projects
Select Git revision
  • cd57c410c53e3ea2ce6ed8c21fdef800070b2bc9
  • master default protected
  • dev
  • install
  • new_master
  • protein_ortho
  • documentation
  • pr18
  • dev-licence
  • docker
  • prodigal_train
  • containers
  • module_all
  • functional_tests
  • opti
  • helpers
  • v1.4.1
  • v1.4.0
  • v1.3.1
  • v1.3.0
  • v1.2.0
  • v1.1.0
  • v1.0.1
  • v1.0
24 results

list_genomes.lst

Blame
  • preprocess.py 3.39 KiB
    import logging

    import numpy as np
    
    
    class Preprocessor:
        """
        Cut a track into fixed-length windows and normalize each window.

        Settings are read from `configured.config`, which is used like a dict
        (`in`, `[]`, `.get`). Keys read here: 'len_traj' (required),
        'spine_interpolation' and 'frame_interval' (optional),
        'swap_head_tail' (optional, defaults to True).

        Instances are callable: `preprocessor(t, data)` is equivalent to
        `preprocessor.preprocess(t, data)`.
        """

        def __init__(self, configured, average_body_length=1.0):
            """
            Store the configuration holder and the scaling length.

            `configured` must expose a `config` attribute.
            `average_body_length` is the divisor applied to coordinates in
            `normalize`.
            """
            self.configured = configured
            # usually set later
            self.average_body_length = average_body_length

        @property
        def config(self):
            """The `config` mapping of the wrapped `configured` object."""
            return self.configured.config

        @property
        def swap_head_tail(self):
            """
            Whether `preprocess` reverses the order of the 5 spine points.

            Read from config key 'swap_head_tail'; True when the key is absent.
            """
            return self.config.get('swap_head_tail', True)

        @swap_head_tail.setter
        def swap_head_tail(self, b):
            self.config['swap_head_tail'] = b

        def window(self, t, data):
            """
            Generate `(time, window)` pairs, each window spanning
            config['len_traj'] rows.

            If 'spine_interpolation' and/or 'frame_interval' are configured,
            one window is requested from `interpolate` per row of `data` and
            reported at time `t[m]`; rows for which `interpolate` returns None
            are skipped. Otherwise plain slices `data[m:m+len_traj]` are
            yielded, reported at the time of their middle row.
            """
            interpolation_args = {k: self.config[k]
                                  for k in ('spine_interpolation', 'frame_interval')
                                  if k in self.config}
            winlen = self.config["len_traj"]
            nsteps = data.shape[0]
            if interpolation_args:
                # NOTE(review): `interpolate` is neither defined nor imported in
                # this file as seen here -- confirm where it is meant to come from.
                for m in range(nsteps):
                    win = interpolate(t, data, m, winlen, **interpolation_args)
                    if win is not None:
                        assert win.shape[0] == winlen
                        yield t[m], win
            else:
                # number of full windows: nsteps - winlen + 1 (none if too short)
                for m in range(nsteps - winlen + 1):
                    n = m + winlen
                    yield t[(m + n) // 2], data[m:n]

        def pad(self, target_t, defined_t, data):
            """
            Extend `data` along axis 0 so that it has one entry per time step
            in `target_t`.

            `data[i]` is the entry defined at time `defined_t[i]`. Time steps of
            `target_t` before `defined_t[0]` receive a copy of the first entry,
            those after `defined_t[-1]` a copy of the last entry. `target_t` is
            expected to be sorted in increasing order.

            Raises RuntimeError('missing time steps') if the padded result does
            not line up with `target_t`, i.e. some time steps in between are
            not covered by `data`.
            """
            if data.shape[0] == 1:
                return np.repeat(data, len(target_t), axis=0)
            # number of target time steps strictly before the first defined one
            head = np.searchsorted(target_t, defined_t[0], side='left')
            # number of target time steps strictly after the last defined one
            tail = len(target_t) - np.searchsorted(target_t, defined_t[-1], side='right')
            last = data.shape[0] - 1  # index of the last entry along axis 0
            ind = np.r_[
                    np.zeros(head, dtype=int),
                    np.arange(data.shape[0]),
                    np.full(tail, last, dtype=int),
                    ]
            if len(ind) != len(target_t):
                raise RuntimeError('missing time steps')
            return data[ind]

        def body_length(self, data):
            """
            Return, for each row, the summed length of the segments joining
            consecutive points.

            Even columns are read as x and odd columns as y coordinates.
            """
            dx = np.diff(data[:,0::2], axis=1)
            dy = np.diff(data[:,1::2], axis=1)
            return np.sum(np.sqrt(dx*dx + dy*dy), axis=1)

        def normalize(self, w):
            """
            Center, rotate and rescale one window.

            `w` has one row per time step and 10 columns (5 points, 2
            coordinates each). The result has shape (2, 5, n_rows): coordinate,
            point, time step.

            A warning is logged if the mean vector between the first and last
            points is null; the returned coordinates are then all zero.
            """
            # center coordinates on the window-average of the third point
            wc = np.mean(w[:,4:6], axis=0, keepdims=True)
            w = w - np.tile(wc, (1, 5))
            # rotate: direction is the mean vector from point 1 to point 5
            v = np.mean(w[:,8:10] - w[:,0:2], axis=0)
            vnorm = np.sqrt(np.dot(v, v))
            if vnorm == 0:
                logging.warning('null distance between head and tail')
            else:
                v = v / vnorm
            c, s = v / self.average_body_length # scale using the rotation matrix
            rot = np.array([[ c, s],
                            [-s, c]]) # clockwise rotation
            # Fortran-order reshape of w.T de-interleaves the columns into
            # (coordinate, point, time step) before applying the 2x2 matrix
            w = np.einsum("ij,jkl", rot, np.reshape(w.T, (2, 5, -1), order='F'))
            return w

        def preprocess(self, t, data):
            """
            Preprocess a single track.

            This includes running a sliding window, resampling the track in each window,
            normalizing the spines, etc.

            Returns an array with one normalized window per element of `t`
            (see `pad`), with the point axis reversed if `swap_head_tail` is
            set, or None if the track yields no window at all.
            """
            defined_t = []
            ws = []
            for t_, w in self.window(t, data):
                defined_t.append(t_)
                ws.append(self.normalize(w))
            if not ws:
                return None
            ret = self.pad(t, defined_t, np.stack(ws))
            if self.swap_head_tail:
                ret = ret[:,:,::-1,:]
            return ret

        def __call__(self, *args):
            """Shorthand for `preprocess(*args)`."""
            return self.preprocess(*args)

        # former (misspelled) name of `__call__`, kept as an alias
        __callable__ = __call__
    
    
    # Python equivalents of Julia's searchsortedfirst/searchsortedlast (0-based indices)
    def searchsortedfirst(xs, x):
        """
        Return the 0-based index of the first element of `xs` that is greater
        than or equal to `x`.

        If there is no such element (including when `xs` is empty), `len(xs)`
        is returned, i.e. the past-the-end index, like the Julia function of
        the same name does.
        """
        for i, x_ in enumerate(xs):
            if x <= x_:
                return i
        # no element >= x: report the past-the-end position instead of None
        return len(xs)
    
    def searchsortedlast(xs, x):
        """
        Return the 0-based index of the last element of `xs` that is less than
        or equal to `x`.

        If there is no such element (including when `xs` is empty), -1 is
        returned, i.e. the before-the-start index; this is the 0-based
        counterpart of the 0 returned by the Julia function of the same name.
        """
        for i in reversed(range(len(xs))):
            if xs[i] <= x:
                return i
        # no element <= x: report the before-the-start position instead of None
        return -1