Commit 965e1f62 authored by François Laurent

Initial commit
.gitignore 0 → 100644
# Julia install and coverage
*.jl.*.cov
*.jl.cov
*.jl.mem
/Manifest.toml
# Python install
__pycache__/
*.py[cod]
build/
dist/
eggs/
.eggs/
*.egg-info/
.installed.cfg
*.egg
poetry.lock
# DotEnv configuration
.env
env/
# exclude data from source control by default
/data/
# Visual Studio Code
.vscode/
# macOS
.DS_Store
# vim
*.sw*
LICENSE 0 → 100644
MIT License
Copyright (c) 2022 François Laurent, Institut Pasteur
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
README.md 0 → 100644
# MaggotUBA backend adapter
Wrapper project that lets the Nyx tagger UI call [`MaggotUBA`](https://gitlab.pasteur.fr/les-larves/structured-temporal-convolution).
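The adapter follows the TaggingBackends script convention: an action script defines a function and hands it over to the TaggingBackends runner, which resolves the `data/raw`, `data/interim` and `data/processed` directories. A minimal sketch of the prediction entry point, mirroring the `predict_model.py` script below:

```python
from taggingbackends.main import main

def predict_model(backend):
    # read the input files via `backend` and write the predicted
    # labels to data/processed
    ...

if __name__ == "__main__":
    main(predict_model)
```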
pyproject.toml 0 → 100644
[tool.poetry]
name = "MaggotUBA-adapter"
version = "0.1.0"
description = "Interface between MaggotUBA and the Nyx tagging UI"
authors = ["François Laurent <francois.laurent@posteo.net>"]
license = "MIT"
packages = [
{ include = "maggotuba", from = "src" },
]
[tool.poetry.dependencies]
python = "^3.8,<3.11"
taggingbackends = {git = "https://gitlab.pasteur.fr/nyx/TaggingBackends", rev = "main"}
structured-temporal-convolution = {git = "git@gitlab.pasteur.fr:les-larves/structured-temporal-convolution.git", branch = "poetry"}
torch = "^1.11.0"
numpy = "^1.19.3"
[tool.poetry.dev-dependencies]
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
src/maggotuba/__init__.py 0 → 100644
__version__ = '0.1.0'
predict_model.py 0 → 100644
from taggingbackends.data.trxmat import TrxMat
from taggingbackends.data.chore import load_spine
from taggingbackends.data.labels import Labels
from taggingbackends.features.skeleton import get_5point_spines
from randomforest import RandomForest
import numpy as np
import json
def predict_model(backend):
"""
This function generates predicted labels for all the input data.
The input files can be read from any directory.
All generated/modified files should be written to `data/interim` or
`data/processed`.
The predicted labels are expected in `data/processed`.
    The `predict_model.py` script is a required entry point of the backend.
"""
# in the present case, as make_dataset.py and build_features.py do nothing,
# we pick files in `data/raw`
input_files = backend.list_input_files()
# we could go and pick files in `data/interim` as well:
input_files += backend.list_interim_files()
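    # at most two input files are expected: a single data file, plus an
    # optional "metadata" file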
assert 0 < len(input_files) <= 2
metadata = None
metadata_file = [file for file in input_files if file.name == "metadata"]
if metadata_file:
metadata_file = metadata_file[0]
input_files.remove(metadata_file)
with open(metadata_file, "r") as f:
metadata = json.load(f)
for file in input_files:
# load the input data (or features)
if file.name.endswith(".spine"):
spine = load_spine(file)
run = spine["date_time"].iloc[0]
larvae = spine["larva_id"].values
t = spine["time"].values
data = spine.iloc[:,3:].values
        elif file.name == "trx.mat":
            trx = TrxMat(file)
            t = trx["t"]
            data = trx["spine"]
            # TrxMat returns dicts keyed by run identifier; the spine data
            # may come additionally wrapped under a "spine" key
            run, data = next(iter(data.items()))
            if run == "spine":
                run, data = next(iter(data.items()))
            t = t[run]
else:
# TODO: support more file formats
continue
# downsample the skeleton
if isinstance(data, dict):
for larva in data:
data[larva] = np.vstack([get_5point_spines(spine) for spine in data[larva]])
else:
data = get_5point_spines(data)
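        # from here on, every spine is a 5-point skeleton, i.e. 10 columns
        # of interleaved x/y coordinates, as expected by RandomForest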
        # load the trained model; if several config files are available,
        # use the last one listed
        model_files = backend.list_model_files()
        config_file = [file for file in model_files if file.name.endswith("config.json")]
        model = RandomForest(config_file[-1]).load()
# assign labels
labels = Labels()
if isinstance(data, dict):
ref_length = np.mean(np.concatenate([
model.body_length(spines) for spines in data.values()
]))
print(f"average body length: {ref_length}")
for larva, spines in data.items():
predictions = model.predict(spines, average_body_length=ref_length)
labels[run, larva] = dict(zip(t[larva], predictions))
else:
ref_length = model.body_length(data).mean()
print(f"average body length: {ref_length}")
for larva in np.unique(larvae):
mask = larvae == larva
predictions = model.predict(data[mask], average_body_length=ref_length)
labels[run, larva] = dict(zip(t[mask], predictions))
        # attach the metadata to the labelled run
if metadata:
labels[run]['metadata'] = metadata
else:
labels[run]['metadata'] = {'filename': file.name}
labels.metadata['labels'] = ["run", "bend", "stop", "hunch", "back", "roll"]
labels.metadata['label_colors'] = ["#000000", "#ff0000", "#00ff00",
"#0000ff", "#00ffff", "#ffff00"]
labels.dump(backend.processed_data_dir() / "predicted.labels")
from taggingbackends.main import main
if __name__ == "__main__":
main(predict_model)
randomforest.py 0 → 100644
import os
import json
import pickle
import numpy as np
import torch
from behavior_model.models.neural_nets import Encoder
import behavior_model.data.utils as data_utils
from behavior_model.data.enums import Label
class RandomForest:
def __init__(self, config='config.json', clf='randomforest.pkl'):
self._config = config
self._clf = clf
self.encoder = None
    @property
    def config(self):
        # lazily load the configuration from file on first access
        if not isinstance(self._config, dict):
            with open(self._config, "r") as f:
                self._config = json.load(f)
        return self._config
@config.setter
def config(self, cfg):
self._config = cfg
    @property
    def clf(self):
        # lazily unpickle the classifier on first access; relative paths are
        # resolved against the model's log directory
        if isinstance(self._clf, str):
            if not os.path.isabs(self._clf):
                self._clf = os.path.join(self.config["log_dir"], self._clf)
            with open(self._clf, "rb") as f:
                self._clf = pickle.load(f)
        return self._clf
@clf.setter
def clf(self, clf):
self._clf = clf
    def window(self, data):
        # slide a len_traj-long window over the frames, one frame at a time,
        # yielding data.shape[0] - winlen + 1 overlapping windows
        winlen = self.config["len_traj"]
        N = data.shape[0] + 1
        for m in range(0, N - winlen):
            n = m + winlen
            yield data[m:n]
    def pad(self, data):
        # compensate for the frames lost to windowing: repeat the first
        # window winlen // 2 times and the last one winlen // 2 - 1 times,
        # so that the output has one window per original time point
        winlen = self.config["len_traj"]
        ind = np.r_[
            np.zeros(winlen // 2, dtype=int),
            np.arange(data.shape[0]),
            (data.shape[0] - 1) * np.ones(winlen // 2 - 1, dtype=int),
        ]
        return data[ind]
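    # e.g. with len_traj = 4 and 10 input frames, window() yields 7 windows
    # and pad() re-expands them to 10: 2 copies of the first window, the 7
    # windows themselves, then 1 copy of the last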
    def body_length(self, data):
        # data rows are interleaved coordinates (x0, y0, ..., x4, y4);
        # the body length is the summed length of the 4 inter-point segments
        dx = np.diff(data[:,0::2], axis=1)
        dy = np.diff(data[:,1::2], axis=1)
        return np.sum(np.sqrt(dx*dx + dy*dy), axis=1)
    def preprocess(self, data, average_body_length=None):
        # normalize the spine coordinates with the average body length
        if average_body_length:
            data = data / average_body_length
        # permute head and tail (reverse the order of the 5 points)
        data = data[:,[8,9,6,7,4,5,2,3,0,1]]
        ws = []
        for w in self.window(data):
            # rotate each window into a common reference frame
            matrix = data_utils.compute_rotation_matrix(w)
            coords = np.stack([w[:,::2], w[:,1::2]], axis=-1)
            coords = np.einsum('ji,tpi->tpj', matrix, coords)
            w = coords.reshape(coords.shape[0], -1)
            # center the window on the time-averaged midpoint coordinates
            wc = np.mean(w[:,4:6], axis=0, keepdims=True)
            w -= np.tile(wc, 5).reshape(1, -1)
            # select coordinates columns
            # (nothing to do)
            # reshape as expected by the encoder
            w = data_utils.reshape(w)
            ws.append(w)
        return self.pad(np.stack(ws))
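    # preprocess() thus turns a (T, 10) spine array into one normalized,
    # rotated and centered window per time point; the exact output layout is
    # set by behavior_model's data_utils.reshape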
@torch.no_grad()
def encode(self, spines, average_body_length=None):
data = self.preprocess(spines, average_body_length)
input_ = torch.from_numpy(data)
# convert to float to run through network
input_ = input_.float().cpu()
# compute the codes
output_ = self.encoder(input_)
return output_.numpy()
    def predict(self, all_spines, average_body_length=None):
        # embed the spines with the pretrained encoder, then classify the
        # latent representations with the random forest
        latent_repr = self.encode(all_spines, average_body_length)
        label_ids = self.clf.predict(latent_repr)
        # map the numeric predictions back to label names
        labelset = {float(symbol.value): symbol.name.lower() for symbol in Label}
        return [labelset[label] for label in label_ids]
    def load(self, file=None):
        if file is not None:
            self.config = file
        config = self.config
        #torch.manual_seed(config["seed"])
        # rebuild the encoder from the configuration and restore the
        # best validated weights
        model_params = torch.load(os.path.join(config["log_dir"],
            "best_validated_encoder.pt"))
        self.encoder = encoder = Encoder(**config)
        encoder.load_state_dict(model_params)
        encoder.eval()
        encoder.to('cpu')
        return self
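# Usage sketch (cf. predict_model.py above): load a trained model from its
# configuration file and label a (T, 10) array of 5-point spines;
# best_validated_encoder.pt and randomforest.pkl are looked up in the
# configured log directory.
#
#     model = RandomForest("config.json").load()
#     ref_length = model.body_length(spines).mean()
#     labels = model.predict(spines, average_body_length=ref_length)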