visualization/visualizer/public/python/simulationAdvanced.py
2021-07-16 16:42:48 +02:00

242 lines
7.6 KiB
Python

# example execution
# python simulationAdvanced.py '{
# "graph": "strict graph GraphHello {\n0 -- 1;\n1 -- 2;\n1 -- 3;\n}",
# "horizon": 20.0,
# "states": ["S", "I", "R"],
# "initial_distribution": [0.5, 0.5, 0],
# "rules": "I R 1.0 R S 0.7 I S I I 0.8"
# }'
import numpy as np
import sys
import graphUtils
import json
# statesComp = ["S", "I", "R"]
rulesComp = [("I", "R", 1.0), # spontaneous rule I -> R with rate 1.0
("R", "S", 0.7), # spontaneous rule R -> S with rate 0.7
(("I","S","I"),("I","I","I"), 0.8)] # contact rule I+S -> I+I with rate 0.4
simulation = []
class Rule:
def __init__(self, ruleParts, probability):
self.ruleParts = ruleParts
self.probability = probability
def getString(self):
output = "("
for part in self.ruleParts:
output += f"({part.getFromState().getValue()}, {part.getToState().getValue()}), "
output = output[0:len(output)-2]
output += f"), {self.probability})"
return output
def getOutput(self):
# index of to state is ruleParts / 2
toStateIndex = int(len(self.ruleParts) / 2)
fromTuple = []
toTuple = []
for i in range(toStateIndex):
fromTuple.append(self.ruleParts[i])
toTuple.append(self.ruleParts[i + toStateIndex])
if(len(fromTuple) > 1):
fromTuple = tuple(fromTuple)
toTuple = tuple(toTuple)
return (fromTuple, toTuple, self.probability,)
else:
return (fromTuple[0], toTuple[0], self.probability,)
def parseState(string):
output = ""
i = 0
while(not(string[i].isdigit() or string[i] == " ")):
output += string[i]
i += 1
return (output, string[i+1:],)
def parseRate(string):
output = ""
i = 0
while(not string[i] == " "):
output += string[i]
i += 1
if(i == len(string)):
break
return(float(output), string[i+1:])
# Extract rules from string
# example: "Inf Inf Inf R 0.8"
def stringToRule(Input):
allRules = []
rulePartsBuffer = []
while(len(Input) > 0):
newString = parseState(Input)
rulePartsBuffer.append(newString[0])
# check if we are at the end
if(newString[1][0].isdigit()):
newString = parseRate(newString[1])
allRules.append(Rule(rulePartsBuffer.copy(), newString[0]))
rulePartsBuffer.clear()
Input = newString[1]
output = []
for r in allRules:
output.append(r.getOutput())
return output
def stringToStates(inp):
inp = inp.split(" ")
out = []
for s in inp:
out.append(s)
return out
def stringToDistr(inp):
inp = inp.split(" ")
out = []
for i in inp:
out.append(float(i))
return out
# parse the input wohooooo
# rules = stringToRule(sys.argv[1])
# states = stringToStates(sys.argv[2])
# initial_distribution = stringToDistr(sys.argv[3])
jsonInput = json.loads(sys.argv[1])
# print(jsonInput["graph"])
graph_as_edgelist = graphUtils.dotToEdgelist(jsonInput["graph"])[1]
horizon = jsonInput["horizon"]
states = jsonInput["states"]
initial_distribution = jsonInput["initial_distribution"]
rules = stringToRule(jsonInput["rules"])
# print(graph_as_edgelist)
#graph_as_edgelist = [(0, 4), (0, 1), (1, 5), (1, 2), (2, 6), (2, 3), (3, 7), (4, 8), (4, 5), (5, 9), (5, 6), (6, 10), (6, 7), (7, 11), (8, 12), (8, 9), (9, 13), (9, 10), (10, 14), (10, 11), (11, 15), (12, 13), (13, 14), (14, 15)]
# print(graphUtils.edgelistToDot("testGraph", graph_as_edgelist))
# print(graphUtils.dotToEdgelist("strict graph 'testGraph' {\n0 -- 4;\n0 -- 1;\n1 -- 5;\n}")[0])
# horizon = 20.0 # wie lange wird simuliert
# initial_distribution = [0.5, 0.5, 0.0] # gleiche Reihenfolge wie states, musss zu rules passen und normalisiert werden
timepoint_num = 101
def get_next_state(current_labels):
fastes_firing_time = 10000000.0 #dummy
firing_rule = None
firing_node = None
firing_edge = None
# iterate over nodes
for node in nodes:
current_state = current_labels[node]
for rule in rules:
if 'tuple' in str(type(rule[0])):
# is contact rule
continue
if current_state == rule[0]:
current_fireing_time = np.random.exponential(1.0/rule[2])
if current_fireing_time < fastes_firing_time:
fastes_firing_time = current_fireing_time
firing_rule = rule
firing_node = node
firing_edge = None
# iterate over edges:
for edge in graph_as_edgelist:
node1, node2 = edge
current_state1 = current_labels[node1]
current_state2 = current_labels[node2]
for rule in rules:
if 'str' in str(type(rule[0])):
# is spont. rule
continue
if (current_state1 == rule[0][0] and current_state2 == rule[0][1]) or (current_state2 == rule[0][0] and current_state1 == rule[0][1]):
current_fireing_time = np.random.exponential(1.0/rule[2])
if current_fireing_time < fastes_firing_time:
fastes_firing_time = current_fireing_time
firing_rule = rule
firing_node = None
firing_edge = edge
if firing_rule is None:
# no rule could fire
return None, fastes_firing_time # would happen anyway but still
# apply rule
new_labels = list(current_labels) # copy
if firing_node is not None:
new_labels[firing_node] = firing_rule[1]
return new_labels, fastes_firing_time
assert(firing_edge is not None)
change_node1 = firing_edge[0]
change_node2 = firing_edge[1]
# we have to check which node changes in which direction
if new_labels[change_node1] == firing_rule[0][0] and new_labels[change_node2] == firing_rule[0][1]:
new_labels[change_node1] = firing_rule[1][0]
new_labels[change_node2] = firing_rule[1][1]
else:
new_labels[change_node1] = firing_rule[1][1]
new_labels[change_node2] = firing_rule[1][0]
return new_labels, fastes_firing_time
def count_states(current_labels):
counter = [0 for _ in states]
simulation.append(current_labels)
for label in current_labels:
index = states.index(label)
counter[index] += 1
return counter
nodes = sorted(list(set([e[0] for e in graph_as_edgelist] + [e[1] for e in graph_as_edgelist])))
assert(nodes == list(range(len(nodes)))) # nodes haben labels 0...<N-1>
# setup
timepoints_samples = np.linspace(0.0, horizon, timepoint_num)
timepoints_samples_static = np.linspace(0.0, horizon, timepoint_num)
initial_labels = list(np.random.choice(states, len(nodes), p=initial_distribution))
current_labels = initial_labels
global_clock = 0.0
labels = list()
timepoints = list()
state_counts = list()
# simulate
while len(timepoints_samples) > 0:
new_labels, time_passed = get_next_state(current_labels)
global_clock += time_passed
while len(timepoints_samples) > 0 and global_clock > timepoints_samples[0]:
labels.append(list(current_labels))
state_counts.append(count_states(current_labels))
timepoints_samples = timepoints_samples[1:]
current_labels = new_labels
for l in simulation:
# print(l)
pass
# if(rulesComp == rules):
# print("Rules were parsed correctly")
# else:
# print("Rules were not parsed correctly")
# prepare the output json file
outputJson = {
"states": simulation
}
print(json.dumps(outputJson))