advanced simulation rule parsing added
This commit is contained in:
parent
2431c0a3a1
commit
f1bce8dc4e
16
python/.vscode/launch.json
vendored
Normal file
16
python/.vscode/launch.json
vendored
Normal file
@ -0,0 +1,16 @@
|
|||||||
|
{
|
||||||
|
// Use IntelliSense to learn about possible attributes.
|
||||||
|
// Hover to view descriptions of existing attributes.
|
||||||
|
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
|
||||||
|
"version": "0.2.0",
|
||||||
|
"configurations": [
|
||||||
|
{
|
||||||
|
"name": "Python: Current File",
|
||||||
|
"type": "python",
|
||||||
|
"request": "launch",
|
||||||
|
"program": "${file}",
|
||||||
|
"console": "integratedTerminal",
|
||||||
|
"args": ["I R 1.0 R S 0.7 I S I I 0.8"]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
206
python/simulationAdvanced.py
Normal file
206
python/simulationAdvanced.py
Normal file
@ -0,0 +1,206 @@
|
|||||||
|
import numpy as np
|
||||||
|
import sys
|
||||||
|
statesComp = ["S", "I", "R"]
|
||||||
|
rulesComp = [("I", "R", 1.0), # spontaneous rule I -> R with rate 1.0
|
||||||
|
("R", "S", 0.7), # spontaneous rule R -> S with rate 0.7
|
||||||
|
(("I","S"),("I","I"), 0.8)] # contact rule I+S -> I+I with rate 0.4
|
||||||
|
|
||||||
|
allStates = []
|
||||||
|
|
||||||
|
class Rule:
|
||||||
|
def __init__(self, ruleParts, probability):
|
||||||
|
self.ruleParts = ruleParts
|
||||||
|
self.probability = probability
|
||||||
|
|
||||||
|
def getString(self):
|
||||||
|
output = "("
|
||||||
|
for part in self.ruleParts:
|
||||||
|
output += f"({part.getFromState().getValue()}, {part.getToState().getValue()}), "
|
||||||
|
output = output[0:len(output)-2]
|
||||||
|
output += f"), {self.probability})"
|
||||||
|
return output
|
||||||
|
|
||||||
|
def getOutput(self):
|
||||||
|
# index of to state is ruleParts / 2
|
||||||
|
toStateIndex = int(len(self.ruleParts) / 2)
|
||||||
|
fromTuple = []
|
||||||
|
toTuple = []
|
||||||
|
for i in range(toStateIndex):
|
||||||
|
fromTuple.append(self.ruleParts[i])
|
||||||
|
toTuple.append(self.ruleParts[i + toStateIndex])
|
||||||
|
|
||||||
|
if(len(fromTuple) > 1):
|
||||||
|
fromTuple = tuple(fromTuple)
|
||||||
|
toTuple = tuple(toTuple)
|
||||||
|
return (fromTuple, toTuple, self.probability,)
|
||||||
|
else:
|
||||||
|
return (fromTuple[0], toTuple[0], self.probability,)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def parseState(string):
|
||||||
|
output = ""
|
||||||
|
i = 0
|
||||||
|
while(not(string[i].isdigit() or string[i] == " ")):
|
||||||
|
output += string[i]
|
||||||
|
i += 1
|
||||||
|
return (output, string[i+1:],)
|
||||||
|
|
||||||
|
def parseRate(string):
|
||||||
|
output = ""
|
||||||
|
i = 0
|
||||||
|
while(not string[i] == " "):
|
||||||
|
output += string[i]
|
||||||
|
i += 1
|
||||||
|
if(i == len(string)):
|
||||||
|
break
|
||||||
|
return(float(output), string[i+1:])
|
||||||
|
|
||||||
|
|
||||||
|
# Extract rules from string
|
||||||
|
# example: "Inf Inf Inf R 0.8"
|
||||||
|
def stringToRule(Input):
|
||||||
|
allRules = []
|
||||||
|
rulePartsBuffer = []
|
||||||
|
while(len(Input) > 0):
|
||||||
|
newString = parseState(Input)
|
||||||
|
rulePartsBuffer.append(newString[0])
|
||||||
|
allStates.append(newString[0])
|
||||||
|
|
||||||
|
# check if we are at the end
|
||||||
|
if(newString[1][0].isdigit()):
|
||||||
|
newString = parseRate(newString[1])
|
||||||
|
allRules.append(Rule(rulePartsBuffer.copy(), newString[0]))
|
||||||
|
rulePartsBuffer.clear()
|
||||||
|
|
||||||
|
Input = newString[1]
|
||||||
|
|
||||||
|
output = []
|
||||||
|
for r in allRules:
|
||||||
|
output.append(r.getOutput())
|
||||||
|
return output
|
||||||
|
|
||||||
|
def generateStatesList():
|
||||||
|
out = []
|
||||||
|
for s in allStates:
|
||||||
|
if(not(s in out)):
|
||||||
|
out.append(s)
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
# parse the input wohooooo
|
||||||
|
rules = stringToRule(sys.argv[1])
|
||||||
|
states = generateStatesList()
|
||||||
|
|
||||||
|
simulation = []
|
||||||
|
|
||||||
|
|
||||||
|
graph_as_edgelist = [(0, 4), (0, 1), (1, 5), (1, 2), (2, 6), (2, 3), (3, 7), (4, 8), (4, 5), (5, 9), (5, 6), (6, 10), (6, 7), (7, 11), (8, 12), (8, 9), (9, 13), (9, 10), (10, 14), (10, 11), (11, 15), (12, 13), (13, 14), (14, 15)]
|
||||||
|
|
||||||
|
horizon = 20.0 # wie lange wird simuliert
|
||||||
|
initial_distribution = [0.5, 0.5, 0.0] # gleiche Reihenfolge wie states, musss zu rules passen und normalisiert werden
|
||||||
|
timepoint_num = 101
|
||||||
|
def get_next_state(current_labels):
|
||||||
|
fastes_firing_time = 10000000.0 #dummy
|
||||||
|
firing_rule = None
|
||||||
|
firing_node = None
|
||||||
|
firing_edge = None
|
||||||
|
|
||||||
|
# iterate over nodes
|
||||||
|
for node in nodes:
|
||||||
|
current_state = current_labels[node]
|
||||||
|
for rule in rules:
|
||||||
|
if 'tuple' in str(type(rule[0])):
|
||||||
|
# is contact rule
|
||||||
|
continue
|
||||||
|
if current_state == rule[0]:
|
||||||
|
current_fireing_time = np.random.exponential(1.0/rule[2])
|
||||||
|
if current_fireing_time < fastes_firing_time:
|
||||||
|
fastes_firing_time = current_fireing_time
|
||||||
|
firing_rule = rule
|
||||||
|
firing_node = node
|
||||||
|
firing_edge = None
|
||||||
|
|
||||||
|
|
||||||
|
# iterate over edges:
|
||||||
|
for edge in graph_as_edgelist:
|
||||||
|
node1, node2 = edge
|
||||||
|
current_state1 = current_labels[node1]
|
||||||
|
current_state2 = current_labels[node2]
|
||||||
|
for rule in rules:
|
||||||
|
if 'str' in str(type(rule[0])):
|
||||||
|
# is spont. rule
|
||||||
|
continue
|
||||||
|
if (current_state1 == rule[0][0] and current_state2 == rule[0][1]) or (current_state2 == rule[0][0] and current_state1 == rule[0][1]):
|
||||||
|
current_fireing_time = np.random.exponential(1.0/rule[2])
|
||||||
|
if current_fireing_time < fastes_firing_time:
|
||||||
|
fastes_firing_time = current_fireing_time
|
||||||
|
firing_rule = rule
|
||||||
|
firing_node = None
|
||||||
|
firing_edge = edge
|
||||||
|
|
||||||
|
|
||||||
|
if firing_rule is None:
|
||||||
|
# no rule could fire
|
||||||
|
return None, fastes_firing_time # would happen anyway but still
|
||||||
|
|
||||||
|
# apply rule
|
||||||
|
new_labels = list(current_labels) # copy
|
||||||
|
|
||||||
|
if firing_node is not None:
|
||||||
|
new_labels[firing_node] = firing_rule[1]
|
||||||
|
return new_labels, fastes_firing_time
|
||||||
|
|
||||||
|
assert(firing_edge is not None)
|
||||||
|
change_node1 = firing_edge[0]
|
||||||
|
change_node2 = firing_edge[1]
|
||||||
|
# we have to check which node changes in which direction
|
||||||
|
if new_labels[change_node1] == firing_rule[0][0] and new_labels[change_node2] == firing_rule[0][1]:
|
||||||
|
new_labels[change_node1] = firing_rule[1][0]
|
||||||
|
new_labels[change_node2] = firing_rule[1][1]
|
||||||
|
else:
|
||||||
|
new_labels[change_node1] = firing_rule[1][1]
|
||||||
|
new_labels[change_node2] = firing_rule[1][0]
|
||||||
|
|
||||||
|
|
||||||
|
return new_labels, fastes_firing_time
|
||||||
|
|
||||||
|
def count_states(current_labels):
|
||||||
|
counter = [0 for _ in states]
|
||||||
|
simulation.append(current_labels)
|
||||||
|
for label in current_labels:
|
||||||
|
index = states.index(label)
|
||||||
|
counter[index] += 1
|
||||||
|
return counter
|
||||||
|
|
||||||
|
nodes = sorted(list(set([e[0] for e in graph_as_edgelist] + [e[1] for e in graph_as_edgelist])))
|
||||||
|
assert(nodes == list(range(len(nodes)))) # nodes haben labels 0...<N-1>
|
||||||
|
|
||||||
|
# setup
|
||||||
|
timepoints_samples = np.linspace(0.0, horizon, timepoint_num)
|
||||||
|
timepoints_samples_static = np.linspace(0.0, horizon, timepoint_num)
|
||||||
|
initial_labels = list(np.random.choice(states, len(nodes), p=initial_distribution))
|
||||||
|
current_labels = initial_labels
|
||||||
|
global_clock = 0.0
|
||||||
|
labels = list()
|
||||||
|
timepoints = list()
|
||||||
|
state_counts = list()
|
||||||
|
|
||||||
|
|
||||||
|
# simulate
|
||||||
|
while len(timepoints_samples) > 0:
|
||||||
|
new_labels, time_passed = get_next_state(current_labels)
|
||||||
|
global_clock += time_passed
|
||||||
|
while len(timepoints_samples) > 0 and global_clock > timepoints_samples[0]:
|
||||||
|
labels.append(list(current_labels))
|
||||||
|
state_counts.append(count_states(current_labels))
|
||||||
|
timepoints_samples = timepoints_samples[1:]
|
||||||
|
current_labels = new_labels
|
||||||
|
|
||||||
|
|
||||||
|
for l in simulation:
|
||||||
|
print(l)
|
||||||
|
pass
|
||||||
|
|
||||||
|
if(rulesComp == rules):
|
||||||
|
print("yess")
|
||||||
109
python/test.py
Normal file
109
python/test.py
Normal file
@ -0,0 +1,109 @@
|
|||||||
|
import numpy as np
|
||||||
|
|
||||||
|
allStates = []
|
||||||
|
states = ["S", "I", "R"]
|
||||||
|
rules = [("I", "R", 1.0), # spontaneous rule I -> R with rate 1.0
|
||||||
|
("R", "S", 0.7), # spontaneous rule R -> S with rate 0.7
|
||||||
|
(("I","S"),("I","I"), 0.8)] # contact rule I+S -> I+I with rate 0.4
|
||||||
|
graph_as_edgelist = [(0, 4), (0, 1), (1, 5), (1, 2), (2, 6), (2, 3), (3, 7), (4, 8), (4, 5), (5, 9), (5, 6), (6, 10), (6, 7), (7, 11), (8, 12), (8, 9), (9, 13), (9, 10), (10, 14), (10, 11), (11, 15), (12, 13), (13, 14), (14, 15)]
|
||||||
|
horizon = 20.0 # wie lange wird simuliert
|
||||||
|
initial_distribution = [0.5, 0.5, 0.0] # gleiche Reihenfolge wie states, musss zu rules passen und normalisiert werden
|
||||||
|
timepoint_num = 101
|
||||||
|
def get_next_state(current_labels):
|
||||||
|
fastes_firing_time = 10000000.0 #dummy
|
||||||
|
firing_rule = None
|
||||||
|
firing_node = None
|
||||||
|
firing_edge = None
|
||||||
|
|
||||||
|
# iterate over nodes
|
||||||
|
for node in nodes:
|
||||||
|
current_state = current_labels[node]
|
||||||
|
for rule in rules:
|
||||||
|
if 'tuple' in str(type(rule[0])):
|
||||||
|
# is contact rule
|
||||||
|
continue
|
||||||
|
if current_state == rule[0]:
|
||||||
|
current_fireing_time = np.random.exponential(1.0/rule[2])
|
||||||
|
if current_fireing_time < fastes_firing_time:
|
||||||
|
fastes_firing_time = current_fireing_time
|
||||||
|
firing_rule = rule
|
||||||
|
firing_node = node
|
||||||
|
firing_edge = None
|
||||||
|
|
||||||
|
|
||||||
|
# iterate over edges:
|
||||||
|
for edge in graph_as_edgelist:
|
||||||
|
node1, node2 = edge
|
||||||
|
current_state1 = current_labels[node1]
|
||||||
|
current_state2 = current_labels[node2]
|
||||||
|
for rule in rules:
|
||||||
|
if 'str' in str(type(rule[0])):
|
||||||
|
# is spont. rule
|
||||||
|
continue
|
||||||
|
if (current_state1 == rule[0][0] and current_state2 == rule[0][1]) or (current_state2 == rule[0][0] and current_state1 == rule[0][1]):
|
||||||
|
current_fireing_time = np.random.exponential(1.0/rule[2])
|
||||||
|
if current_fireing_time < fastes_firing_time:
|
||||||
|
fastes_firing_time = current_fireing_time
|
||||||
|
firing_rule = rule
|
||||||
|
firing_node = None
|
||||||
|
firing_edge = edge
|
||||||
|
|
||||||
|
|
||||||
|
if firing_rule is None:
|
||||||
|
# no rule could fire
|
||||||
|
return None, fastes_firing_time # would happen anyway but still
|
||||||
|
|
||||||
|
# apply rule
|
||||||
|
new_labels = list(current_labels) # copy
|
||||||
|
|
||||||
|
if firing_node is not None:
|
||||||
|
new_labels[firing_node] = firing_rule[1]
|
||||||
|
return new_labels, fastes_firing_time
|
||||||
|
|
||||||
|
assert(firing_edge is not None)
|
||||||
|
change_node1 = firing_edge[0]
|
||||||
|
change_node2 = firing_edge[1]
|
||||||
|
# we have to check which node changes in which direction
|
||||||
|
if new_labels[change_node1] == firing_rule[0][0] and new_labels[change_node2] == firing_rule[0][1]:
|
||||||
|
new_labels[change_node1] = firing_rule[1][0]
|
||||||
|
new_labels[change_node2] = firing_rule[1][1]
|
||||||
|
else:
|
||||||
|
new_labels[change_node1] = firing_rule[1][1]
|
||||||
|
new_labels[change_node2] = firing_rule[1][0]
|
||||||
|
|
||||||
|
|
||||||
|
return new_labels, fastes_firing_time
|
||||||
|
|
||||||
|
def count_states(current_labels):
|
||||||
|
counter = [0 for _ in states]
|
||||||
|
allStates.append(current_labels)
|
||||||
|
for label in current_labels:
|
||||||
|
index = states.index(label)
|
||||||
|
counter[index] += 1
|
||||||
|
return counter
|
||||||
|
nodes = sorted(list(set([e[0] for e in graph_as_edgelist] + [e[1] for e in graph_as_edgelist])))
|
||||||
|
assert(nodes == list(range(len(nodes)))) # nodes haben labels 0...<N-1>
|
||||||
|
|
||||||
|
# setup
|
||||||
|
timepoints_samples = np.linspace(0.0, horizon, timepoint_num)
|
||||||
|
timepoints_samples_static = np.linspace(0.0, horizon, timepoint_num)
|
||||||
|
initial_labels = list(np.random.choice(states, len(nodes), p=initial_distribution))
|
||||||
|
current_labels = initial_labels
|
||||||
|
global_clock = 0.0
|
||||||
|
labels = list()
|
||||||
|
timepoints = list()
|
||||||
|
state_counts = list()
|
||||||
|
|
||||||
|
|
||||||
|
# simulate
|
||||||
|
while len(timepoints_samples) > 0:
|
||||||
|
new_labels, time_passed = get_next_state(current_labels)
|
||||||
|
global_clock += time_passed
|
||||||
|
while len(timepoints_samples) > 0 and global_clock > timepoints_samples[0]:
|
||||||
|
labels.append(list(current_labels))
|
||||||
|
state_counts.append(count_states(current_labels))
|
||||||
|
timepoints_samples = timepoints_samples[1:]
|
||||||
|
current_labels = new_labels
|
||||||
|
|
||||||
|
for i in allStates:
|
||||||
|
print(i)
|
||||||
@ -1,6 +1,10 @@
|
|||||||
|
body{
|
||||||
|
background-color: black;
|
||||||
|
}
|
||||||
#mynetwork {
|
#mynetwork {
|
||||||
height: 500px;
|
height: 500px;
|
||||||
width: 500px;
|
width: 500px;
|
||||||
background-color: white;
|
background-color: white;
|
||||||
border: 3px solid black;
|
border: 3px solid black;
|
||||||
|
background-color: gray;
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user