Commit 4f72c783 authored by amichaut's avatar amichaut
Browse files

moved make_synthetic_data methods to module

parent fbd92cfc
Pipeline #81676 passed with stages
in 24 seconds
This diff is collapsed.
......@@ -1592,3 +1592,168 @@ def batch_analysis(dirdata, run_='cell_analysis', refresh=False, invert_yaxis=Tr
plot_pooled_MSD(data_dir, dt=timescale, plot_method='along_Y')
def make_diff_traj(part_index=0, grid_size=[500, 500, 500], dim=3, tmax=10, periodic=True, noise_amp=10,
x0=[250, 250, 250], bias=[0, 0, 0]):
"""
Generate a trajectory with a diffusive trajectory, with a bias.
bias gives the amplitude at each step along each dimension.
"""
# time and index
t = np.arange(tmax)
index = np.ones(tmax) * part_index
# displacement
displacement = pd.DataFrame(np.random.randn(tmax, dim), columns=list('xyz')[0:dim])
displacement['r2'] = 0
for i in range(dim):
displacement['r2'] += displacement[list('xyz')[i]] ** 2
displacement['r'] = np.sqrt(displacement['r2'])
for i in range(dim):
displacement[list('xyz')[i]] /= displacement['r'] # normalize raw displacement
displacement[list('xyz')[i]] *= noise_amp # apply amplitude
displacement[list('xyz')[i]] += bias[i] # add bias
displacement = displacement[list('xyz')[0:dim]].values
# traj
traj = np.zeros((tmax, dim))
for i in range(dim):
traj[:, i] = np.cumsum(displacement[:, i]) + x0[i]
if periodic:
traj[:, i] = np.remainder(traj[:, i], grid_size[i])
return pd.DataFrame(np.concatenate([index[:, None], t[:, None], traj], axis=1),
columns=['traj', 'frame'] + list('xyz')[0:dim])
def make_spatial_gradient(part_num=100, grid_size=[500, 500, 500], dim=3, tmax=10, periodic=True,
bias_basis=[0, 0, 0],
diff_grad={'min': 0, 'max': 10}, bias_grad={'min': 0, 'max': 10, 'dim': 0},
grad={'step_num': 4, 'dim': 0, 'boundaries':None},
x0_range={'x': [0.1, 0.9], 'y': [0.1, 0.9], 'z': [0.1, 0.9]}, dt=1):
"""
Make a spatial gradient in diffusion or bias, along a specific dimension, given by grad['dim'].
The number of steps on the gradient is given by grad['step_num'].
The gradient can be in diffusion with diff_grad or bias_grad. min and max give the extrema of the gradient, and bias_grad['dim'] give the dimension along the gradient in bias is applied.
An overall constant bias can be passed by bias_basis.
"""
df = pd.DataFrame([], columns=['traj', 'frame'] + list('xyz')[0:dim])
df_param = pd.DataFrame([], columns=['traj', 'v', 'D'])
diff_grad_ = np.linspace(diff_grad['min'], diff_grad['max'], grad['step_num'])
bias_grad_ = np.linspace(bias_grad['min'], bias_grad['max'], grad['step_num'])
# spatial boundaries of the regions of particles
lims = [[x0_range['x'][0] * grid_size[0], x0_range['x'][1] * grid_size[0]],
[x0_range['y'][0] * grid_size[1], x0_range['y'][1] * grid_size[1]],
[x0_range['z'][0] * grid_size[2], x0_range['z'][1] * grid_size[2]]]
part_count = 0
for i in range(grad['step_num']):
grad_increment = (lims[grad['dim']][1] - lims[grad['dim']][0]) / grad['step_num']
lims_ = lims[:]
lims_[grad['dim']] = [lims_[grad['dim']][0] + i * grad_increment,
lims_[grad['dim']][0] + (i + 1) * grad_increment]
noise_amp = diff_grad_[i]
bias = bias_basis[:]
bias[bias_grad['dim']] = bias_grad_[i]
bias_ampl = 0
for k in range(dim):
bias_ampl += bias[k] ** 2
bias_ampl = np.sqrt(bias_ampl)
for j in range(int(part_num / grad['step_num'])):
x0 = [np.random.uniform(lims_[0][0], lims_[0][1]),
np.random.uniform(lims_[1][0], lims_[1][1]),
np.random.uniform(lims_[2][0], lims_[2][1])]
traj = make_diff_traj(part_index=part_count, noise_amp=noise_amp, x0=x0, bias=bias, tmax=tmax,
periodic=periodic, dim=dim)
df = pd.concat([df, traj])
v = bias_ampl / dt
D = noise_amp ** 2 / (2. * dim * dt)
df_param.loc[part_count, :] = [part_count, v, D]
part_count += 1
return df, df_param
def make_attraction_node(part_num=100, grid_size=[500, 500, 500], dim=3, tmax=10, periodic=True, noise_amp=10,
bias_basis=[0, 0, 0],
attraction_ampl=10, node=None, x0_range={'x': [0.1, 0.9], 'y': [0.1, 0.9], 'z': [0.1, 0.9]},
dt=1):
"""Make array of diffusive particles biased toward a node (or away if attraction_ampl is negative)"""
df = pd.DataFrame([], columns=['traj', 'frame'] + list('xyz')[0:dim])
df_param = pd.DataFrame([], columns=['traj', 'v', 'D'])
if node is None:
node = [grid_size[d] / 2 for d in range(dim)] # by default center
# spatial boundaries of the regions of particles
lims = [[x0_range['x'][0] * grid_size[0], x0_range['x'][1] * grid_size[0]],
[x0_range['y'][0] * grid_size[1], x0_range['y'][1] * grid_size[1]],
[x0_range['z'][0] * grid_size[2], x0_range['z'][1] * grid_size[2]]]
for i in range(part_num):
x0 = [np.random.uniform(lims[0][0], lims[0][1]),
np.random.uniform(lims[1][0], lims[1][1]),
np.random.uniform(lims[2][0], lims[2][1])]
# unit vector towards node
node_vec = np.array([node[d] - x0[d] for d in range(dim)])
sum_ = 0
for d in range(dim):
sum_ += node_vec[d] ** 2
node_vec /= np.sqrt(sum_)
bias = node_vec * attraction_ampl
bias = bias + np.array(bias_basis)
bias_ampl = 0
for k in range(dim):
bias_ampl += bias[k] ** 2
bias_ampl = np.sqrt(bias_ampl)
traj = make_diff_traj(part_index=i, noise_amp=noise_amp, x0=x0, bias=bias, tmax=tmax, periodic=periodic,
dim=dim)
df = pd.concat([df, traj])
v = bias_ampl / dt
D = noise_amp ** 2 / (2. * dim * dt)
df_param.loc[i, :] = [i, v, D]
return df, df_param
def plot_synthetic_stack(df, outdir, dpi=300, grid_size=[500, 500, 500], tmax=10):
"""Plot synthetic data and save it as a grayscaled tiff stack"""
stack = [] # stack to store images
groups = df.groupby('frame')
# plot frames
for i in range(tmax):
group = groups.get_group(i).reset_index(drop=True)
figsize = (grid_size[0] / dpi, grid_size[1] / dpi)
fig = plt.figure(frameon=False, figsize=figsize, dpi=dpi)
ax = fig.add_axes([0, 0, 1, 1])
for k in range(group.shape[0]):
ax.scatter(group.loc[k, 'x'], group.loc[k, 'y'], s=10)
ax.set_xlim(0, grid_size[0])
ax.set_ylim(0, grid_size[1])
ax.invert_yaxis()
ax.axis('off')
fig.canvas.draw()
fig_image = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)
fig_image = fig_image.reshape(fig.canvas.get_width_height()[::-1] + (3,))
fig_image = rgb2gray(fig_image) # convert to grayscale
fig_image = img_as_ubyte(fig_image) # convert to 8 bit
stack.append(fig_image)
plt.close(fig)
# save
stack = np.array(stack)
tifff.imsave(osp.join(outdir,'stack.tiff'), stack)
\ No newline at end of file
......@@ -28,279 +28,10 @@ import os.path as osp
import sys
import argparse
import matplotlib.pyplot as plt
import matplotlib as mpl
import numpy as np
import pandas as pd
from skimage.color import rgb2gray
from skimage.util import img_as_ubyte
import seaborn as sns
import tifffile as tifff
from track_analyzer import prepare as tpr
color_list = [c['color'] for c in list(plt.rcParams['axes.prop_cycle'])] + sns.color_palette("Set1", n_colors=9,
desat=.5)
mpl.use('agg')
# parameters
traj_num = 500 # number of tracks
frame_num = 5 # number of frame
dim = 2 # dimension
lims = [[0,500],[0,500]] # space limits
periodic = False # space periodicity
random = False # initial position spacing
space = {'dim':dim,'lims':lims,'periodic':periodic,'random':random}
pos0_lims = [[100,400],[100,400]] # initial position limits
diff_field = {'cst':1} # constant diffusion
vel_field = [{'a':0.01,'p':2,'b':250,'c':-0.01,'q':2,'d':250}, # vx = x^2 - y^2 (centered)
{'a':0.01,'p':2,'b':250,'c':-0.01,'q':2,'d':250}, # vy = x^2 - y^2 (centered)
]
config_default = {'traj_num':traj_num,
'frame_num':frame_num,
'space':space,
'pos0_lims':pos0_lims,
'diff_field':diff_field,
'vel_field':vel_field,
}
def get_config(data_dir):
"""
Load from config file 'config.csv' if it exists, else return None
"""
config_fn = osp.join(data_dir,'config.csv')
if osp.exists(config_fn):
config = tpr.load_dict(config_fn)
else:
config = None
return config
def make_pos0(pos0_lims=pos0_lims,traj_num=traj_num,random=True):
"""
Generate list of initial positions at random position within boudaries given by pos0_lims
"""
pos0_list = []
if random:
for i in range(traj_num):
pos0 = np.array([np.random.uniform(pos0_lims_[0], pos0_lims_[1]) for pos0_lims_ in pos0_lims])
pos0_list.append(pos0)
else:
# create a rectangle array of N_ positions evenly spaced
# height/width = N_h/N_w and N_h*N_w = N_
width = np.abs(pos0_lims[0][1] - pos0_lims[0][0])
height = np.abs(pos0_lims[1][1] - pos0_lims[1][0])
N_ = int(np.sqrt(traj_num))**2 # number of positions needs to be a square number
N_h = int(np.sqrt(N_) * height/width)
N_w = int(N_ / N_h)
# meshgrid
X,Y = np.meshgrid(np.linspace(pos0_lims[0][0],pos0_lims[0][1],N_w),np.linspace(pos0_lims[1][0],pos0_lims[1][1],N_h))
for i in range(X.shape[0]):
for j in range(X.shape[1]):
pos0_list.append(np.array([X[i,j],Y[i,j]]))
return pos0_list
def polynom_mapping(pos,params={'cst':0},space=space):
"""
Evaluate polynomial at position pos.
Polynomial function: (a*(x-b))^p + (c*(y-d))^q + (e*(z-f))^r + cst
Useful to generate a velocity field with polynomial expression
"""
# put parameter to either if missing
for p in list('abcdefpqr')+['cst']:
if p not in params.keys():
params[p] = 0
# check dimension of position compatible with space dimension
pos_dim = len(pos)
if pos_dim > space['dim']:
raise Exception('position dimension larger than space dimension')
value = params['cst']
if pos_dim == 1:
value += (params['a'] * (pos[0] - params['b'])) ** params['p']
elif pos_dim == 2:
value += (params['a'] * (pos[0] - params['b'])) ** params['p']
value += (params['c'] * (pos[1] - params['d'])) ** params['q']
elif pos_dim == 3:
value += (params['a'] * (pos[0] - params['b'])) ** params['p']
value += (params['c'] * (pos[1] - params['d'])) ** params['q']
value += (params['e'] * (pos[2] - params['f'])) ** params['r']
return value
def n_polynom_mapping(pos,params_list=[{'cst':0}],space=space):
"""
n-dimension polynomial evaluation at position pos using polynom_mapping()
"""
# check there is a set of parameters along each space dimension
if len(params_list) != space['dim']:
raise Exception('params_list must have the same size as space dimension')
vector = []
for i in range(space['dim']):
vector.append(polynom_mapping(pos,params=params_list[i],space=space))
return vector
def export_vel_fields(vel_field,pos_list,data_dir,space=space):
"""
Export velocity curl and div fields evaluated at an list of positions.
vel_field (generated by n_polynom_mapping) is a list of n elements (n=2 or 3), each element being a spatial component
"""
# check 2D velocity field is 2D
if len(vel_field) < 2:
raise Exception("Need a 2D velocity field")
# check missing parameters in vel_field
for i in range(len(vel_field)):
for p in list('abcdefpqr'):
if p not in vel_field[i].keys():
vel_field[i][p] = 0
# get velocity list
vel_list = []
for pos in pos_list:
vel_list.append(n_polynom_mapping(pos,params_list=vel_field,space=space))
# calculate 2D curl and div
curl_list = []
div_list = []
for pos in pos_list:
# curl
curl = dx_polynom(vel_field[1],pos) - dy_polynom(vel_field[0],pos) # Dx_vy - Dy_vx
curl_list.append(curl)
# div
div = dx_polynom(vel_field[0],pos) + dy_polynom(vel_field[1],pos) # Dx_vx + Dy_vy
div_list.append(div)
# save to df
data = np.concatenate((np.array(pos_list),np.array(vel_list),np.array([curl_list]).T,np.array([div_list]).T),axis=1)
columns = ['x','y','vx','vy','vz','curl','div'] if len(vel_field) == 3 else ['x','y','vx','vy','curl','div']
df_out = pd.DataFrame(data, columns=columns)
df_out.to_csv(osp.join(data_dir,'fields.csv'))
def dx_polynom(polynom,pos):
"""
x-component of spatial derivative of polynom_mapping at position pos
"""
if polynom['p'] > 0:
value = polynom['a']*polynom['p']*(pos[0]-polynom['b'])**(polynom['p']-1)
else:
value = 0
return value
def dy_polynom(polynom,pos):
"""
y-component of spatial derivative of polynom_mapping at position pos
"""
if polynom['q'] > 0:
value = polynom['c']*polynom['q']*(pos[1]-polynom['d'])**(polynom['q']-1)
else:
value = 0
return value
def make_traj(pos0,frame_num=frame_num,diff_field=diff_field,vel_field=vel_field,space=space,track_id=0):
"""
Generate a trajectory of frame_num positions from a an initial position pos0.
At each frame, the displacement is given by the superposition of a diffusive component and ballistic component.
Each component are computed at the position using the diffusion filed diff_field and the velocity field vel_field.
"""
traj = pd.DataFrame(columns=['track','frame']+list('xyz')[:space['dim']])
pos = pos0 # intialize position
traj.loc[0,:] = [track_id,0] + list(pos0)
for frame in range(1,frame_num): # first position already given
# diffusive component
diff = np.random.randn(space['dim'])
diff_2 = diff ** 2
diff_normalized = diff / np.sqrt(diff_2.sum())
diff_ampl = polynom_mapping(pos,diff_field,space) # diffusion amplitude at pos
# ballistic component
vel = n_polynom_mapping(pos,params_list=vel_field,space=space) # velocity at pos
# new position
pos += diff_ampl * diff_normalized + vel
if periodic:
for i in range(len(pos)):
pos[i] = np.remainder(pos[i], space['lims'][i])
traj.loc[frame,:] = [track_id,frame] + list(pos)
return traj
def make_dataset(pos0_list,frame_num=frame_num,diff_field=diff_field,vel_field=vel_field,space=space):
"""
Make set of trajectories using make_traj
"""
df = pd.DataFrame(columns=['track','frame']+list('xyz')[:space['dim']])
for i,pos0 in enumerate(pos0_list):
traj = make_traj(pos0,frame_num=frame_num,diff_field=diff_field,vel_field=vel_field,space=space,track_id=i)
df = pd.concat([df,traj],ignore_index=True)
return df
def plot_synthetic_stack(df, outdir, dpi=300, image_size=[500, 500], frame_num=10):
"""Plot synthetic data and save it as a grayscaled tiff stack"""
stack = [] # stack to store images
groups = df.groupby('frame')
# plot frames
for i in range(frame_num):
group = groups.get_group(i).reset_index(drop=True)
figsize = (image_size[0] / dpi, image_size[1] / dpi)
fig = plt.figure(frameon=False, figsize=figsize, dpi=dpi)
ax = fig.add_axes([0, 0, 1, 1])
for k in range(group.shape[0]):
ax.scatter(group.loc[k, 'x'], group.loc[k, 'y'], s=3)
ax.set_xlim(0, image_size[0])
ax.set_ylim(0, image_size[1])
ax.invert_yaxis()
ax.axis('off')
fig.canvas.draw()
fig_image = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)
fig_image = fig_image.reshape(fig.canvas.get_width_height()[::-1] + (3,))
fig_image = rgb2gray(fig_image) # convert to grayscale
fig_image = img_as_ubyte(fig_image) # convert to 8 bit
stack.append(fig_image)
plt.close(fig)
# save
stack = np.array(stack)
tifff.imwrite(osp.join(outdir,'stack.tiff'), stack)
from track_analyzer import synthetic_data as tsy
def parse_args(args=None):
......@@ -324,7 +55,6 @@ def parse_args(args=None):
def main(args=None):
args = sys.argv[1:] if args is None else args
parsed_args = parse_args(args)
......@@ -333,7 +63,8 @@ def main(args=None):
plot = parsed_args.plot
# get config
config_ = get_config(data_dir)
config_default = tsy.init_config()
config_ = tsy.get_config(data_dir)
config = config_default if config_ is None else config_
traj_num = config["traj_num"] # number of tracks
......@@ -342,24 +73,46 @@ def main(args=None):
pos0_lims = config["pos0_lims"] # initial position limits
diff_field = config["diff_field"] # constant diffusion
vel_field = config["vel_field"]
timescale = config["timescale"]
lengthscale = config["lengthscale"]
pos0_list = make_pos0(pos0_lims=pos0_lims,traj_num=traj_num,random=space['random'])
df = make_dataset(pos0_list,frame_num=frame_num,diff_field=diff_field,vel_field=vel_field,space=space)
df.to_csv(osp.join(data_dir,'positions.csv'), index=False)
pos0_list = tsy.make_pos0(pos0_lims=pos0_lims, traj_num=traj_num, random=space['random'])
df = tsy.make_dataset(pos0_list, frame_num=frame_num, diff_field=diff_field, vel_field=vel_field, space=space,
timescale=timescale, lengthscale=lengthscale)
df.to_csv(osp.join(data_dir, 'positions.csv'), index=False)
# save parameters
parameters = {'space':space,
'pos0_lims':pos0_lims,
'traj_num':traj_num,
'diff_field':diff_field,
'vel_field':vel_field,
'frame_num':frame_num,
parameters = {'space': space,
'pos0_lims': pos0_lims,
'traj_num': traj_num,
'diff_field': diff_field,
'vel_field': vel_field,
'frame_num': frame_num,
}
tpr.write_dict(parameters, osp.join(data_dir, 'parameters.csv'))
# save info
width = np.abs(space['lims'][0][1] - space['lims'][0][0])
height = np.abs(space['lims'][1][1] - space['lims'][1][0])
info = {'length_unit': 'um',
'time_unit': 's',
'lengthscale': lengthscale,
'timescale': timescale,
'image_width': width,
'image_height': height,
'table_unit': 'unit',
'z_step': 0,
}
tpr.write_dict(parameters,osp.join(data_dir,'parameters.csv'))
info_fn = osp.join(data_dir, 'info.txt')
with open(info_fn, 'w+') as f:
for k in info.keys():
f.write('{}:{}\n'.format(k, info[k]))
if plot:
plot_synthetic_stack(df, data_dir, dpi=300, image_size=[lims[0][1],lims[1][1]], frame_num=frame_num)
tsy.plot_synthetic_stack(df, data_dir, dpi=300, image_size=[space['lims'][0][1], space['lims'][1][1]],
frame_num=frame_num)
if __name__ == '__main__':
......
......@@ -34,163 +34,300 @@ from skimage.util import img_as_ubyte
import seaborn as sns
import tifffile as tifff
from track_analyzer import prepare as tpr
# Plotting parameters
color_list = [c['color'] for c in list(plt.rcParams['axes.prop_cycle'])] + sns.color_palette("Set1", n_colors=9,
desat=.5)
plot_param = {'figsize': (5, 5), 'dpi': 300, 'color_list': color_list, 'format': '.png', 'despine': True, 'logx': False,
'logy': False, 'invert_yaxis': True, 'export_data_pts': False}
mpl.use('agg')
def make_diff_traj(part_index=0, grid_size=[500, 500, 500], dim=3, tmax=10, periodic=True, noise_amp=10,
x0=[250, 250, 250], bias=[0, 0, 0]):
"""
Generate a trajectory with a diffusive trajectory, with a bias.
bias gives the amplitude at each step along each dimension.
"""
# time and index
t = np.arange(tmax)
index = np.ones(tmax) * part_index
# displacement
displacement = pd.DataFrame(np.random.randn(tmax, dim), columns=list('xyz')[0:dim])
displacement['r2'] = 0
for i in range(dim):
displacement['r2'] += displacement[list('xyz')[i]] ** 2
displacement['r'] = np.sqrt(displacement['r2'])
for i in range(dim):
displacement[list('xyz')[i]] /= displacement['r'] # normalize raw displacement
displacement[list('xyz')[i]] *= noise_amp # apply amplitude
displacement[list('xyz')[i]] += bias[i] # add bias
displacement = displacement[list('xyz')[0:dim]].values
# traj
traj = np.zeros((tmax, dim))
for i in range(dim):
traj[:, i] = np.cumsum(displacement[:, i]) + x0[i]
# parameters
traj_num = 500 # number of tracks
frame_num = 5 # number of frame
dim = 2 # dimension
lims = [[0, 500], [0, 500]] # space limits
periodic = False # space periodicity