Проблема синхронизации при перемещении агентов в контексте с проекциями UndirectedSharedNetwork и SharedCSpace.
Я опираюсь на модель слухов (из урока repast4py), чтобы создать модель, в которой агенты могут перемещаться в двумерном непрерывном пространстве и чьи действия будут зависеть от состояний ближайших агентов и некоторых удалённых агентов (в соответствии с заданной социальной сетью).
Для этого я добавил в контекст две проекции: UndirectedSharedNetwork (при чтении соцсети из входного файла, с помощью read_network) и SharedCSpace.
Похоже, в сети возникает ошибка при переходе агента из одного ранга в другой. Пожалуйста, смотрите ниже мой код и входные файлы, которые я использовал. Буду признателен за любую помощь или предложения по решению этой проблемы!
Вот пример сообщения об ошибке, которое я получаю при запуске программы:
File "/home/works/lib/python3.10/site-packages/repast4py/context.py", line 297, in synchronize
self._synch_ghosts(restore_agent)
File "/home/works/lib/python3.10/site-packages/repast4py/context.py", line 259, in _synch_ghosts
self._update_ghosts()
File "/home/works/lib/python3.10/site-packages/repast4py/context.py", line 241, in _update_ghosts
ghost = self._agent_manager._ghost_agents[update[0]]
KeyError: (0, 0, 0)
Я запускаю код с помощью: mpirun -n 4 python TestsPy3.py слух_модель.yaml
TestsPy3.py:
import networkx as nx
from typing import Dict
from mpi4py import MPI
import numpy as np
from dataclasses import dataclass
from repast4py.network import write_network, read_network
from repast4py import context as ctx
from repast4py import core, random, schedule, logging, parameters, space
import repast4py
from repast4py.space import ContinuousPoint as cpt
from repast4py.space import BorderType, OccupancyType
from typing import List
# --- Module-level handle to the running Model instance. It stays None until
#     run() assigns it; RumorAgent.walk() and RumorAgent.update() read it to
#     reach the model's space and its list of rumor spreaders.
model = None
# --- This function uses networkx to create a complex network, and repast4py to distribute it over ranks
def generate_network_file(fname: str, n_ranks: int, n_agents: int):
    """Write a small-world network to disk, partitioned across process ranks.

    The graph is a connected Watts-Strogatz graph built with networkx and
    written with repast4py.network.write_network under the network name
    'rumor_network'.

    Args:
        fname: the name of the file to write to
        n_ranks: the number of process ranks to distribute the file over
        n_agents: the number of agents (nodes) in the network
    """
    small_world = nx.connected_watts_strogatz_graph(n_agents, 2, 0.25)
    network_name = 'rumor_network'
    try:
        # The import only probes whether nxmetis is installed; when it is,
        # the 'metis' partition method is requested.
        import nxmetis  # noqa: F401
        write_network(small_world, network_name, fname, n_ranks,
                      partition_method='metis')
    except ImportError:
        # Fall back to write_network's default partitioning.
        write_network(small_world, network_name, fname, n_ranks)
# --- This class implements the agent state and behavior
class RumorAgent(core.Agent):
    """Agent that either has or has not received the rumor and can walk in space."""

    def __init__(self, nid: int, agent_type: int, rank: int, received_rumor=False):
        """Create the agent.

        Args:
            nid: the id component of the agent's unique id
            agent_type: the type component of the agent's unique id
            rank: the rank component of the agent's unique id
            received_rumor: whether the agent already holds (and spreads) the rumor
        """
        # The three id components are forwarded to core.Agent.
        super().__init__(nid, agent_type, rank)
        self.received_rumor = received_rumor

    def save(self):
        """Return this agent's state as a tuple.

        A non-ghost agent saves its state with this method, and any ghost
        copies of the agent are updated with that data.

        Returns:
            A ``(uid, received_rumor)`` tuple.
        """
        state = (self.uid, self.received_rumor)
        return state

    def walk(self):
        """Shift the agent by +0.5 along both axes of the model's space."""
        here = model.space.get_location(self)
        model.move(self, here.x + 0.5, here.y + 0.5)

    def update(self, data):
        """Refresh this agent's state when it is a ghost on a non-local rank.

        Args:
            data: the new agent state (received_rumor)
        """
        # Register the ghost as a spreader only on the False -> True
        # transition, so it is appended at most once per transition.
        became_spreader = (not self.received_rumor) and data
        if became_spreader:
            model.rumor_spreaders.append(self)
        self.received_rumor = data
# --- This function is used to create the agents in the specified ranks according to the input file
def create_rumor_agent(nid, agent_type, rank, **kwargs):
    """Build a RumorAgent that has not yet received the rumor.

    This is the agent-creation callback handed to read_network. Any extra
    keyword arguments are accepted and ignored.

    Args:
        nid: the id component of the agent's unique id
        agent_type: the type component of the agent's unique id
        rank: the rank component of the agent's unique id

    Returns:
        The newly created RumorAgent.
    """
    fresh_agent = RumorAgent(nid, agent_type, rank)
    return fresh_agent
def restore_agent(agent_data):
    """Rebuild a RumorAgent from saved agent state.

    Args:
        agent_data: indexable state whose element 0 is the agent uid (itself
            indexed as id, type, rank) and whose element 1 is the
            received_rumor flag -- the layout returned by RumorAgent.save().

    Returns:
        A RumorAgent carrying the saved uid components and rumor state.
    """
    saved_uid = agent_data[0]
    has_rumor = agent_data[1]
    return RumorAgent(saved_uid[0], saved_uid[1], saved_uid[2], has_rumor)
# --- This dataclass is created to be used by the reduced type of logging
@dataclass
class RumorCounts:
    """Counters that the reducing logger aggregates over all ranks."""

    # Running total of rumor spreaders over the whole simulation.
    total_rumor_spreaders: int
    # Spreaders gained during the current tick.
    new_rumor_spreaders: int
    # Agents that have not received the rumor.
    total_susceptible: int
# --- This class is responsible for managing the entire simulation
class Model:
    """Owns the schedule, context, projections and loggers of the simulation.

    The social network is read from file into a SharedContext, and a periodic
    continuous space projection is added to the same context so that agents
    can also move around.
    """

    def __init__(self, comm, params):
        """Build the schedule, load the network, seed the rumor and place agents.

        Args:
            comm: the MPI communicator the simulation runs on
            params: model parameters; the keys read here are 'network_file',
                'initial_rumor_count', 'counts_file', 'rumor_probability'
                and 'world.L'
        """
        # --- Initializing the scheduling of the simulation
        self.runner = schedule.init_schedule_runner(comm)
        # --- step() and the tabular agent log both run every tick starting
        #     at tick 1; step() is given priority 1 and the log priority 2
        self.runner.schedule_repeating_event(
            1, 1, self.step,
            priority_type=repast4py.schedule.PriorityType.BY_PRIORITY,
            priority=1)
        self.runner.schedule_repeating_event(
            1, 1, self.tabular_log_agents,
            priority_type=repast4py.schedule.PriorityType.BY_PRIORITY,
            priority=2)
        # NOTE: no fixed stop tick is scheduled (params['stop.at'] is not
        # read); step() stops the run once no susceptible agents remain.
        # --- Scheduling the closure of routines and files that are still open
        self.runner.schedule_end_event(self.at_end)

        # --- Creating the context and reading the network (and agents) into it
        fpath = params['network_file']
        self.context = ctx.SharedContext(comm)
        read_network(fpath, self.context, create_rumor_agent, restore_agent)
        # --- read_network returns no reference to the network; it is looked
        #     up by the name given in the first line of the network file
        self.net = self.context.get_projection('social_net')

        # --- Creating the list of spreader agents and seeding it
        self.rumor_spreaders = []
        self.rank = comm.Get_rank()
        self._seed_rumor(params['initial_rumor_count'], comm)

        # --- Initial counts, logged at tick 0 and summed over all ranks
        rumored_count = len(self.rumor_spreaders)
        susceptible = self._count_susceptible()
        self.counts = RumorCounts(rumored_count, rumored_count, susceptible)
        loggers = logging.create_loggers(self.counts, op=MPI.SUM, rank=self.rank)
        self.data_set = logging.ReducingDataSet(loggers, comm, params['counts_file'])
        self.data_set.log(0)

        # --- Getting the rumor spreading probability from the input file
        self.rumor_prob = params['rumor_probability']

        # --- Creating tabular logging of per-agent rows
        self.agent_logger = logging.TabularLogger(
            comm,
            "output/agent_log.csv",
            ["tick", "id", "rank_initial", "rumor", "local_rank", "X", "Y"])

        # --- Printing the rank and the agents
        print(self.rank, list(self.context.agents()))

        # --- Creating the continuous space and adding it to the context
        box = space.BoundingBox(0, params['world.L'], 0, params['world.L'], 0, 0)
        self.space = space.SharedCSpace('space', bounds=box, borders=BorderType.Periodic,
                                        occupancy=OccupancyType.Multiple,
                                        buffer_size=5, comm=comm, tree_threshold=100)
        self.context.add_projection(self.space)
        print("Bounding box successfully created in rank %s" % self.rank)

        # --- Printing the local bounds to see what is going on
        local_bounds = self.space.get_local_bounds()
        x_lo = local_bounds.xmin
        x_hi = local_bounds.xmin + local_bounds.xextent
        y_lo = local_bounds.ymin
        y_hi = local_bounds.ymin + local_bounds.yextent
        print("The box dimensions in rank %d are: x(%s, %s) y(%s, %s)" % (self.rank,
                                                                          x_lo, x_hi,
                                                                          y_lo, y_hi))

        # --- The agents already exist in the context, so each one only has
        #     to be moved to a random position inside this rank's bounds
        for agent in self.context.agents():
            x = random.default_rng.uniform(x_lo, x_hi)
            y = random.default_rng.uniform(y_lo, y_hi)
            self.move(agent, x, y)

        # --- Logging the agents right now
        self.tabular_log_agents()

    def _count_susceptible(self):
        """Return how many agents yielded by the context lack the rumor."""
        return sum(1 for agent in self.context.agents() if not agent.received_rumor)

    def move(self, agent, x, y):
        """Move ``agent`` to the continuous point ``(x, y)`` in the space."""
        self.space.move(agent, cpt(x, y))

    def tabular_log_agents(self):
        """Log one row per agent in the context and write the rows out."""
        tick = self.runner.schedule.tick
        for agent in self.context.agents():
            # Named 'location' so the imported ContinuousPoint alias 'cpt'
            # is not shadowed inside this method.
            location = self.space.get_location(agent)
            self.agent_logger.log_row(tick, agent.id, agent.uid_rank,
                                      agent.received_rumor, agent.local_rank,
                                      location.x, location.y)
        # --- Once the rows are buffered, we write them
        self.agent_logger.write()

    def _seed_rumor(self, init_rumor_count: int, comm):
        """Mark randomly chosen agents as initial rumor spreaders.

        Rank 0 draws, for each initial rumor, the rank that receives it; the
        per-rank totals are scattered and every rank then marks that many of
        its own agents.

        Args:
            init_rumor_count: total number of initial spreaders over all ranks
            comm: the MPI communicator
        """
        world_size = comm.Get_size()
        # The value of the i'th element is the number of rumors to seed on rank i.
        rumor_counts = np.zeros(world_size, np.int32)
        if self.rank == 0:
            for _ in range(init_rumor_count):
                idx = random.default_rng.integers(0, high=world_size)
                rumor_counts[idx] += 1
        # --- Buffer receiving this rank's share of the scattered counts
        rumor_count = np.empty(1, dtype=np.int32)
        comm.Scatter(rumor_counts, rumor_count, root=0)
        # --- Picking that many agents, shuffled, from the context
        for agent in self.context.agents(count=rumor_count[0], shuffle=True):
            agent.received_rumor = True
            self.rumor_spreaders.append(agent)

    def at_end(self):
        """Close both loggers once the simulation has ended."""
        self.data_set.close()
        # The tabular agent logger holds its own output and must be closed too.
        self.agent_logger.close()
        print("The process in rank %d has ended" % (self.rank))

    def step(self):
        """Run one tick: spread the rumor, move agents, synchronize, log, maybe stop."""
        new_rumor_spreaders = []
        rng = random.default_rng
        for agent in self.rumor_spreaders:
            # Agents walk every tick and can leave this rank, while this list
            # keeps referencing them; asking the graph for the neighbors of a
            # node it no longer holds raises, so such entries are skipped.
            # NOTE(review): spreaders that migrate *into* this rank are not
            # added to rumor_spreaders anywhere -- confirm whether
            # restore_agent should register them.
            if agent not in self.net.graph:
                continue
            for ngh in self.net.graph.neighbors(agent):
                # Only agents local to this rank are updated, and only if
                # they do not have the rumor yet; the random draw comes last
                # so it happens only for eligible neighbors.
                if ((not ngh.received_rumor) and (ngh.local_rank == self.rank)
                        and (rng.uniform() <= self.rumor_prob)):
                    ngh.received_rumor = True
                    new_rumor_spreaders.append(ngh)

        # --- Moving every agent, then synchronizing the context across ranks
        for agent in self.context.agents():
            agent.walk()
        print("BEEEFORE BEEEFORE SYYYYYNNNNCCCC")
        self.context.synchronize(restore_agent)

        # --- Updating the spreader list and the counts for this tick
        self.rumor_spreaders += new_rumor_spreaders
        self.counts.new_rumor_spreaders = len(new_rumor_spreaders)
        self.counts.total_rumor_spreaders += self.counts.new_rumor_spreaders
        # Counted once: the same value feeds the log and the stop check.
        local_susceptible = self._count_susceptible()
        self.counts.total_susceptible = local_susceptible
        self.data_set.log(self.runner.schedule.tick)

        # --- Summing the susceptible agents over all the processes
        susceptible_agents = np.asarray([local_susceptible])
        self.context.comm.Allreduce(MPI.IN_PLACE, susceptible_agents, op=MPI.SUM)
        # --- Stopping once nobody is left to receive the rumor
        if susceptible_agents[0] == 0:
            self.runner.stop()

    def start(self):
        """Start the simulation by executing the schedule runner."""
        self.runner.execute()
# --- This function runs the simulation
def run(params: Dict):
    """Create the model on MPI.COMM_WORLD, publish it globally and start it.

    Args:
        params: the model parameters handed to the Model constructor
    """
    global model
    simulation = Model(MPI.COMM_WORLD, params)
    # The module-level name is bound only after construction has finished.
    model = simulation
    simulation.start()
# --- This if allows the simulation to be run from the command line
if __name__ == "__main__":
    # Parse the command line, build the parameter set from the parameters
    # file plus any command-line parameters, then run the simulation.
    args = parameters.create_args_parser().parse_args()
    params = parameters.init_params(args.parameters_file, args.parameters)
    run(params)
networkTest.txt:
social_net 0
0 0 0
1 0 1
2 0 2
3 0 3
EDGES
0 1
0 2
2 3
слух_модель.yaml:
network_file: networkTest.txt
initial_rumor_count: 1
stop.at: 10
world.L: 10
rumor_probability: 0.01
counts_file: output/rumor_counts.csv