import numpy as np import sys import graphUtils statesComp = ["S", "I", "R"] rulesComp = [("I", "R", 1.0), # spontaneous rule I -> R with rate 1.0 ("R", "S", 0.7), # spontaneous rule R -> S with rate 0.7 (("I","S"),("I","I"), 0.8)] # contact rule I+S -> I+I with rate 0.4 simulation = [] class Rule: def __init__(self, ruleParts, probability): self.ruleParts = ruleParts self.probability = probability def getString(self): output = "(" for part in self.ruleParts: output += f"({part.getFromState().getValue()}, {part.getToState().getValue()}), " output = output[0:len(output)-2] output += f"), {self.probability})" return output def getOutput(self): # index of to state is ruleParts / 2 toStateIndex = int(len(self.ruleParts) / 2) fromTuple = [] toTuple = [] for i in range(toStateIndex): fromTuple.append(self.ruleParts[i]) toTuple.append(self.ruleParts[i + toStateIndex]) if(len(fromTuple) > 1): fromTuple = tuple(fromTuple) toTuple = tuple(toTuple) return (fromTuple, toTuple, self.probability,) else: return (fromTuple[0], toTuple[0], self.probability,) def parseState(string): output = "" i = 0 while(not(string[i].isdigit() or string[i] == " ")): output += string[i] i += 1 return (output, string[i+1:],) def parseRate(string): output = "" i = 0 while(not string[i] == " "): output += string[i] i += 1 if(i == len(string)): break return(float(output), string[i+1:]) # Extract rules from string # example: "Inf Inf Inf R 0.8" def stringToRule(Input): allRules = [] rulePartsBuffer = [] while(len(Input) > 0): newString = parseState(Input) rulePartsBuffer.append(newString[0]) # check if we are at the end if(newString[1][0].isdigit()): newString = parseRate(newString[1]) allRules.append(Rule(rulePartsBuffer.copy(), newString[0])) rulePartsBuffer.clear() Input = newString[1] output = [] for r in allRules: output.append(r.getOutput()) return output def stringToStates(inp): inp = inp.split(" ") out = [] for s in inp: out.append(s) return out def stringToDistr(inp): inp = inp.split(" ") out = [] for i in inp: out.append(float(i)) return out # parse the input wohooooo rules = stringToRule(sys.argv[1]) states = stringToStates(sys.argv[2]) initial_distribution = stringToDistr(sys.argv[3]) graph_as_edgelist = [(0, 4), (0, 1), (1, 5), (1, 2), (2, 6), (2, 3), (3, 7), (4, 8), (4, 5), (5, 9), (5, 6), (6, 10), (6, 7), (7, 11), (8, 12), (8, 9), (9, 13), (9, 10), (10, 14), (10, 11), (11, 15), (12, 13), (13, 14), (14, 15)] print(graphUtils.edgelistToDot("testGraph", graph_as_edgelist)) print(graphUtils.dotToEdgelist("strict graph 'testGraph' {\n0 -- 4;\n0 -- 1;\n1 -- 5;\n}")[0]) horizon = 20.0 # wie lange wird simuliert # initial_distribution = [0.5, 0.5, 0.0] # gleiche Reihenfolge wie states, musss zu rules passen und normalisiert werden timepoint_num = 101 def get_next_state(current_labels): fastes_firing_time = 10000000.0 #dummy firing_rule = None firing_node = None firing_edge = None # iterate over nodes for node in nodes: current_state = current_labels[node] for rule in rules: if 'tuple' in str(type(rule[0])): # is contact rule continue if current_state == rule[0]: current_fireing_time = np.random.exponential(1.0/rule[2]) if current_fireing_time < fastes_firing_time: fastes_firing_time = current_fireing_time firing_rule = rule firing_node = node firing_edge = None # iterate over edges: for edge in graph_as_edgelist: node1, node2 = edge current_state1 = current_labels[node1] current_state2 = current_labels[node2] for rule in rules: if 'str' in str(type(rule[0])): # is spont. rule continue if (current_state1 == rule[0][0] and current_state2 == rule[0][1]) or (current_state2 == rule[0][0] and current_state1 == rule[0][1]): current_fireing_time = np.random.exponential(1.0/rule[2]) if current_fireing_time < fastes_firing_time: fastes_firing_time = current_fireing_time firing_rule = rule firing_node = None firing_edge = edge if firing_rule is None: # no rule could fire return None, fastes_firing_time # would happen anyway but still # apply rule new_labels = list(current_labels) # copy if firing_node is not None: new_labels[firing_node] = firing_rule[1] return new_labels, fastes_firing_time assert(firing_edge is not None) change_node1 = firing_edge[0] change_node2 = firing_edge[1] # we have to check which node changes in which direction if new_labels[change_node1] == firing_rule[0][0] and new_labels[change_node2] == firing_rule[0][1]: new_labels[change_node1] = firing_rule[1][0] new_labels[change_node2] = firing_rule[1][1] else: new_labels[change_node1] = firing_rule[1][1] new_labels[change_node2] = firing_rule[1][0] return new_labels, fastes_firing_time def count_states(current_labels): counter = [0 for _ in states] simulation.append(current_labels) for label in current_labels: index = states.index(label) counter[index] += 1 return counter nodes = sorted(list(set([e[0] for e in graph_as_edgelist] + [e[1] for e in graph_as_edgelist]))) assert(nodes == list(range(len(nodes)))) # nodes haben labels 0... # setup timepoints_samples = np.linspace(0.0, horizon, timepoint_num) timepoints_samples_static = np.linspace(0.0, horizon, timepoint_num) initial_labels = list(np.random.choice(states, len(nodes), p=initial_distribution)) current_labels = initial_labels global_clock = 0.0 labels = list() timepoints = list() state_counts = list() # simulate while len(timepoints_samples) > 0: new_labels, time_passed = get_next_state(current_labels) global_clock += time_passed while len(timepoints_samples) > 0 and global_clock > timepoints_samples[0]: labels.append(list(current_labels)) state_counts.append(count_states(current_labels)) timepoints_samples = timepoints_samples[1:] current_labels = new_labels for l in simulation: print(l) pass if(rulesComp == rules): print("yess")