Source code for opqua.internal.gillespie


"""Contains class Population."""

import copy as cp
import pandas as pd
import numpy as np
import joblib as jl

from opqua.model import *

[docs] class Gillespie(object): """Class contains methods for simulating model with Gillespie algorithm. Class defines a model's events and methods for changing system state according to the possible events and simulating a timecourse using the Gillespie algorithm. Attributes: model (Model object): the model this simulation belongs to. """ # Event ID constants: MIGRATE_HOST = 0 MIGRATE_VECTOR = 1 POPULATION_CONTACT_HOST_HOST = 2 POPULATION_CONTACT_HOST_VECTOR = 3 POPULATION_CONTACT_VECTOR_HOST = 4 CONTACT_HOST_HOST = 5 CONTACT_HOST_VECTOR = 6 CONTACT_VECTOR_HOST = 7 RECOVER_HOST = 8 RECOVER_VECTOR = 9 MUTATE_HOST = 10 MUTATE_VECTOR = 11 RECOMBINE_HOST = 12 RECOMBINE_VECTOR = 13 KILL_HOST = 14 KILL_VECTOR = 15 DIE_HOST = 16 DIE_VECTOR = 17 BIRTH_HOST = 18 BIRTH_VECTOR = 19 EVENT_IDS = { # must match above 0:'MIGRATE_HOST', 1:'MIGRATE_VECTOR', 2:'POPULATION_CONTACT_HOST_HOST', 3:'POPULATION_CONTACT_HOST_VECTOR', 4:'POPULATION_CONTACT_VECTOR_HOST', 5:'CONTACT_HOST_HOST', 6:'CONTACT_HOST_VECTOR', 7:'CONTACT_VECTOR_HOST', 8:'RECOVER_HOST', 9:'RECOVER_VECTOR', 10:'MUTATE_HOST', 11:'MUTATE_VECTOR', 12:'RECOMBINE_HOST', 13:'RECOMBINE_VECTOR', 14:'KILL_HOST', 15:'KILL_VECTOR', 16:'DIE_HOST', 17:'DIE_VECTOR', 18:'BIRTH_HOST', 19:'BIRTH_VECTOR' } def __init__(self, model): """Create a new Gillespie simulation object. Arguments: model (Model object): the model this simulation belongs to. """ super(Gillespie, self).__init__() # initialize as parent class object # Event IDs self.evt_IDs = [ self.MIGRATE_HOST, self.MIGRATE_VECTOR, self.POPULATION_CONTACT_HOST_HOST, self.POPULATION_CONTACT_HOST_VECTOR, self.POPULATION_CONTACT_VECTOR_HOST, self.CONTACT_HOST_HOST, self.CONTACT_HOST_VECTOR, self.CONTACT_VECTOR_HOST, self.RECOVER_HOST, self.RECOVER_VECTOR, self.MUTATE_HOST, self.MUTATE_VECTOR, self.RECOMBINE_HOST, self.RECOMBINE_VECTOR, self.KILL_HOST, self.KILL_VECTOR, self.DIE_HOST, self.DIE_VECTOR, self.BIRTH_HOST, self.BIRTH_VECTOR ] # event IDs in specific order to be used self.total_population_contact_rate_host = 0 self.total_population_contact_rate_vector = 0 self.model = model
[docs] def getRates(self,population_ids): """Wrapper for calculating event rates as per current system state. Arguments: population_ids (list of Strings): list with IDs for every population in the model. Returns: Matrix with rates as values for events (rows) and populations (columns). Populations in order given in argument. """ rates = np.zeros( [ len(self.evt_IDs), len(population_ids) ] ) # rate array size of event space # Contact rates assume scaling area: large populations are equally # dense as small ones, so contact is constant with both host and # vector populations. If you don't want this to happen, modify each # population's base contact rate accordingly. # Now for each population... for i,id in enumerate(population_ids): # Calculate the rates: rates[self.MIGRATE_HOST,i] = ( self.model.populations[id].total_migration_rate_hosts * self.model.populations[id].coefficients_hosts[ :, self.model.populations[id].MIGRATION ].sum() ) rates[self.MIGRATE_VECTOR,i] = ( self.model.populations[id].total_migration_rate_vectors * self.model.populations[id].coefficients_vectors[ :, self.model.populations[id].MIGRATION ].sum() ) rates[self.POPULATION_CONTACT_HOST_HOST,i] = ( np.sum([ list( self.model.populations[id].neighbors_contact_hosts_hosts.values() ) ]) * np.multiply( self.model.populations[id].coefficients_hosts[ :, self.model.populations[id].POPULATION_CONTACT ], self.model.populations[id].coefficients_hosts[ :, self.model.populations[id].INFECTED ] ).sum() / max( len( self.model.populations[id].hosts ), 1) * np.sum([ neighbor.neighbors_contact_hosts_hosts[self.model.populations[id]] * neighbor.coefficients_hosts[ :, neighbor.RECEIVE_POPULATION_CONTACT ].sum() for neighbor in self.model.populations[id].neighbors_contact_hosts_hosts ]) ) rates[self.POPULATION_CONTACT_HOST_VECTOR,i] = ( np.sum([ list( self.model.populations[id].neighbors_contact_hosts_vectors.values() ) ]) * np.multiply( self.model.populations[id].coefficients_hosts[ :, self.model.populations[id].POPULATION_CONTACT ], self.model.populations[id].coefficients_hosts[ :, self.model.populations[id].INFECTED ] ).sum() / max( len( self.model.populations[id].hosts ), 1) * np.sum([ neighbor.neighbors_contact_vectors_hosts[self.model.populations[id]] * neighbor.coefficients_vectors[ :, neighbor.RECEIVE_POPULATION_CONTACT ].sum() for neighbor in self.model.populations[id].neighbors_contact_hosts_vectors ]) ) rates[self.POPULATION_CONTACT_VECTOR_HOST,i] = ( np.sum([ list( self.model.populations[id].neighbors_contact_vectors_hosts.values() ) ]) * np.multiply( self.model.populations[id].coefficients_vectors[ :, self.model.populations[id].POPULATION_CONTACT ], self.model.populations[id].coefficients_vectors[ :, self.model.populations[id].INFECTED ] ).sum() * np.sum([ neighbor.neighbors_contact_hosts_vectors[self.model.populations[id]] * neighbor.coefficients_hosts[ :, neighbor.RECEIVE_POPULATION_CONTACT ].sum() / max( len( neighbor.hosts ), 1) for neighbor in self.model.populations[id].neighbors_contact_vectors_hosts ]) ) rates[self.CONTACT_HOST_HOST,i] = ( self.model.populations[id].contact_rate_host_host * np.multiply( self.model.populations[id].coefficients_hosts[ :, self.model.populations[id].CONTACT ], self.model.populations[id].coefficients_hosts[ :, self.model.populations[id].INFECTED ] ).sum() * self.model.populations[id].transmission_efficiency_host_host * np.sum( self.model.populations[id].coefficients_hosts[ :, self.model.populations[id].RECEIVE_CONTACT ] ) / max( len( self.model.populations[id].hosts ), 1) ) rates[self.CONTACT_HOST_VECTOR,i] = ( self.model.populations[id].contact_rate_host_vector * np.multiply( self.model.populations[id].coefficients_hosts[ :, self.model.populations[id].CONTACT ], self.model.populations[id].coefficients_hosts[ :, self.model.populations[id].INFECTED ] ).sum() * self.model.populations[id].transmission_efficiency_host_vector * np.sum( self.model.populations[id].coefficients_vectors[ :, self.model.populations[id].RECEIVE_CONTACT ] ) / max( len( self.model.populations[id].hosts ), 1) ) rates[self.CONTACT_VECTOR_HOST,i] = ( self.model.populations[id].contact_rate_host_vector * np.multiply( self.model.populations[id].coefficients_vectors[ :, self.model.populations[id].CONTACT ], self.model.populations[id].coefficients_vectors[ :, self.model.populations[id].INFECTED ] ).sum() * self.model.populations[id].transmission_efficiency_vector_host * np.sum( self.model.populations[id].coefficients_hosts[ :, self.model.populations[id].RECEIVE_CONTACT ]) / max( len( self.model.populations[id].hosts ), 1) ) rates[self.RECOVER_HOST,i] = ( self.model.populations[id].recovery_rate_host * self.model.populations[id].coefficients_hosts[ :, self.model.populations[id].RECOVERY ].sum() ) rates[self.RECOVER_VECTOR,i] = ( self.model.populations[id].recovery_rate_vector * self.model.populations[id].coefficients_vectors[ :, self.model.populations[id].RECOVERY ].sum() ) rates[self.MUTATE_HOST,i] = ( self.model.populations[id].mutate_in_host * self.model.populations[id].coefficients_hosts[ :, self.model.populations[id].MUTATION ].sum() ) rates[self.MUTATE_VECTOR,i] = ( self.model.populations[id].mutate_in_vector * self.model.populations[id].coefficients_vectors[ :, self.model.populations[id].MUTATION ].sum() ) rates[self.RECOMBINE_HOST,i] = ( self.model.populations[id].recombine_in_host * self.model.populations[id].coefficients_hosts[ :, self.model.populations[id].RECOMBINATION ].sum() ) rates[self.RECOMBINE_VECTOR,i] = ( self.model.populations[id].recombine_in_vector * self.model.populations[id].coefficients_vectors[ :, self.model.populations[id].RECOMBINATION ].sum() ) rates[self.KILL_HOST,i] = ( self.model.populations[id].mortality_rate_host * self.model.populations[id].coefficients_hosts[ :, self.model.populations[id].LETHALITY ].sum() ) rates[self.KILL_VECTOR,i] = ( self.model.populations[id].mortality_rate_vector * self.model.populations[id].coefficients_vectors[ :, self.model.populations[id].LETHALITY ].sum() ) rates[self.DIE_HOST,i] = ( self.model.populations[id].death_rate_host * len(self.model.populations[id].hosts) ) rates[self.DIE_VECTOR,i] = ( self.model.populations[id].death_rate_vector * len(self.model.populations[id].vectors) ) rates[self.BIRTH_HOST,i] = ( self.model.populations[id].birth_rate_host * self.model.populations[id].coefficients_hosts[ :, self.model.populations[id].NATALITY ].sum() ) rates[self.BIRTH_VECTOR,i] = ( self.model.populations[id].birth_rate_vector * self.model.populations[id].coefficients_vectors[ :, self.model.populations[id].NATALITY ].sum() ) return rates
[docs] def doAction(self,act,pop,rand): """Change system state according to act argument passed Arguments: act (int): defines action to be taken, one of the event ID constants. pop (Population object): where the population action will happen in. rand (number 0-1): random number used to define event. Returns: Boolean indicationg whether or not the model has changed state. """ changed = False if act == self.MIGRATE_HOST: rand = rand * pop.total_migration_rate_hosts r_cum = 0 for neighbor in pop.neighbors_hosts: r_cum += pop.neighbors_hosts[neighbor] if r_cum > rand: pop.migrate(neighbor,1,0, rand=( ( rand - r_cum + pop.neighbors_hosts[neighbor] ) / pop.neighbors_hosts[neighbor] ) ) changed = True break elif act == self.MIGRATE_VECTOR: rand = rand * pop.total_migration_rate_vectors r_cum = 0 for neighbor in pop.neighbors_vectors: r_cum += pop.neighbors_vectors[neighbor] if r_cum > rand: pop.migrate(neighbor,0,1, rand=( ( rand - r_cum + pop.neighbors_vectors[neighbor] ) / pop.neighbors_vectors[neighbor] ) ) changed = True break elif act == self.POPULATION_CONTACT_HOST_HOST: neighbors = list( pop.neighbors_contact_hosts_hosts.keys() ) neighbor_rates = [ pop.neighbors_contact_hosts_hosts[neighbor] * neighbor.neighbors_contact_hosts_hosts[pop] * neighbor.coefficients_hosts[ :, neighbor.RECEIVE_POPULATION_CONTACT ].sum() / max( len( pop.hosts ), 1) # technically unnecessary for neighbor in neighbors ] rand = rand * np.sum(neighbor_rates) r_cum = 0 for neighbor_i,neighbor_r in enumerate(neighbor_rates): r_cum += neighbor_r if r_cum > rand: changed = pop.populationContact( neighbors[neighbor_i], ( rand - r_cum + neighbor_r ) / neighbor_r, host_origin=True, host_target=True ) break elif act == self.POPULATION_CONTACT_HOST_VECTOR: neighbors = list( pop.neighbors_contact_hosts_vectors.keys() ) neighbor_rates = [ pop.neighbors_contact_hosts_vectors[neighbor] * neighbor.neighbors_contact_vectors_hosts[pop] * neighbor.coefficients_vectors[ :, neighbor.RECEIVE_POPULATION_CONTACT ].sum() / max( len( pop.hosts ), 1) # technically unnecessary for neighbor in neighbors ] rand = rand * np.sum(neighbor_rates) r_cum = 0 for neighbor_i,neighbor_r in enumerate(neighbor_rates): r_cum += neighbor_r if r_cum > rand: changed = pop.populationContact( neighbors[neighbor_i], ( rand - r_cum + neighbor_r ) / neighbor_r, host_origin=True, host_target=False ) break elif act == self.POPULATION_CONTACT_VECTOR_HOST: neighbors = list( pop.neighbors_contact_vectors_hosts.keys() ) neighbor_rates = [ pop.neighbors_contact_vectors_hosts[neighbor] * neighbor.neighbors_contact_hosts_vectors[pop] * neighbor.coefficients_hosts[ :, neighbor.RECEIVE_POPULATION_CONTACT ].sum() / max( len( neighbor.hosts ), 1) # necessary! for neighbor in neighbors ] rand = rand * np.sum(neighbor_rates) r_cum = 0 for neighbor_i,neighbor_r in enumerate(neighbor_rates): r_cum += neighbor_r if r_cum > rand: changed = pop.populationContact( neighbors[neighbor_i], ( rand - r_cum + neighbor_r ) / neighbor_r, host_origin=False, host_target=True ) break elif act == self.CONTACT_HOST_HOST: changed = pop.contactHostHost(rand) elif act == self.CONTACT_HOST_VECTOR: changed = pop.contactHostVector(rand) elif act == self.CONTACT_VECTOR_HOST: changed = pop.contactVectorHost(rand) elif act == self.RECOVER_HOST: pop.recoverHost(rand) changed = True elif act == self.RECOVER_VECTOR: pop.recoverVector(rand) changed = True elif act == self.MUTATE_HOST: pop.mutateHost(rand) changed = True elif act == self.MUTATE_VECTOR: pop.mutateVector(rand) changed = True elif act == self.RECOMBINE_HOST: pop.recombineHost(rand) changed = True elif act == self.RECOMBINE_VECTOR: pop.recombineVector(rand) changed = True elif act == self.KILL_HOST: pop.killHost(rand) changed = True elif act == self.KILL_VECTOR: pop.killVector(rand) changed = True elif act == self.DIE_HOST: pop.dieHost(rand) changed = True elif act == self.DIE_VECTOR: pop.dieVector(rand) changed = True elif act == self.BIRTH_HOST: pop.birthHost(rand) changed = True elif act == self.BIRTH_VECTOR: pop.birthVector(rand) changed = True self.model.global_trackers['num_events'][self.EVENT_IDS[act]] += 1 return changed
[docs] def run(self,t0,tf,time_sampling=0,host_sampling=0,vector_sampling=0, print_every_n_events=1000): """Simulate model for a specified time between two time points. Simulates a time series using the Gillespie algorithm. Arguments: t0 (number): initial time point to start simulation at. tf (number): initial time point to end simulation at. Keyword arguments: time_sampling (int): how many events to skip before saving a snapshot of the system state (saves all by default), if <0, saves only final state. Defaults to 0. host_sampling (int): how many hosts to skip before saving one in a snapshot of the system state (saves all by default). Defaults to 0. vector_sampling (int): how many vectors to skip before saving one in a snapshot of the system state (saves all by default). Defaults to 0. print_every_n_events (int>0): number of events a message is printed to console. Defaults to 1000. Returns: dictionary containing model state history, with `keys`=`times` and `values`=`Model` objects with model snapshot at that time point. """ # Simulation variables self.model.t_var = t0 # keeps track of time history = { 0: self.model.copyState( host_sampling=host_sampling, vector_sampling=vector_sampling ) } intervention_tracker = 0 # keeps track of what the next intervention should be self.model.interventions = sorted( self.model.interventions, key=lambda i: i.time ) print_counter = 0 # only used to track when to print sampling_counter = 0 # used to track when to save a snapshot while self.model.t_var < tf: # repeat until t reaches end of timecourse population_ids = list( self.model.populations.keys() ) r = self.getRates(population_ids) # get event rates in this state r_tot = np.sum(r) # sum of all rates # Time handling if r_tot > 0: dt = np.random.exponential( 1/r_tot ) # time until next event if (intervention_tracker < len(self.model.interventions) and self.model.t_var + dt >= self.model.interventions[intervention_tracker].time): # if there are any interventions left and if it is time # to make one, while ( intervention_tracker < len(self.model.interventions) and (self.model.t_var + dt >= self.model.interventions[intervention_tracker].time or r_tot == 0) and self.model.t_var < tf ): # carry out all interventions at this time point, # and additional timepoints if no events will happen self.model.t_var = self.model.interventions[ intervention_tracker ].time self.model.interventions[ intervention_tracker ].doIntervention() intervention_tracker += 1 # advance the tracker # save snapshot at this timepoint sampling_counter = 0 history[self.model.t_var] = self.model.copyState() # now recalculate rates population_ids = list( self.model.populations.keys() ) r = self.getRates(population_ids) # get event rates in this state r_tot = np.sum(r) # sum of all rates if r_tot > 0: # if no more events happening, dt = np.random.exponential( 1/r_tot ) # time until next event else: self.model.t_var = tf # go to end self.model.t_var += dt # add time step to main timer # Event handling if self.model.t_var < tf: # if still within max time u = np.random.random() * r_tot # random uniform number between 0 and total rate r_cum = 0 # cumulative rate for e in range(r.shape[0]): # for every possible event, for p in range(r.shape[1]): # for every possible population, r_cum += r[e,p] # add this event's rate to cumulative rate if u < r_cum: # if random number is under cumulative rate # print every n events print_counter += 1 if print_counter == print_every_n_events: print_counter = 0 print( 'Simulating time: ' + str(self.model.t_var) + ', event: ' + self.EVENT_IDS[e] ) changed = self.doAction( e, self.model.populations[ population_ids[p] ], ( u - r_cum + r[e,p] ) / r[e,p] ) # do corresponding action, # feed in renormalized random number if changed: # if model state changed # update custom condition trackers for condition in self.model.custom_condition_trackers: if self.model.custom_condition_trackers[condition](self.model): self.model.global_trackers['custom_conditions'][condition].append(self.model.t_var) if time_sampling >= 0: # if state changed and saving history, # saves history at correct intervals sampling_counter += 1 if sampling_counter > time_sampling: sampling_counter = 0 history[self.model.t_var] = self.model.copyState( host_sampling=host_sampling, vector_sampling=vector_sampling ) break # exit event loop else: # if the inner loop wasn't broken, continue # continue outer loop break # otherwise, break outer loop else: # if no events happening, if intervention_tracker < len(self.model.interventions): # if still not done with interventions, while (intervention_tracker < len(self.model.interventions) and self.model.t_var <= self.model.interventions[intervention_tracker].time and self.model.t_var < tf): # carry out all interventions at this time point self.model.t_var = self.model.interventions[ intervention_tracker ].time self.model.interventions[ intervention_tracker ].doIntervention() intervention_tracker += 1 # advance the tracker else: self.model.t_var = tf print( 'Simulating time: ' + str(self.model.t_var), 'END') history[tf] = self.model.copyState( host_sampling=host_sampling, vector_sampling=vector_sampling ) history[tf].history = None history[tf].global_trackers = cp.deepcopy( self.model.global_trackers ) return history