diff --git a/src/maggotuba/models/denselayer.py b/src/maggotuba/models/denselayer.py
index 8e89c50810a9fb061da829c24e3d8716db547899..cf76118795842ab85a36e9b13f84b0b4c884eee6 100644
--- a/src/maggotuba/models/denselayer.py
+++ b/src/maggotuba/models/denselayer.py
@@ -1,53 +1,9 @@
-import os
-import json
-import pathlib
import numpy as np
import torch
import torch.nn as nn
-from behavior_model.models.neural_nets import Encoder, device
-import behavior_model.data.utils as data_utils
-
-class DeepLinear(nn.Module):
- def __init__(self, n_input, n_output, n_layers=1):
- super().__init__()
- if n_layers is None: n_layers = 1
- self.layers = []
- layers = []
- for _ in range(n_layers - 1):
- layer = nn.Linear(n_input, n_input)
- self.layers.append(layer)
- layers.append(layer)
- layers.append(nn.ReLU())
- layer = nn.Linear(n_input, n_output)
- self.layers.append(layer)
- layers.append(layer)
- self.classifier = nn.Sequential(*layers)
-
- def _init_layers(self):
- for layer in self.layers:
- nn.init.xavier_uniform_(layer.weight)
- nn.init.zeros_(layer.bias)
-
- def forward(self, x):
- return self.classifier.forward(x)
-
-class SupervisedMaggot(nn.Module):
- def __init__(self, n_latent_features, n_behaviors, enc_config, enc_path,
- clf_path=None, n_layers=1):
- super().__init__()
- # Pretrained or trained MaggotUBA encoder
- self.encoder = encoder = Encoder(**enc_config)
- encoder.load_state_dict(torch.load(enc_path))
- # Classifier stacked atop the encoder
- self.clf = DeepLinear(n_latent_features, n_behaviors, n_layers)
- if clf_path:
- self.clf.load_state_dict(torch.load(clf_path))
- else:
- self.clf._init_layers()
-
- def forward(self, x):
- #x = torch.flip(x, (2,))
- return self.clf(self.encoder(x))
+from behavior_model.models.neural_nets import device
+import behavior_model.data.utils as data_utils # still used by the preprocessing code below
+from maggotuba.models.modules import SupervisedMaggot
"""
This model borrows the pre-trained MaggotUBA encoder, substitutes a dense layer
@@ -62,102 +18,29 @@ Training the model instead relies on the readily-preprocessed data of a
*larva_dataset hdf5* file.
"""
class DenseLayer:
- def __init__(self,
- config=None,
- autoencoder_config=None,
- n_behaviors=None,
- n_layers=1,
- average_body_length=None,
- device=device):
- # MaggotUBA autoencoder config
- self._config = autoencoder_config
- self._clf_config = config
- self.prepend_log_dir = True
- self._n_behaviors = n_behaviors
- self._n_layers = n_layers
- self.average_body_length = average_body_length
+ def __init__(self, cfgfilepath, behaviors=[], n_layers=1,
+ average_body_length=None, device=device):
+ self.model = SupervisedMaggot(cfgfilepath, behaviors, n_layers)
+ self.average_body_length = average_body_length # usually set later
self.device = device
- """
- dict: JSON-deserialized parameters for the pretrained autoencoder.
- Warning: not all keys are properly adjusted after the original file
- is repurposed for `SupervisedMaggot`.
- """
@property
def config(self):
- if self._config is None:
- self._config = self.clf_config.get("autoencoder_config", None)
- if isinstance(self._config, (str, pathlib.Path)):
- path = self._config
- with open(path, "r") as f:
- self._config = json.load(f)
- self._config["config"] = str(path)
- return self._config
-
- @config.setter
- def config(self, cfg):
- self._config = cfg
+ return self.model.encoder.config
@property
def clf_config(self):
- if self._clf_config is None:
- self._clf_config = {}
- elif isinstance(self._clf_config, (str, pathlib.Path)):
- with open(self._clf_config, "r") as f:
- self._clf_config = json.load(f)
- return self._clf_config
-
- @clf_config.setter
- def clf_config(self, cfg):
- self._clf_config = cfg
+ return self.model.clf.config
@property
- def enc_path(self):
- try:
- enc_path = self.clf_config["enc_path"]
- except KeyError:
- enc_path = "retrained_encoder.pt"
- if self.prepend_log_dir:
- enc_path = os.path.join(self.config["log_dir"], enc_path)
- return enc_path
+ def labels(self):
+ return self.model.clf.behavior_labels
- @enc_path.setter
- def enc_path(self, p):
- self.clf_config["enc_path"] = p
-
- @property
- def clf_path(self):
- try:
- clf_path = self.clf_config["clf_path"]
- except KeyError:
- clf_path = "trained_classifier.pt"
- if self.prepend_log_dir:
- clf_path = os.path.join(self.config["log_dir"], clf_path)
- return clf_path
-
- @clf_path.setter
- def clf_path(self, p):
- self.clf_config["clf_path"] = p
-
- @property
- def n_behaviors(self):
- return self.clf_config.get("n_behaviors", self._n_behaviors)
-
- @n_behaviors.setter
- def n_behaviors(self, n):
- self.clf_config["n_behaviors"] = n
-
- @property
- def n_layers(self):
- try:
- return self.clf_config["clf_depth"] + 1
- except KeyError:
- return self._n_layers
-
- @n_behaviors.setter
- def n_layers(self, n):
- self.clf_config["clf_depth"] = 0 if n is None else n - 1
+ @labels.setter
+ def labels(self, labels):
+ self.model.clf.behavior_labels = labels
+    ### TODO: move parts of the code below into a features module
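+    # `window` slides a window of `len_traj` time points over the spine data;
+    # `pad` replicates the first and last windows so that every time point of
+    # the track receives a prediction.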
def window(self, data):
winlen = self.config["len_traj"]
N = data.shape[0]+1
@@ -220,6 +103,7 @@ class DenseLayer:
ws.append(w)
if ws:
return self.pad(np.stack(ws))[:,:,::-1,:] # swap head and tail
+ ###
def forward(self, x, train=False):
if train:
@@ -255,21 +139,7 @@ class DenseLayer:
def train(self, dataset):
self.prepare_dataset(dataset)
- #
- enc_path = "best_validated_encoder.pt"
- if self.prepend_log_dir:
- enc_path = os.path.join(self.config["log_dir"], enc_path)
- if isinstance(dataset.labels[0], str):
- self.labels = dataset.labels
- else:
- self.labels = [s.decode() for s in dataset.labels]
- self.model = model = SupervisedMaggot(
- n_latent_features=self.config["dim_latent"],
- n_behaviors=self.n_behaviors,
- enc_config=self.config,
- enc_path=enc_path,
- n_layers=self.n_layers,
- )
+ model = self.model
model.train() # this only sets the model in training mode (enables gradients)
model.to(self.device)
criterion = nn.CrossEntropyLoss()
@@ -311,14 +181,7 @@ class DenseLayer:
@torch.no_grad()
def predict(self, data, subset=None):
- self.model = model = SupervisedMaggot(
- n_latent_features=self.config["dim_latent"],
- n_behaviors=self.n_behaviors,
- enc_config=self.config,
- enc_path=self.enc_path,
- clf_path=self.clf_path,
- n_layers=self.n_layers,
- )
+ model = self.model
model.eval()
model.to(self.device)
if subset is None:
@@ -327,10 +190,6 @@ class DenseLayer:
return
output = self.forward(data)
label_ids = np.argmax(output, axis=1)
- try:
- self.labels
- except AttributeError:
- self.labels = self.clf_config["behavior_labels"]
labels = [self.labels[label] for label in label_ids]
return labels
else:
@@ -346,27 +205,9 @@ class DenseLayer:
expected.append(exp)
return np.concatenate(predicted), np.concatenate(expected)
- def save(self, config_path="clf_config.json", config_only=False):
- if self.prepend_log_dir:
- config_path = os.path.join(self.config["log_dir"], config_path)
- if not config_only:
- torch.save(self.model.encoder.state_dict(), self.enc_path)
- torch.save(self.model.clf.state_dict(), self.clf_path)
- with open(config_path, "w") as f:
- json.dump(dict(
- autoencoder_config=self.config["config"],
- enc_path=self.enc_path,
- clf_path=self.clf_path,
- n_behaviors=self.n_behaviors,
- behavior_labels=self.labels,
- clf_depth=self.n_layers - 1,
- # additional information (not reused):
- bias=True,
- init="xavier",
- loss="cross-entropy",
- optimizer="adam",
- target=["present"],
- ), f, indent=2)
+ def save(self):
+ self.model.save()
+
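+# Seeded torch.Generator for reproducible dataset shuffling/splitting; it is
+# passed to LarvaDataset in train_model.py and predict_model.py.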
+def new_generator(seed=0b11010111001001101001110):
+ return torch.Generator(device).manual_seed(seed)
-def new_generator():
- return torch.Generator(device).manual_seed(42)
diff --git a/src/maggotuba/models/modules.py b/src/maggotuba/models/modules.py
new file mode 100644
index 0000000000000000000000000000000000000000..8ecfab77c40c894bd3c49ff0a9b394df5e956b4c
--- /dev/null
+++ b/src/maggotuba/models/modules.py
@@ -0,0 +1,256 @@
+from pathlib import Path
+import torch
+from torch import nn
+import json
+from behavior_model.models.neural_nets import Encoder
+
+class MaggotModule(nn.Module):
+ def __init__(self, path, cfgfile=None, ptfile=None):
+ super().__init__()
+ self.path = path if isinstance(path, Path) else Path(path)
+ if cfgfile is None:
+ cfgfile = self.path.name
+ self.path = self.path.parent
+ elif not self.path.is_dir():
+            raise ValueError(f"\"{self.path}\" is not a directory")
+ self.cfgfile = cfgfile
+        if ptfile is not None and Path(ptfile).parent == self.path:
+ ptfile = Path(ptfile).name
+ self.ptfile = ptfile
+ self._config = None
+ self._model = None
+
+ @classmethod
+ def load_config(cls, path):
+ with open(path, "r") as f:
+ return json.load(f)
+
+ @property
+ def cfgfilepath(self):
+ return self.path / self.cfgfile
+
+ @property
+ def ptfilepath(self):
+ return self.path / self.ptfile
+
+ @property
+ def config(self):
+ if self._config is None:
+ self._config = self.load_config(self.cfgfilepath)
+ return self._config
+
+ @config.setter
+ def config(self, cfg):
+ self._config = cfg
+
+ @classmethod
+ def load_model(cls, config, path):
+ raise NotImplementedError
+
+ @property
+ def model(self):
+ if self._model is None:
+ self._model = self.load_model(self.config, self.ptfilepath)
+ return self._model
+
+ @model.setter
+ def model(self, model):
+ self._model = model
+
+ def forward(self, x):
+ return self.model(x)
+
+ def save_config(self, cfgfile=None):
+ if cfgfile is None: cfgfile = self.cfgfile
+ path = self.path / cfgfile
+ with open(path, "w") as f:
+ json.dump(self.config, f, indent=2)
+ return path
+
+ def save_model(self, ptfile=None):
+ if ptfile is None: ptfile = self.ptfile
+ path = self.path / ptfile
+ torch.save(self.model.state_dict(), path)
+ return path
+
+ def save(self):
+ self.save_model()
+ self.save_config()
+
+ def parameters(self, recurse=True):
+ return self.model.parameters(recurse)
+
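+# Subclasses of MaggotModule provide `load_model`; a sketch of the expected
+# shape (SomeNet is a hypothetical network class):
+#
+#     class MyModule(MaggotModule):
+#         @classmethod
+#         def load_model(cls, config, path):
+#             model = SomeNet(**config)
+#             model.load_state_dict(torch.load(path))
+#             return model
+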
+class MaggotEncoder(MaggotModule):
+ def __init__(self, path,
+ cfgfile=None,
+ #cfgfile="autoencoder_config.json",
+ ptfile="retrained_encoder.pt"):
+ super().__init__(path, cfgfile, ptfile)
+
+ @classmethod
+    def load_config(cls, path):
+ config = super().load_config(path)
+ config["config"] = str(path)
+ return config
+
+ @classmethod
+ def load_model(cls, config, path):
+ encoder = Encoder(**config)
+ encoder.load_state_dict(torch.load(path))
+ return encoder
+
+class PretrainedMaggotEncoder(MaggotEncoder):
+ def __init__(self, path,
+ cfgfile=None,
+ #cfgfile="autoencoder_config.json",
+ ptfile="best_validated_encoder.pt"):
+ super().__init__(path, cfgfile, ptfile)
+
+ def save_model(self, ptfile="retrained_encoder.pt"):
+ return super().save_model(ptfile)
+
+class MaggotEncoders(nn.Module):
+ def __init__(self, path,
+ cfgfile="autoencoder_config.json", cls=MaggotEncoder, **kwargs):
+ super().__init__()
+ self._pattern = None
+ if isinstance(path, (str, Path)):
+ self._pattern = path
+ import glob
+ paths = glob.glob(str(path))
+ elif isinstance(path, list):
+ paths = path
+        # nn.ModuleList registers the encoders so .to() and .parameters() work
+        self.encoders = nn.ModuleList([cls(p, cfgfile, **kwargs) for p in paths])
+
+ def forward(self, x):
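+        # concatenate the encoders' outputs along dim 0 (torch.cat's default)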
+ return torch.cat([encoder(x) for encoder in self.encoders])
+
+ def save_config(self, cfgfile=None):
+ for encoder in self.encoders:
+ encoder.save_config(cfgfile)
+
+ def save_model(self, ptfile=None):
+ for encoder in self.encoders:
+ encoder.save_model(ptfile)
+
+ def save(self):
+ for encoder in self.encoders:
+ encoder.save()
+
+class DeepLinear(nn.Module):
+ def __init__(self, n_input, n_output, n_hidden=[], batch_norm=False,
+ weight_init="xavier"):
+ super().__init__()
+ self.batch_norm = batch_norm
+ self.weight_init = weight_init
+ layers = []
+        for n in list(n_hidden):
+            if n is None: n = n_input # None stands for the input width
+            layers.append(nn.Linear(n_input, n))
+            layers.append(nn.ReLU())
+            if batch_norm:
+                layers.append(nn.BatchNorm1d(n))
+            n_input = n
+ layers.append(nn.Linear(n_input, n_output))
+ self.layers = nn.Sequential(*layers)
+
+ def init_layers(self):
+ for layer in self.layers:
+ if isinstance(layer, nn.Linear):
+ if self.weight_init == "xavier":
+ nn.init.xavier_uniform_(layer.weight)
+ elif self.weight_init == "kaiming":
+ nn.init.kaiming_normal_(layer.weight)
+ else:
+ raise NotImplementedError(self.weight_init)
+ nn.init.zeros_(layer.bias)
+
+ def forward(self, x):
+ return self.layers(x)
+
+ def load(self, path):
+ self.load_state_dict(torch.load(path))
+
+ def save(self, path):
+ torch.save(self.state_dict(), path)
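+
+# DeepLinear usage sketch (dimensions are illustrative); `n_hidden=[None, None]`
+# builds two hidden layers as wide as the input:
+#
+#     clf = DeepLinear(n_input=10, n_output=6, n_hidden=[None, None])
+#     clf.init_layers()
+#     logits = clf(torch.randn(32, 10))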
+
+class MaggotClassifier(MaggotModule):
+ def __init__(self, path, behavior_labels=[], n_latent_features=None,
+ n_layers=1, cfgfile=None, ptfile="trained_classifier.pt"):
+ super().__init__(path, cfgfile, ptfile)
+        try: # try to load the config file, if any
+            self.config
+        except FileNotFoundError: # no config file yet; build one from the arguments
+            assert behavior_labels
+            assert n_latent_features
+ self.config = dict(
+ clf_path=str(self.ptfilepath),
+ dim_latent=n_latent_features,
+ behavior_labels=behavior_labels,
+ clf_depth=0 if n_layers is None else n_layers - 1,
+ batch_norm=False,
+ weight_init="xavier",
+ loss="cross-entropy",
+ optimizer="adam")
+
+ @classmethod
+ def load_model(cls, config, path):
+ model = DeepLinear(
+ n_input=config["dim_latent"],
+ n_output=len(config["behavior_labels"]),
+ n_hidden=config["clf_depth"]*[None],
+ batch_norm=config["batch_norm"],
+ weight_init=config["weight_init"],
+ )
+        try:
+            model.load(path)
+        except Exception:
+            # no trained weights available; initialize the layers afresh
+            model.init_layers()
+ return model
+
+ @property
+ def behavior_labels(self):
+ return self.config["behavior_labels"]
+
+ @behavior_labels.setter
+ def behavior_labels(self, labels):
+ self.config["behavior_labels"] = labels
+
+ @property
+ def n_latent_features(self):
+ return self.config["dim_latent"]
+
+ @property
+ def n_behaviors(self):
+ return len(self.behavior_labels)
+
+ @property
+ def n_layers(self):
+ return self.config["clf_depth"] + 1
+
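+# SupervisedMaggot couples a MaggotUBA encoder with a DeepLinear classifier.
+# Two construction modes (sketch; paths and labels are illustrative):
+#
+#     fine-tuning: SupervisedMaggot("pretrained/autoencoder_config.json",
+#                                   behaviors=["run", "cast"], n_layers=1)
+#     re-loading:  SupervisedMaggot("trained/clf_config.json")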
+class SupervisedMaggot(nn.Module):
+ def __init__(self, cfgfilepath, behaviors=[], n_layers=1):
+ super().__init__()
+        if behaviors: # the encoder is only pre-trained; attach a new classifier
+ self.encoder = PretrainedMaggotEncoder(cfgfilepath)
+ self.clf = MaggotClassifier(self.encoder.path / "clf_config.json",
+ behaviors, self.encoder.config["dim_latent"], n_layers)
+        else: # the full model has been retrained; reload it from the classifier config
+ self.clf = MaggotClassifier(cfgfilepath)
+ self.encoder = MaggotEncoder(self.clf.config["autoencoder_config"],
+ ptfile=self.clf.config["enc_path"])
+
+ def forward(self, x):
+ return self.clf(self.encoder(x))
+
+ def save(self):
+ self.encoder.save()
+ self.clf.config["autoencoder_config"] = str(self.encoder.cfgfilepath)
+ self.clf.config["enc_path"] = str(self.encoder.ptfilepath)
+ self.clf.save()
+
diff --git a/src/maggotuba/models/predict_model.py b/src/maggotuba/models/predict_model.py
index 1580c11b89e6a37fe8dc3195d5655247a2777ff3..7af062ad4d858726db3d5e292a43f5c82e7cff91 100644
--- a/src/maggotuba/models/predict_model.py
+++ b/src/maggotuba/models/predict_model.py
@@ -1,11 +1,7 @@
-from taggingbackends.data.trxmat import TrxMat
-from taggingbackends.data.chore import load_spine
-import taggingbackends.data.fimtrack as fimtrack
from taggingbackends.data.labels import Labels
from taggingbackends.features.skeleton import get_5point_spines
from maggotuba.models.denselayer import DenseLayer, new_generator
import numpy as np
-import json
def predict_model(backend, **kwargs):
"""
@@ -46,6 +42,10 @@ def predict_model(backend, **kwargs):
return labels if ret is None else ret
def predict_individual_data_files(backend, model, input_files, labels):
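+    # deferred imports: these readers are needed on this code path only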
+ from taggingbackends.data.trxmat import TrxMat
+ from taggingbackends.data.chore import load_spine
+ import taggingbackends.data.fimtrack as fimtrack
+ #
_break = False # for now, a single file can be labelled at a time
for file in input_files:
# load the input data (or features)
@@ -114,6 +114,7 @@ def predict_individual_data_files(backend, model, input_files, labels):
def predict_larva_dataset(backend, model, file, labels, subset="validation"):
from taggingbackends.data.dataset import LarvaDataset
+ #
dataset = LarvaDataset(file, new_generator())
return model.predict(dataset, subset)
diff --git a/src/maggotuba/models/randomforest.py b/src/maggotuba/models/randomforest.py
deleted file mode 100644
index 39c45550648398a1cdfd5e456d2cd7eb3c587d5c..0000000000000000000000000000000000000000
--- a/src/maggotuba/models/randomforest.py
+++ /dev/null
@@ -1,130 +0,0 @@
-import os
-import json
-import random
-import pickle
-import numpy as np
-import torch
-from behavior_model.models.neural_nets import Encoder
-import behavior_model.data.utils as data_utils
-from behavior_model.data.enums import Label
-
-"""
-This MaggotUBA-based backend makes predictions feeding a trained random forest
-classifier with the latent representations of a trained MaggotUBA encoder.
-
-The encoder is trained as a component of an autoencoder, in a self-supervised
-fashion, and not further trained in combination with the random forest.
-"""
-class RandomForest:
- def __init__(self, config='config.json', clf='randomforest.pkl'):
- self._config = config
- self._clf = clf
- self.encoder = None
- self.average_body_length = None
- self.load()
-
- @property
- def config(self):
- if not isinstance(self._config, dict):
- with open(self._config, "r") as f:
- self._config = json.load(f)
- return self._config
-
- @config.setter
- def config(self, cfg):
- self._config = cfg
-
- @property
- def clf(self):
- if isinstance(self._clf, str):
- if not os.path.isabs(self._clf):
- self._clf = os.path.join(self.config["log_dir"], self._clf)
- with open(self._clf, "rb") as f:
- self._clf = pickle.load(f)
- return self._clf
-
- @clf.setter
- def clf(self, clf):
- self._clf = clf
-
- def window(self, data):
- winlen = self.config["len_traj"]
- N = data.shape[0]+1
- for m in range(0, N-winlen):
- n = m + winlen
- yield data[m:n]
-
- def pad(self, data):
- winlen = self.config["len_traj"]
- if data.shape[0] == 1:
- return data
- else:
- ind = np.r_[np.zeros(winlen // 2, dtype=int), np.arange(data.shape[0]), (data.shape[1]-1) *
- np.ones(winlen // 2 - 1, dtype=int)]
- return data[ind]
-
- def body_length(self, data):
- dx = np.diff(data[:,0::2], axis=1)
- dy = np.diff(data[:,1::2], axis=1)
- return np.sum(np.sqrt(dx*dx + dy*dy), axis=1)
-
- def preprocess(self, data):
- # normalize length
- if self.average_body_length:
- data = data / self.average_body_length
- # permute head and tail
- data = data[:,[8,9,6,7,4,5,2,3,0,1]]
- ws = []
- for coords in self.window(data):
- # rotate
- matrix = data_utils.compute_rotation_matrix(coords)
- coords = np.stack([coords[:,::2], coords[:,1::2]], axis=-1)
- coords = np.einsum('ji,tpi->tpj', matrix, coords)
- coords = coords.reshape(coords.shape[0],-1)
- w = coords
- # center coordinates
- wc = np.mean(w[:,4:6], axis=0, keepdims=True)
- w -= np.tile(wc, 5).reshape(1, -1)
- # select coordinates columns
- # (nothing to do)
- # reshape
- w = data_utils.reshape(w)
- ws.append(w)
- if ws:
- return self.pad(np.stack(ws))
-
- @torch.no_grad()
- def encode(self, spines):
- data = self.preprocess(spines)
- if data is None:
- return
- input_ = torch.from_numpy(data)
- # convert to float to run through network
- input_ = input_.float().cpu()
- # compute the codes
- output_ = self.encoder(input_)
- return output_.numpy()
-
- def predict(self, all_spines):
- labels = []
- latent_repr = self.encode(all_spines)
- if latent_repr is None:
- return
- label_ids = self.clf.predict(latent_repr)
- labelset = {float(symbol.value): symbol.name.lower() for symbol in Label}
- labels = [labelset[label] for label in label_ids]
- return labels
-
- def load(self, file=None):
- if file is not None:
- self.config = file
- config = self.config
- #torch.manual_seed(config["seed"])
- model_params = torch.load(os.path.join(config["log_dir"],
- "best_validated_encoder.pt"))
- self.encoder = encoder = Encoder(**config)
- encoder.load_state_dict(model_params)
- encoder.eval()
- encoder.to('cpu')
- return self
-
diff --git a/src/maggotuba/models/train_model.py b/src/maggotuba/models/train_model.py
index a19f121173f0e23c6058daeb811fd0e01b4ee427..631ba7c54ba005ec0d3b38114cdd48891e6400b4 100644
--- a/src/maggotuba/models/train_model.py
+++ b/src/maggotuba/models/train_model.py
@@ -1,10 +1,7 @@
from taggingbackends.data.labels import Labels
from taggingbackends.data.dataset import LarvaDataset
from maggotuba.models.denselayer import DenseLayer, new_generator
-import numpy as np
import json
-import torch
-import os
import glob
def train_model(backend, layers=1, pretrained_model_instance="default"):
@@ -40,8 +37,9 @@ def train_model(backend, layers=1, pretrained_model_instance="default"):
with open(str(dst), "wb") as o:
o.write(i.read())
# load the pretrained model
- model = DenseLayer(autoencoder_config=config_file, n_behaviors=nlabels,
- n_layers=layers)
+ labels = dataset.labels
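+    # labels may be stored as bytes in the HDF5 file; normalize to str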
+ labels = labels if isinstance(labels[0], str) else [s.decode() for s in labels]
+ model = DenseLayer(config_file, labels, layers)
# fine-tune and save the model
model.train(dataset)
model.save()