Проблема синхронизации при перемещении агентов в контексте с проекциями UndirectedSharedNetwork и SharedCSpace.

Я опираюсь на модель слухов (из урока repast4py), чтобы создать модель, в которой агенты могут перемещаться в двумерном непрерывном пространстве, и чьи действия будут зависеть от состояний ближайших агентов и некоторых удаленных агентов (в соответствии с заданным социальная сеть).

Для этого я добавил в контекст две проекции: UndirectedSharedNetwork (при чтении соцсети из входного файла, с помощью read_network) и SharedCSpace.

Похоже, в сети возникает ошибка при переходе агента из одного ранга в другой. Пожалуйста, смотрите ниже мой код и входные файлы, которые я использовал. Буду признателен за любую помощь или предложения по решению этой проблемы!

Вот пример сообщения об ошибке, которое я получаю при запуске программы:

        File "/home/works/lib/python3.10/site-packages/repast4py/context.py", line 297, in synchronize
    self._synch_ghosts(restore_agent)
  File "/home/works/lib/python3.10/site-packages/repast4py/context.py", line 259, in _synch_ghosts
    self._update_ghosts()
  File "/home/works/lib/python3.10/site-packages/repast4py/context.py", line 241, in _update_ghosts
    ghost = self._agent_manager._ghost_agents[update[0]]
KeyError: (0, 0, 0)

Я запускаю код с помощью:mpirun -n 4 python TestsPy3.py слух_модель.yaml

ТестыPy3.py:

      import networkx as nx
from typing import Dict
from mpi4py import MPI
import numpy as np
from dataclasses import dataclass

from repast4py.network import write_network, read_network
from repast4py import context as ctx
from repast4py import core, random, schedule, logging, parameters, space
import repast4py
from repast4py.space import ContinuousPoint as cpt
from repast4py.space import BorderType, OccupancyType
from typing import List

# --- Initializing global variable
model = None

# --- This function uses networkx to create a complex network, and repast4py to distribute it over ranks
def generate_network_file(fname: str, n_ranks: int, n_agents: int):
    """Generates a network file using repast4py.network.write_network.

    Args:
        fname: the name of the file to write to
        n_ranks: the number of process ranks to distribute the file over
        n_agents: the number of agents (node) in the network
    """

    # --- Creating a small wold network from watts and strogats with networkx
    g = nx.connected_watts_strogatz_graph(n_agents, 2, 0.25)
    try:
        # --- 
        import nxmetis
        write_network(g, 'rumor_network', fname, n_ranks, partition_method='metis')
    except ImportError:
        write_network(g, 'rumor_network', fname, n_ranks)

# --- This class implements the agent state and behavior
class RumorAgent(core.Agent):

    # --- This is the constructor of the agents
    def __init__(self, nid: int, agent_type: int, rank: int, received_rumor = False):
        # --- Calls the core.Agent constructor, passing the unique id of the agent
        super().__init__(nid, agent_type, rank)
        # --- Specifying if the agent has received the rumor and is spreading it
        self.received_rumor = received_rumor
 
    # --- Save the states of the agents that have their states changed
    def save(self):
        """Saves the state of this agent as tuple.

        A non-ghost agent will save its state using this
        method, and any ghost agents of this agent will
        be updated with that data (self.received_rumor).

        Returns:
            The agent's state
        """
        return (self.uid, self.received_rumor)

    def walk(self):

        pt = model.space.get_location(self)
        x = pt.x + 0.5
        y = pt.y + 0.5

        # --- Moving agents
        model.move(self, x, y)

        
    # --- Updating the states of the agents that have their states changed
    def update(self, data):
        """Updates the state of this agent when it is a ghost
        agent on some rank other than its local one.

        Args:
            data: the new agent state (received_rumor)
        """
        if (not self.received_rumor) and (data):
            # only update if the received rumor state
            # has changed from false to true
            model.rumor_spreaders.append(self)
            self.received_rumor = data

# --- This function is used to create the agents in the specified ranks according to the input file
def create_rumor_agent(nid, agent_type, rank, **kwargs):
    return RumorAgent(nid, agent_type, rank)


def restore_agent(agent_data):
    uid = agent_data[0]
    return RumorAgent(uid[0], uid[1], uid[2], agent_data[1])

# --- This dataclass is created to be used by the reduced type of logging
@dataclass
class RumorCounts:
    # --- We count the total number of spreaders over the entire simulation time
    total_rumor_spreaders: int
    # --- We count the number of new spreaders by simulation time
    new_rumor_spreaders: int
    # --- Counting the total number of non-rumor agents
    total_susceptible: int

# --- This class is responsible for managing the entire simulation
class Model:

    # --- This is the constructor of the model
    def __init__(self, comm, params):
        # --- Initializing the scheduling of the simulation
        self.runner = schedule.init_schedule_runner(comm)
        # --- Scheduling the frequency of updates
        self.runner.schedule_repeating_event(1, 1, self.step, 
                                    priority_type = repast4py.schedule.PriorityType.BY_PRIORITY, 
                                    priority = 1)
        # --- Creating the scheduling that stores the agents position
        self.runner.schedule_repeating_event(1, 1, self.tabular_log_agents, 
                                    priority_type = repast4py.schedule.PriorityType.BY_PRIORITY, 
                                    priority = 2)
        # --- Scheduling the stopping criterium
        #self.runner.schedule_stop(params['stop.at'])
        # --- Scheduling the closure of routines and files that are still open
        self.runner.schedule_end_event(self.at_end)

        # --- Getting the network file path from the input file
        fpath = params['network_file']
        # --- Creating the context where the agents and the network will be created
        self.context = ctx.SharedContext(comm)
        # --- Reading the network
        read_network(fpath, self.context, create_rumor_agent, restore_agent)
        # --- The network is created without any reference. To get the reference to the network, 
        # whose name is specified in the first line of the network file, we do:
        self.net = self.context.get_projection('social_net')

        # for agent in self.context.agents():
        #     print("rank: %s, agent: %s, edges: %s" % (comm.Get_rank(), agent, ([edge for edge in self.net._edges(agent)])))
        # raise(SystemExit)

        # --- Creating the list of spreader agents
        self.rumor_spreaders = []
        # --- Getting the current rank
        self.rank = comm.Get_rank()
        # --- Seeding the rumor agents in the input file among the available ranks
        self._seed_rumor(params['initial_rumor_count'], comm)

        # --- Getting the number of spreaders after seeding the spreaders among the cores
        rumored_count = len(self.rumor_spreaders)

        # --- Getting the total number of agents that does not have the rumor
        susceptible = len([agent for agent in self.context.agents() if (not agent.received_rumor)])

        # --- Creating an instance of rumor counts. A constructor is automatically generated by the @dataclass annotation
        self.counts = RumorCounts(rumored_count, rumored_count, susceptible)
        # --- Using the local instance of rumor counts to create a logger
        # Note that the logger will be sum over all the ranks (op = MPI.SUM)
        loggers = logging.create_loggers(self.counts, op=MPI.SUM, rank=self.rank)
        # --- Setting the reduced dataset to the logger (over all te ranks) and saving the output in the specified file
        self.data_set = logging.ReducingDataSet(loggers, comm, params['counts_file'])
        # --- Applying the reduced dataset to the tick 0
        self.data_set.log(0)

        # --- Getting the rumor probability spreading from the input file
        self.rumor_prob = params['rumor_probability']

        # --- Creating tabular logging
        self.agent_logger = logging.TabularLogger(comm, 
                                                  "output/agent_log.csv", 
                                                  ["tick", "id", "rank_initial", "rumor", "local_rank", 
                                                   "X", "Y"])        
        
        # ------------------------------------------------------------------------------ #

        # --- Printing the rank and the agents
        print(self.rank, [agent for agent in self.context.agents()])

        # ------------------------------------------------------------------------------ #

        # --- Creating a bounding box
        box = space.BoundingBox(0, params['world.L'], 0, params['world.L'], 0, 0)

        # --- Creating a new projection
        self.space = space.SharedCSpace('space', bounds = box, borders = BorderType.Periodic, 
                                        occupancy = OccupancyType.Multiple,
                                        buffer_size = 5, comm = comm, tree_threshold = 100)
        
        # --- Adding the bounding box to the context
        self.context.add_projection(self.space)

        # ------------------------------------------------------------------------------ #

        print("Bounding box successfully created in rank %s" % self.rank)

        # ------------------------------------------------------------------------------ #
        
        # --- Getting the local bounds
        local_bounds = self.space.get_local_bounds()

        # --- Printing the local bounds to see what is going on
        print("The box dimensions in rank %d are: x(%s, %s) y(%s, %s)" % (self.rank,
                                                                                 local_bounds.xmin, 
                                                                                 local_bounds.xmin + local_bounds.xextent, 
                                                                                 local_bounds.ymin, 
                                                                                 local_bounds.ymin + local_bounds.yextent))
        
        # --- Looping over the agents of that rank
        for agent in self.context.agents():
            # --- Sorteando posicao do agente
            x = random.default_rng.uniform(local_bounds.xmin, local_bounds.xmin + local_bounds.xextent) 
            y = random.default_rng.uniform(local_bounds.ymin, local_bounds.ymin + local_bounds.yextent)
            # --- O agente já existe no contexto. Então, o que eu preciso fazer é mover ele para a rede
            self.move(agent, x, y)

        # --- Logging the agents right now
        self.tabular_log_agents()

    # --- Definindo a acao move
    def move(self, agent, x, y):
        self.space.move(agent, cpt(x, y))

    # --- Defining function that realizes the tabular logging
    def tabular_log_agents(self):

        # --- Getting the current tick
        tick = self.runner.schedule.tick 
        # --- Looping over the agents in the contex
        for agent in self.context.agents():
            # --- Getting the agent location
            cpt = self.space.get_location(agent)
            # --- Logging rows of agents in the buffer 
            self.agent_logger.log_row(tick, agent.id, agent.uid_rank, 
                                      agent.received_rumor, agent.local_rank,
                                      cpt.x, cpt.y)
        # --- Once the logger is buffered, we write it
        self.agent_logger.write()

        # --- Returning
        return None

    # --- This is the function that seeds the rumor among the agents
    def _seed_rumor(self, init_rumor_count: int, comm):
        # --- Getting the number of processes being used
        world_size = comm.Get_size()
        # np array of world size, the value of i'th element of the array
        # is the number of rumors to seed on rank i.
        rumor_counts = np.zeros(world_size, np.int32)
        # --- Checking if this is the rank 0. The rank 0 will be considered, here, as a root rank
        if (self.rank == 0):
            # --- Looping over the number of agents with rumor in the initial time instant
            for _ in range(init_rumor_count):
                # --- Selecting an integer index at random
                idx = random.default_rng.integers(0, high=world_size)
                # --- increasing the counter of rumour agents to that index
                rumor_counts[idx] += 1

        # --- Creating buffer that is going to receive the information spread by the process
        rumor_count = np.empty(1, dtype = np.int32)
        # --- Spreading the rumor counts over the ranks
        comm.Scatter(rumor_counts, rumor_count, root = 0)

        # --- Looping over the agents from sharedcontext. Note that it gets "count" agents that are shuffled
        for agent in self.context.agents(count = rumor_count[0], shuffle=True):
            # --- Saying that the agent is spreading the rumor
            agent.received_rumor = True
            # --- Adding the agent to the list of spreaders
            self.rumor_spreaders.append(agent)

    # --- Closing the dataloggers once the simulation has ended
    def at_end(self):
        self.data_set.close()
        print("The process in rank %d has ended" % (self.rank))

    # --- In this function, we are going to define what is goin gto happen in every time step of the dynamics 
    def step(self):
        # --- Creating a list that keeps track of new rumor spreaders
        new_rumor_spreaders = []
        # --- Setting the random number generator
        rng = random.default_rng
        # --- Looking at the set of rumor spreaders of that core
        for agent in self.rumor_spreaders:
            # --- Looping over the neighbors of that agent
            for ngh in self.net.graph.neighbors(agent):
                # --- The three conditions are
                # 1 - If the neighbor is not propagating the rumor
                # 2 - If the local rank of the neighbor is the current rank
                # 3 - With a given probability of spreading given by self.rumor_prob
                # only update agents local to this rank
                if ((not ngh.received_rumor) and (ngh.local_rank == self.rank) and (rng.uniform() <= self.rumor_prob)):
                    # --- Setting the neighbor as infected
                    ngh.received_rumor = True
                    # --- Adding the neighbor as a recently infected agent
                    new_rumor_spreaders.append(ngh)

        #self.context.synchronize(restore_agent)

        # --- Looping over agents
        for agent in self.context.agents():
            agent.walk()
            #None

        # --- Updating network
        #self.net._agents_moved_rank(update_list, core.AgentManager)

        print("BEEEFORE BEEEFORE SYYYYYNNNNCCCC")

        #self.net._pre_synch_ghosts(core.AgentManager)

        self.context.synchronize(restore_agent)

        #self.net._post_agents_moved_rank(core.AgentManager, create_rumor_agent)

        # --- Adding the list of new spreaders to the list of all rumor spreaders
        self.rumor_spreaders += new_rumor_spreaders
        # --- Getting the count of new rumor spreaders at this tick
        self.counts.new_rumor_spreaders = len(new_rumor_spreaders)
        # --- Getting the count of total rumor spreader
        self.counts.total_rumor_spreaders += self.counts.new_rumor_spreaders

        # --- Updating field of self.counts
        self.counts.total_susceptible = len([agent for agent in self.context.agents() if (not agent.received_rumor)])

        # --- Logging the agents at this tick
        self.data_set.log(self.runner.schedule.tick)

        # --- Syncronizing the agents
        #self.context.synchronize(restore_agent)

        # --- Getting the suceptible agents
        susceptible_agents = np.asarray([len([agent for agent in self.context.agents() if (not agent.received_rumor)])])
        # --- Summing over all the processes
        self.context.comm.Allreduce(MPI.IN_PLACE, susceptible_agents, op = MPI.SUM)

        # --- Checking the stopping criteria
        if (susceptible_agents[0] == 0):
            # --- Scheduling the stopping criterium
            self.runner.stop()
            # --- Scheduling the closure of routines and files that are still open
            #self.runner.schedule_end_event(self.at_end)

    # --- Starting the simmulation by executing the runner (viz. schedule)
    def start(self):
        self.runner.execute()

# --- This function runs the simulation
def run(params: Dict):
    global model
    model = Model(MPI.COMM_WORLD, params)
    model.start()

# --- This if allows the simulation to be run from the command line
if __name__ == "__main__":
    parser = parameters.create_args_parser()
    args = parser.parse_args()
    params = parameters.init_params(args.parameters_file, args.parameters)
    run(params)

networkTest.txt:

      social_net 0
0 0 0
1 0 1
2 0 2
3 0 3
EDGES
0 1
0 2
2 3

слух_модель.yaml:

      network_file: networkTest.txt
initial_rumor_count: 1
stop.at: 10
world.L: 10
rumor_probability: 0.01
counts_file: output/rumor_counts.csv

0 ответов

Другие вопросы по тегам