From f1bce8dc4ed4f24e1b25c4f8745418961a97b2dc Mon Sep 17 00:00:00 2001 From: JuliusHerrmann Date: Thu, 15 Jul 2021 04:09:46 +0200 Subject: [PATCH] advanced simulation rule parsing added --- python/.vscode/launch.json | 16 +++ python/simulationAdvanced.py | 206 +++++++++++++++++++++++++++++++++ python/test.py | 109 +++++++++++++++++ visualizer/public/css/main.css | 4 + 4 files changed, 335 insertions(+) create mode 100644 python/.vscode/launch.json create mode 100644 python/simulationAdvanced.py create mode 100644 python/test.py diff --git a/python/.vscode/launch.json b/python/.vscode/launch.json new file mode 100644 index 0000000..73c2f4a --- /dev/null +++ b/python/.vscode/launch.json @@ -0,0 +1,16 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Python: Current File", + "type": "python", + "request": "launch", + "program": "${file}", + "console": "integratedTerminal", + "args": ["I R 1.0 R S 0.7 I S I I 0.8"] + } + ] +} \ No newline at end of file diff --git a/python/simulationAdvanced.py b/python/simulationAdvanced.py new file mode 100644 index 0000000..1acff8a --- /dev/null +++ b/python/simulationAdvanced.py @@ -0,0 +1,206 @@ +import numpy as np +import sys +statesComp = ["S", "I", "R"] +rulesComp = [("I", "R", 1.0), # spontaneous rule I -> R with rate 1.0 + ("R", "S", 0.7), # spontaneous rule R -> S with rate 0.7 + (("I","S"),("I","I"), 0.8)] # contact rule I+S -> I+I with rate 0.4 + +allStates = [] + +class Rule: + def __init__(self, ruleParts, probability): + self.ruleParts = ruleParts + self.probability = probability + + def getString(self): + output = "(" + for part in self.ruleParts: + output += f"({part.getFromState().getValue()}, {part.getToState().getValue()}), " + output = output[0:len(output)-2] + output += f"), {self.probability})" + return output + + def getOutput(self): + # index of to state is ruleParts / 2 + toStateIndex = int(len(self.ruleParts) / 2) + fromTuple = [] + toTuple = [] + for i in range(toStateIndex): + fromTuple.append(self.ruleParts[i]) + toTuple.append(self.ruleParts[i + toStateIndex]) + + if(len(fromTuple) > 1): + fromTuple = tuple(fromTuple) + toTuple = tuple(toTuple) + return (fromTuple, toTuple, self.probability,) + else: + return (fromTuple[0], toTuple[0], self.probability,) + + + +def parseState(string): + output = "" + i = 0 + while(not(string[i].isdigit() or string[i] == " ")): + output += string[i] + i += 1 + return (output, string[i+1:],) + +def parseRate(string): + output = "" + i = 0 + while(not string[i] == " "): + output += string[i] + i += 1 + if(i == len(string)): + break + return(float(output), string[i+1:]) + + +# Extract rules from string +# example: "Inf Inf Inf R 0.8" +def stringToRule(Input): + allRules = [] + rulePartsBuffer = [] + while(len(Input) > 0): + newString = parseState(Input) + rulePartsBuffer.append(newString[0]) + allStates.append(newString[0]) + + # check if we are at the end + if(newString[1][0].isdigit()): + newString = parseRate(newString[1]) + allRules.append(Rule(rulePartsBuffer.copy(), newString[0])) + rulePartsBuffer.clear() + + Input = newString[1] + + output = [] + for r in allRules: + output.append(r.getOutput()) + return output + +def generateStatesList(): + out = [] + for s in allStates: + if(not(s in out)): + out.append(s) + return out + + +# parse the input wohooooo +rules = stringToRule(sys.argv[1]) +states = generateStatesList() + +simulation = [] + + +graph_as_edgelist = [(0, 4), (0, 1), (1, 5), (1, 2), (2, 6), (2, 3), (3, 7), (4, 8), (4, 5), (5, 9), (5, 6), (6, 10), (6, 7), (7, 11), (8, 12), (8, 9), (9, 13), (9, 10), (10, 14), (10, 11), (11, 15), (12, 13), (13, 14), (14, 15)] + +horizon = 20.0 # wie lange wird simuliert +initial_distribution = [0.5, 0.5, 0.0] # gleiche Reihenfolge wie states, musss zu rules passen und normalisiert werden +timepoint_num = 101 +def get_next_state(current_labels): + fastes_firing_time = 10000000.0 #dummy + firing_rule = None + firing_node = None + firing_edge = None + + # iterate over nodes + for node in nodes: + current_state = current_labels[node] + for rule in rules: + if 'tuple' in str(type(rule[0])): + # is contact rule + continue + if current_state == rule[0]: + current_fireing_time = np.random.exponential(1.0/rule[2]) + if current_fireing_time < fastes_firing_time: + fastes_firing_time = current_fireing_time + firing_rule = rule + firing_node = node + firing_edge = None + + + # iterate over edges: + for edge in graph_as_edgelist: + node1, node2 = edge + current_state1 = current_labels[node1] + current_state2 = current_labels[node2] + for rule in rules: + if 'str' in str(type(rule[0])): + # is spont. rule + continue + if (current_state1 == rule[0][0] and current_state2 == rule[0][1]) or (current_state2 == rule[0][0] and current_state1 == rule[0][1]): + current_fireing_time = np.random.exponential(1.0/rule[2]) + if current_fireing_time < fastes_firing_time: + fastes_firing_time = current_fireing_time + firing_rule = rule + firing_node = None + firing_edge = edge + + + if firing_rule is None: + # no rule could fire + return None, fastes_firing_time # would happen anyway but still + + # apply rule + new_labels = list(current_labels) # copy + + if firing_node is not None: + new_labels[firing_node] = firing_rule[1] + return new_labels, fastes_firing_time + + assert(firing_edge is not None) + change_node1 = firing_edge[0] + change_node2 = firing_edge[1] + # we have to check which node changes in which direction + if new_labels[change_node1] == firing_rule[0][0] and new_labels[change_node2] == firing_rule[0][1]: + new_labels[change_node1] = firing_rule[1][0] + new_labels[change_node2] = firing_rule[1][1] + else: + new_labels[change_node1] = firing_rule[1][1] + new_labels[change_node2] = firing_rule[1][0] + + + return new_labels, fastes_firing_time + +def count_states(current_labels): + counter = [0 for _ in states] + simulation.append(current_labels) + for label in current_labels: + index = states.index(label) + counter[index] += 1 + return counter + +nodes = sorted(list(set([e[0] for e in graph_as_edgelist] + [e[1] for e in graph_as_edgelist]))) +assert(nodes == list(range(len(nodes)))) # nodes haben labels 0... + +# setup +timepoints_samples = np.linspace(0.0, horizon, timepoint_num) +timepoints_samples_static = np.linspace(0.0, horizon, timepoint_num) +initial_labels = list(np.random.choice(states, len(nodes), p=initial_distribution)) +current_labels = initial_labels +global_clock = 0.0 +labels = list() +timepoints = list() +state_counts = list() + + +# simulate +while len(timepoints_samples) > 0: + new_labels, time_passed = get_next_state(current_labels) + global_clock += time_passed + while len(timepoints_samples) > 0 and global_clock > timepoints_samples[0]: + labels.append(list(current_labels)) + state_counts.append(count_states(current_labels)) + timepoints_samples = timepoints_samples[1:] + current_labels = new_labels + + +for l in simulation: + print(l) + pass + +if(rulesComp == rules): + print("yess") diff --git a/python/test.py b/python/test.py new file mode 100644 index 0000000..c8fec72 --- /dev/null +++ b/python/test.py @@ -0,0 +1,109 @@ +import numpy as np + +allStates = [] +states = ["S", "I", "R"] +rules = [("I", "R", 1.0), # spontaneous rule I -> R with rate 1.0 + ("R", "S", 0.7), # spontaneous rule R -> S with rate 0.7 + (("I","S"),("I","I"), 0.8)] # contact rule I+S -> I+I with rate 0.4 +graph_as_edgelist = [(0, 4), (0, 1), (1, 5), (1, 2), (2, 6), (2, 3), (3, 7), (4, 8), (4, 5), (5, 9), (5, 6), (6, 10), (6, 7), (7, 11), (8, 12), (8, 9), (9, 13), (9, 10), (10, 14), (10, 11), (11, 15), (12, 13), (13, 14), (14, 15)] +horizon = 20.0 # wie lange wird simuliert +initial_distribution = [0.5, 0.5, 0.0] # gleiche Reihenfolge wie states, musss zu rules passen und normalisiert werden +timepoint_num = 101 +def get_next_state(current_labels): + fastes_firing_time = 10000000.0 #dummy + firing_rule = None + firing_node = None + firing_edge = None + + # iterate over nodes + for node in nodes: + current_state = current_labels[node] + for rule in rules: + if 'tuple' in str(type(rule[0])): + # is contact rule + continue + if current_state == rule[0]: + current_fireing_time = np.random.exponential(1.0/rule[2]) + if current_fireing_time < fastes_firing_time: + fastes_firing_time = current_fireing_time + firing_rule = rule + firing_node = node + firing_edge = None + + + # iterate over edges: + for edge in graph_as_edgelist: + node1, node2 = edge + current_state1 = current_labels[node1] + current_state2 = current_labels[node2] + for rule in rules: + if 'str' in str(type(rule[0])): + # is spont. rule + continue + if (current_state1 == rule[0][0] and current_state2 == rule[0][1]) or (current_state2 == rule[0][0] and current_state1 == rule[0][1]): + current_fireing_time = np.random.exponential(1.0/rule[2]) + if current_fireing_time < fastes_firing_time: + fastes_firing_time = current_fireing_time + firing_rule = rule + firing_node = None + firing_edge = edge + + + if firing_rule is None: + # no rule could fire + return None, fastes_firing_time # would happen anyway but still + + # apply rule + new_labels = list(current_labels) # copy + + if firing_node is not None: + new_labels[firing_node] = firing_rule[1] + return new_labels, fastes_firing_time + + assert(firing_edge is not None) + change_node1 = firing_edge[0] + change_node2 = firing_edge[1] + # we have to check which node changes in which direction + if new_labels[change_node1] == firing_rule[0][0] and new_labels[change_node2] == firing_rule[0][1]: + new_labels[change_node1] = firing_rule[1][0] + new_labels[change_node2] = firing_rule[1][1] + else: + new_labels[change_node1] = firing_rule[1][1] + new_labels[change_node2] = firing_rule[1][0] + + + return new_labels, fastes_firing_time + +def count_states(current_labels): + counter = [0 for _ in states] + allStates.append(current_labels) + for label in current_labels: + index = states.index(label) + counter[index] += 1 + return counter +nodes = sorted(list(set([e[0] for e in graph_as_edgelist] + [e[1] for e in graph_as_edgelist]))) +assert(nodes == list(range(len(nodes)))) # nodes haben labels 0... + +# setup +timepoints_samples = np.linspace(0.0, horizon, timepoint_num) +timepoints_samples_static = np.linspace(0.0, horizon, timepoint_num) +initial_labels = list(np.random.choice(states, len(nodes), p=initial_distribution)) +current_labels = initial_labels +global_clock = 0.0 +labels = list() +timepoints = list() +state_counts = list() + + +# simulate +while len(timepoints_samples) > 0: + new_labels, time_passed = get_next_state(current_labels) + global_clock += time_passed + while len(timepoints_samples) > 0 and global_clock > timepoints_samples[0]: + labels.append(list(current_labels)) + state_counts.append(count_states(current_labels)) + timepoints_samples = timepoints_samples[1:] + current_labels = new_labels + +for i in allStates: + print(i) diff --git a/visualizer/public/css/main.css b/visualizer/public/css/main.css index d703d37..d7e2c74 100644 --- a/visualizer/public/css/main.css +++ b/visualizer/public/css/main.css @@ -1,6 +1,10 @@ +body{ + background-color: black; +} #mynetwork { height: 500px; width: 500px; background-color: white; border: 3px solid black; + background-color: gray; }