Commit 4d18b874 authored by Jorge SOUSA's avatar Jorge SOUSA
Browse files

eVIVALDI initial commit

parents
######################
#####DEPENDENCIES#####
######################
Default packages in Python v2.7.3.
Additional packages required (recommended install with pip)
- matplotlib
- pillow (or PIL)
- seaborn
######################
######HOW TO RUN######
######################
To run a simulation, two input files are needed: the parameters (-paramfile) and the setup (-setupfile) file. These files parameterise the simulation run and define a bunch of logistic stuff. One of the most important one is the first line in the parameters file, that defines the output path of the simulations (relative to where the command is being run). Please be careful to end the path with the “/“ character.
The other mandatory argument is the ID (-id), that defines a name for the current simulation.
The flag —-visualize can be used for an interactive visualisation of the simulation, but it is not mandatory (it slows down the simulations).
The “Empties” folder needs to be located also in the folder where the run command is executed.
Run command:
python Handler.py -id <name> -paramfile <paramfile.txt> -setupfile <setupfile.txt> --visualize
Examples (with the included files)
(1) python src_eVIVALDI_9.2/Handler.py -id TestSimulation1 -paramfile TestScripts/FileS2_Fig2e.Parameters.txt -setupfile TestScripts/FileS1_Fig2e.MicrobialSetup.txt --visualize
(2) python src_eVIVALDI_9.2/Handler.py -id TestSimulation2 -paramfile TestScripts/FileS4_Fig4a.Parameters.txt -setupfile TestScripts/FileS3_Fig4a.MicrobialSetup.txt --visualize
(3) python src_eVIVALDI_9.2/Handler.py -id TestSimulation2 -paramfile TestScripts/FileS6_Fig5b.Parameters.txt -setupfile TestScripts/FileS5_Fig5b.MicrobialSetup.txt --visualize
\ No newline at end of file
import random, math
from Parameters import model_parameters as param
import copy
class Place():
def __init__(self, position, world):
self.my_world=world
self.coordinates=position
self.occupants=[]
self.neighbours={}
#(block, original_genome)
#block is ARG or "trash"
self.eDNA=[] #Holds DNA of dead cells (more than one block of DNA). Is degraded at a rate.
self.free_phages=[]
self.rif_conc=0; self.rif_time=0; self.rif_inoculum=0
self.str_conc=0; self.str_time=0; self.str_inoculum=0
self.quin_conc=0; self.quin_time=0; self.quin_inoculum=0
def Add_Available(self):self.my_world.Set_Available(self.coordinates)
def Add_Unavailable(self): self.my_world.Set_Unavailable(self.coordinates)
def Get_Distance_To(self, other_location):
return max(abs(self.coordinates[0]-other_location[0]), abs(self.coordinates[1]-other_location[1]))
def Get_Reachable(self, distance, desired_type="Location"):
import itertools
new_ds=[]
for dimension in self.coordinates:
new_d=[dimension+d for d in range(-distance, distance+1)]
new_new_d=[]
for d in new_d:
if param["World Type"]=="Toroidal":
if d<0: new_new_d.append(self.my_world.max_height-int(math.fabs(d))); continue
if d>=self.my_world.max_height: new_new_d.append(d-self.my_world.max_height); continue
new_new_d.append(d)
new_ds.append(new_new_d)
full_locations=list(itertools.product(*new_ds))
possible_locations=[]
for pos in full_locations:
if (pos[0]==self.coordinates[0]) and (pos[1]==self.coordinates[1]): continue #skip if it is my own coordinates
if (pos[0]<0) or (pos[0]>=self.my_world.max_height) or (pos[1]<0) or (pos[1]>=self.my_world.max_width): continue #skip if it is out of boundaries
possible_locations.append(pos)
if desired_type=="Location": return possible_locations
if desired_type=="LocationObj":
return [self.my_world.world[loc] for loc in possible_locations]
if desired_type=="Bacteria":
reachable_bacteria=[self.my_world.world[loc].occupants[0] for loc in possible_locations if len(self.my_world.world[loc].occupants)>0]
return reachable_bacteria
def AddFreePhage(self, phage_type, family, donor_genome, cargo, crispr_sequence, receptor, maxcargo):
if len(self.free_phages)>=param["Phage Carrying Capacity"]: print "NO SPACE FOR MORE PHAGE"; exit(); return
self.free_phages.append((donor_genome, {"Family":family, "Position":None, "Cargo": cargo, "Time":0,
"Type":phage_type,
"Receptor":receptor,
"crispr_seq": crispr_sequence,
"MaxCargoSize": maxcargo}))
def GetNumberOfNearbyPhage(self):
n_phage=0
for l in self.Get_Reachable(1, desired_type="Location"): n_phage+=len(self.my_world.world[l].free_phages)
n_phage+=len(self.free_phages)#Add also phage in the current location
return n_phage
def AddAntibiotic(self, ant_type, quantity):
if ant_type=="RIF":
self.rif_time=0
self.rif_conc+=quantity
self.rif_inoculum=self.rif_conc
if ant_type=="STR":
self.str_time=0
self.str_conc+=quantity
self.str_inoculum=self.str_conc
if ant_type=="QUIN":
self.quin_time=0
self.quin_conc+=quantity
self.quin_inoculum=self.str_conc
def Diffuse(self, ant_type):
lambda_diffusion=0.17
if ant_type=="RIF": current_conc=self.rif_conc
if ant_type=="STR": current_conc=self.str_conc
if ant_type=="QUIN": current_conc=self.quin_conc
distance=1
deployed_in=[]
while current_conc>1:
if ant_type=="RIF": current_conc=int(self.rif_conc*math.exp(-lambda_diffusion*distance))
if ant_type=="STR": current_conc=int(self.str_conc*math.exp(-lambda_diffusion*distance))
if ant_type=="QUIN": current_conc=int(self.quin_conc*math.exp(-lambda_diffusion*distance))
locs=self.Get_Reachable(distance, desired_type="Location")
for l in locs:
if not(l in deployed_in):
self.my_world.world[l].AddAntibiotic(ant_type, int(random.choice(range(current_conc/2, current_conc))))
deployed_in.append(l)
distance+=1
def Erode_Elements(self, current_iteration):
#Degrade antibiotics
if self.rif_conc>0:
antibiotic_degradation_rate=param["Antibiotic Degradation Lambda RIF"]
self.rif_conc=self.rif_inoculum*math.exp(-antibiotic_degradation_rate*self.rif_time)
self.rif_time+=1
if self.rif_conc<pow(10, -5): self.rif_conc=0 #To save some time
if self.str_conc>0:
antibiotic_degradation_rate=param["Antibiotic Degradation Lambda STR"]
self.str_conc=self.str_inoculum*math.exp(-antibiotic_degradation_rate*self.str_time)
self.str_time+=1
if self.str_conc<pow(10, -5): self.str_conc=0 #To save some time
if self.quin_conc>0:
antibiotic_degradation_rate=param["Antibiotic Degradation Lambda QUIN"]
self.quin_conc=self.quin_inoculum*math.exp(-antibiotic_degradation_rate*self.quin_time)
self.quin_time+=1
if self.quin_conc<pow(10, -5): self.quin_conc=0 #To save some time
#Degrade phage
def CheckPhageSurvival(phage):
prob_phage_survival=1*math.exp(-param["Free Phage Degradation Lambda"]*(-phage["Time"]))
if random.random()>prob_phage_survival: return False #Doesn't make it
else: return True #Still survives
#Check which phage survive outside
self.free_phages[:]=[p for p in self.free_phages if CheckPhageSurvival(p[1])]
for phage in self.free_phages: phage[1]["Time"]-=1#Then increase counter of surviving phage lifetime outside a host
#Same for exogenous DNA...
if len(self.eDNA)>0:
if random.random()>0.5: #TODO: This more efficiently (as a function of "age" of eDNA?)
self.eDNA.pop(random.randrange(len(self.eDNA)))
class World():
def __init__(self, lines, rows):
self.max_width=lines
self.max_height=rows
self.world={}
self.available_locations=[]
print "Setting world..."
for i in range(0, lines):
for j in range(0, rows):
self.world[(i,j)]=Place((i,j), self)
self.available_locations.append((i,j))
def GetMooreNeighborhood(self, origin, distance):
adjs=[]
for i in range(-distance,distance+1,1):
for j in range(-distance,distance+1,1):
if i != 0 or j != 0: #Do not add self as neighbor
if self.world.has_key((origin[0]+i, origin[1]+j)):
adjs.append((origin[0]+i, origin[1]+j))
return adjs
def Set_Available(self, coordinates):
if coordinates in self.available_locations: print "Bronca SET"; exit()
self.available_locations.append(coordinates)
def Set_Unavailable(self, coordinates):
if not(coordinates in self.available_locations): print "Bronca UNSET"; exit()
self.available_locations.remove(coordinates)
def Get_Free_Space(self):return self.world[random.choice(self.available_locations)]
def Get_All_Free_Spaces(self):return self.available_locations
def Set_All_Unavailable(self):
self.available_locations=[]
def Set_Many_Unavailable(self, coordinate_list):
self.available_locations=list(set(self.available_locations)-set(coordinate_list)) #Careful with this. It changes order of list and removes duplicates. Might work for how we manage this list
def Set_Many_Available(self, coordinate_list):self.available_locations.extend(coordinate_list)
def Set_All_Available(self): self.available_locations=self.world.keys()
def GetAllFreePhages(self):
count_all_phages=0;
for loc in self.world:count_all_phages+=len(self.world[loc].free_phages)
return count_all_phages
def UpdateLocations(self, current_iteration=None):
#Degrade elements (eDNA, antibiotics...)
for _, loc in enumerate(self.world): self.world[loc].Erode_Elements(current_iteration)
#Shuffle according to environmental Structure
if param["Environmental Diffusion"]=="Liquid":
#Redistribute antibiotic randomly across locations
all_rifs=[self.world[loc].rif_conc for loc in self.world]#Gather all individual concentrations in array
random_positions=random.sample(self.world.keys(), len(self.world.keys()))#Randomize locations
for loc in random_positions:self.world[loc].rif_conc=all_rifs.pop()#Assign new antibiotic values
#PHAGE SHUFFLING - randomize each viral particle
all_phages=[] #Remove all phages from positions and collect all in a single list
for loc in self.world:all_phages.extend(copy.copy(self.world[loc].free_phages));self.world[loc].free_phages=[]
factor_chunk=int(round(float(len(all_phages))/(self.max_height*self.max_width)))#This is how many phages each location is going to "receive"
if factor_chunk>=1:#When there are too many phages, divide evenly (faster)
random.shuffle(all_phages)
chunks=[all_phages[x:x+factor_chunk] for x in xrange(0, len(all_phages), factor_chunk)]
for loc in random_positions:
if len(chunks)<=0:break
else:self.world[loc].free_phages=chunks.pop()
while len(chunks)>0: #If there are remainders, distribute randomly
random_pos=random.choice(random_positions)
self.world[random_pos].free_phages.extend(chunks.pop())
else:#If not enough phages, distribute at random by the locations
for loc in random_positions:
if len(all_phages)<=0: break
self.world[loc].free_phages.append(all_phages.pop())
elif param["Environmental Diffusion"]=="Semi-Solid":
distance_to_use=3
#Create dictionary that holds the future (i.e., after diffusion) values (so that it does not chain into more changes than it should per generation)
new_values={}
for loc in self.world:new_values[loc]={"rif":-1,"phage":[]}
#Exchange value of antibiotic with a random neighbour
for loc in self.world:
partner_loc=random.choice(self.world[loc].Get_Reachable(distance=distance_to_use, desired_type="LocationObj"))
new_values[loc]["rif"]=partner_loc.rif_conc;
new_values[partner_loc.coordinates]["rif"]=self.world[loc].rif_conc
if len(self.world[loc].free_phages)>0:
partner_loc=self.world[loc].Get_Reachable(distance=distance_to_use, desired_type="LocationObj")
partner_loc.append(self.world[loc])#Add self as possible recipient
while len(self.world[loc].free_phages)>0:
new_loc=random.choice(partner_loc)
new_values[new_loc.coordinates]["phage"].append(self.world[loc].free_phages.pop())
#After everything is calculated, update values
for loc in self.world:
self.world[loc].rif_conc=new_values[loc]["rif"]
self.world[loc].free_phages=copy.copy(new_values[loc]["phage"])#TODO: Check if deepcopy is needed
from Visual import WorldPainter, PrintGenomes
from Environment import World
import Organism, ParseSpecies
import Stats
import os, string, random
import numpy as np
from copy import copy, deepcopy
import pprint
##Check input parameters
import argparse
parser = argparse.ArgumentParser(description='Phageology')
parser.add_argument('-id', dest='id', help='The identification for this simulation', required=True)
parser.add_argument('-paramfile', dest='paramfile', help='The file with the parameters for this simulation', required=True)
parser.add_argument('-setupfile', dest='setupfile', help='The file with simulation setup', required=True)
parser.add_argument('--visualize', action='store_true', help='Include this flag if you want plots to appear in realtime')
args = parser.parse_args()
print "Parameter file", args.paramfile
print "Setup file", args.setupfile
import Parameters
from Parameters import model_parameters as param
Parameters.LoadParameters(args.paramfile)
#Initialize random machine with seed
if param["Random Seed"]!=None:
random.seed(param["Random Seed"])
np.random.seed(param["Random Seed"])
#Create subdirectories that hold the printed structures
if not os.path.exists(param["Main Directory"]+"/Genomes/"):print("Directory missing ("+param["Main Directory"]+"/Genomes/"+"), creating..."); os.makedirs(param["Main Directory"]+"/Genomes/")
if not os.path.exists(param["Main Directory"]+"/Bacteria/"):print("Directory missing ("+param["Main Directory"]+"/Bacteria/"+"), creating...");os.makedirs(param["Main Directory"]+"/Bacteria/")
if not os.path.exists(param["Main Directory"]+"/World/"):print("Directory missing ("+param["Main Directory"]+"/World/"+"), creating...");os.makedirs(param["Main Directory"]+"/World/")
if not os.path.exists(param["Main Directory"]+"/Joined/"):print("Directory missing ("+param["Main Directory"]+"/Joined/"+"), creating...");os.makedirs(param["Main Directory"]+"/Joined/")
if not os.path.exists(param["Main Directory"]+"/PlotDynamics/"):print("Directory missing ("+param["Main Directory"]+"/PlotDynamics/"+"), creating...");os.makedirs(param["Main Directory"]+"/PlotDynamics/")
#Accessory function - Roullette wheel selection
def WeightedReproductionChoice(bacteria_list):
def weighted_choice(choices):
total = sum(w for c, w in choices)
if total==0: return None
r = random.uniform(0, total)
upto = 0
for c, w in choices:
if upto + w >= r:return (c, w)
upto += w
candidates=[(b, b.AssertCandidacy()) for b in bacteria_list]
candidates = filter(lambda x: x[1] != None, candidates)
result=weighted_choice(candidates)
if (result!=None) and (len(result[0].phages)>0):
for p in result[0].phages:
if p["Type"]=="Virulent": print"You should not be here"; exit()
if result==None: return None
else: return result[0]
#Accessory function - Logic check for bacteria survival
def IsDEAD(bacteria):
if bacteria.location==None:return False
else:return True
def AddPhages(future_phage, world):
for pos in future_phage:
phg=future_phage[pos]
print "Adding phage", phg["Type"], phg["Family"]
world.world[pos].AddFreePhage(phage_type=copy(phg["Type"]),
family=copy(phg["Family"]),
donor_genome=None,
cargo=copy(phg["Cargo"]),
crispr_sequence=copy(phg["crispr_seq"]),
receptor=copy(phg["Receptor"]),
maxcargo=copy(phg["MaxCargoSize"]))
sims=param["Number Simulations"]
for sim in xrange(sims):
#First set up the world
size=param["World Size"]
w=World(lines=size, rows=size)
#Second, let's create a visualizer
vanGogh = WorldPainter(main_location=param["Main Directory"], world_size=size)
#Third, Define bacterial genomes (along with which genes are potential ARG)
def DefineGenome(block):
#Define main DNA
genome_type=[(random.choice([block, block, block, block, "N"]), int(random.uniform(param["Minimum Gene Size"],param["Maximum Gene Size"])), "Core") for _ in xrange(param["Number Genes"])]
#Define antibiotic resistance genes (x3)
for resistance_allele in ["Rif", "Str", "Quin"]:
ar_locus=random.choice([pos for pos in xrange(0,len(genome_type)) if ((genome_type[pos][0]!="N") and (not("AR" in genome_type[pos][0])))])
genome_type[ar_locus]=(genome_type[ar_locus][0]+":AR_"+resistance_allele, genome_type[ar_locus][1], "Core", "Sensitive")
print genome_type
return genome_type
##Get population composition from input file
setup_bacs, setup_phgs, setup_inter, setup_superinfection=ParseSpecies.LoadSpeciesStats(args.setupfile)
##Setup phage
initial_phages=[]
future_phage={}
param["IndividualPhageParameters"]={}
param["IndividualPhageParameters"]["Burst Size"]={}
param["IndividualPhageParameters"]["Lysogeny Alpha"]={}
param["IndividualPhageParameters"]["Lysogeny Kappa"]={}
param["IndividualPhageParameters"]["Induction Alpha"]={}
param["IndividualPhageParameters"]["Induction Kappa"]={}
param["IndividualPhageParameters"]["Generalized Transduction Probability"]={}
param["IndividualPhageParameters"]["Specialized Transduction Probability"]={}
genomes_colors_map={}
for p in setup_phgs:
genomes_colors_map[p["DNAIdentifier"]]=(p["color_genome"], "Phage")
if "ARG" in p["Name"]:
ar_type=p["Name"].split("ARG_")[-1]
initial_phages.append({"Type":p["Type"],
"Position":None,
"Family":p["Name"], "crispr_seq":''.join(random.choice(string.ascii_lowercase) for _ in range(10)),
"Cargo":[(p["DNAIdentifier"], 10, 'Mobile:Ph'),
('A:AR_'+ar_type, 9, 'Core', 'Resistant', 'Mobile:Ph')],
"Receptor":p["Receptor"],
"MaxCargoSize":p["MaxCargoSize"],
"Time":10})
else:
initial_phages.append({"Type":p["Type"],
"Position":None,
"Family":p["Name"], "crispr_seq":''.join(random.choice(string.ascii_lowercase) for _ in range(10)),
"Cargo":[(p["DNAIdentifier"], 10, 'Mobile:Ph'),
(p["DNAIdentifier"], 20, 'Mobile:Ph')],
"Receptor":p["Receptor"],
"MaxCargoSize":p["MaxCargoSize"],
"Time":0})
param["IndividualPhageParameters"]["Burst Size"][p["Name"]]=p["BurstSize"]
param["IndividualPhageParameters"]["Lysogeny Alpha"][p["Name"]]=p["Lysogeny Alpha"]
param["IndividualPhageParameters"]["Lysogeny Kappa"][p["Name"]]=p["Lysogeny Kappa"]
param["IndividualPhageParameters"]["Induction Alpha"][p["Name"]]=p["Induction Alpha"]
param["IndividualPhageParameters"]["Induction Kappa"][p["Name"]]=p["Induction Kappa"]
param["IndividualPhageParameters"]["Generalized Transduction Probability"][p["Name"]]=p["Generalized Transduction Probability"]
param["IndividualPhageParameters"]["Specialized Transduction Probability"][p["Name"]]=p["Specialized Transduction Probability"]
for _ in range(0, p["FreeNumbers"]):
#Clustered in the middle of the environment
#pos=(int(random.uniform(int(param["World Size"]/2)-5, int(param["World Size"]/2)+5)),int(random.uniform(int(param["World Size"]/2)-5, int(param["World Size"]/2)+5)))
#OR Randomly sparsed throughout the environment
pos=(int(random.uniform(0, param["World Size"])), int(random.uniform(0, param["World Size"])))
#Save to add at a later point
future_phage[pos]=initial_phages[-1]
##Setup the infection map (host range)
param["InfectionMap"]=setup_inter
##Setup the superinfection protection amongst phage
param["Superinfection"]=setup_superinfection
##Setup bacteria
genome_types=[];colors_used=[]; bacs=[]
param["Bacteria Species"]={}
param["IndividualBacteriaParameters"]={}
param["IndividualBacteriaParameters"]["Basal Reproduction Probability"]={}
for b in setup_bacs:
if not(b["Color"] in colors_used): colors_used.append(b["Color"]); genome_types.append((DefineGenome(b["DNAIdentifier"]), b["Color"])); param["Bacteria Species"][b["Name"]]=b["Color"]; genomes_colors_map[b["DNAIdentifier"]]=(b["Color"], "Bacteria")
genome_to_use=None
for gen in genome_types:
if gen[1]==b["Color"]:
genome_to_use=gen[0]
param["IndividualBacteriaParameters"]["Basal Reproduction Probability"][b["Name"]]=b["Growth"]
for b_individual in range(0, int(param["World Size"]*param["World Size"]*b["Freq"])):
if b["Color"]==None: print b_individual, b
new_bacteria=Organism.Bacteria(b["Name"], copy(genome_to_use), b["Color"], ar_locus=[0,0,0], movement=1)
if b["AntRes"]!="NA":
for res in b["AntRes"]:
for idx, gene in enumerate(new_bacteria.genome):
if "AR_"+res in gene[0]:new_bacteria.genome[idx]=(gene[0], gene[1], gene[2], "Resistant"); new_bacteria.resistances[gene[0].split(":")[1]]=True
if b["PhageRes"][0]!="NA": new_bacteria.phage_receptor_resistances=[int(recept) for recept in b["PhageRes"]]
if b["Prophage"]!=None:
for phg in b["Prophage"]:
for phg_template in initial_phages:
if phg_template["Family"]==phg[0]:
#print arg_pos
new_bacteria.phages.append({"Type":copy(phg_template["Type"]),
"Position":copy(phg[1]),
"Family":copy(phg_template["Family"]),
"crispr_seq":copy(phg_template["crispr_seq"]),
"Cargo":copy(phg_template["Cargo"]),
"Receptor":copy(phg_template["Receptor"]),
"MaxCargoSize":copy(phg_template["MaxCargoSize"]),
"Time":copy(phg_template["Time"])})
new_bacteria.genome.insert(phg[1], new_bacteria.phages[-1])
for gene in phg_template["Cargo"]:
#Check if ARG comes along, add to resistance profile...
if ("AR" in gene[0]) and (gene[3]=="Resistant"):
new_bacteria.resistances[gene[0].split(":")[1]]=True
bacs.append(new_bacteria)
print "Setup done. Here are the parameters to be simulated"
pprint.pprint(param)
print "Positioning bacteria..."
print len(bacs)
all_vacancies=deepcopy(w.Get_All_Free_Spaces()) #Gather empty spots (deepcopy, because we are changing it)
random.shuffle(all_vacancies);random.shuffle(bacs) #Shuffle everything (places, bacteria)
for idx in range(len(bacs)):bacs[idx].Set_Location(w.world[all_vacancies[idx]], set_un=False) #Assign each bacteria to a place (sequential, now that everything is shuffled)
w.Set_Many_Unavailable(all_vacancies[0:len(bacs)]) #Mark assigned places as unavailable (the first part of the location list, since we assigned them sequentially)
#w.Set_All_Unavailable() #Comment previous line and uncomment this if all lattice is filled (faster like this)
#Fifth, and finally the life cycle
iterations=param["Iterations"]
iterations_outgrowth=param["Iterations Outgrowth"]
#Real time plots
if args.visualize: import VisualStats; VisualStats.LaunchPlot(); VisualStats.ResetPlots()
phage_diversity={}
bac_diversity={}
####PART 1: Getting the transducing particles
for iteration in range(-iterations_outgrowth,iterations):
#Add phages if outgrowth is finished
if iteration==0:
print "Adding phages to the environment..."
AddPhages(future_phage, w)