Как создать агентов-призраков на основе агентов другого ранга в repast4py?

Я пытаюсь создать агент книги заказов, который можно использовать для всех рангов. Я могу поделиться агентом, используя операции отправки и получения mpi4py. Но на основании документации здесь я предполагаюself.context.synchronize(restore_orderbook)создаст агентов-призраков в каждом ранге , но программа показывает KeyError: 0. Агент, созданный в ранге 0, не копируется в другие ранги как призрак даже после использованияrequest_agentsфункция.

Ожидаемый результат:

Книга заказов создается с рангом 0, а призрачные копии создаются с рангом 1,2,3 с использованием книги заказов, созданной с рангом 0. Печатайте заказы в соответствии с логикой if else, присутствующей вstepфункция.

test2.py

      from typing import Dict, Tuple
from mpi4py import MPI
import numpy as np
from dataclasses import dataclass

from repast4py import core, random, space, schedule, logging, parameters
from repast4py import context as ctx
import repast4py
from queue import Queue
import pandas as pd
from datetime import datetime, timedelta
import random as rnd


order_count = 1000
items = np.arange(1, 101)
quantities = np.arange(1, 11)
start_date = datetime(2023, 1, 1)
end_date = datetime(2023, 1, 3)

item_values = np.random.choice(items, size=order_count)
qty_values = np.random.choice(quantities, size=order_count)
time_deltas = np.random.randint(0, int((end_date - start_date).total_seconds()), size=order_count)
date_values = [start_date + timedelta(seconds=int(delta)) for delta in time_deltas]

data = {
    'item': item_values,
    'qty': qty_values,
    'order_datetime': date_values
}
df = pd.DataFrame(data).sort_values(by=['order_datetime'], ascending=True).reset_index(drop=True)
df['order_id'] = np.arange(1, order_count+1)
df['order_datetime'] = pd.to_datetime(df['order_datetime'])
df['tick'] = df.apply(lambda x: 1 + int((x['order_datetime'] - df['order_datetime'].min()).total_seconds()/173), axis=1)
df.to_csv('test_data.csv', index=False)


class OrderBook(core.Agent): 
    TYPE = 0
    def __init__(self, a_id: int, rank: int):
        super().__init__(id=a_id, type=OrderBook.TYPE, rank=rank)
        self.df = list(pd.read_csv('test_data.csv').to_records())
        # self.df['order_datetime'] = pd.to_datetime(self.df['order_datetime'])
    
    def save(self) -> Tuple:
        return (self.uid, self.df)

    def get_order(self, tick):
        return rnd.choice(self.df)

    def update(self, data):
        self.df = data 

orderbook_cache = {}


def restore_orderbook(orderbook_data: Tuple):

    uid = orderbook_data[0]
    if uid[1] == OrderBook.TYPE:
        if uid in orderbook_cache:
            ob = orderbook_cache[uid]
        else:
            ob = OrderBook(uid[0], uid[1], uid[2])
            orderbook_cache[uid] = ob

        ob.df = orderbook_data[1]
        return ob

def create_agent(agent_data):
    uid = agent_data[0]
    book = OrderBook(uid[0], uid[2])
    book.df = agent_data[1]
    return book
            
class Model:

    def __init__(self, comm: MPI.Intracomm, params: Dict):
        self.context = ctx.SharedContext(comm)
        self.rank = comm.Get_rank()
        if self.rank == 0:
            book = OrderBook(1, 0)
            self.context.add(book)
            requests = []
            print(book.uid)

        else:
            requests = [((1,0,0), 0)]

        self.context.request_agents(requests, create_agent)

        
        self.runner = schedule.init_schedule_runner(comm)
        self.runner.schedule_repeating_event(1, 1, self.step)
        self.runner.schedule_stop(params['stop.at'])
        self.runner.schedule_end_event(self.at_end)
                
        
    def step(self):
        tick = self.runner.schedule.tick
        self.context.synchronize(restore_orderbook)
        
        if self.rank == 0 and tick < 25:
            for b in self.context.agents(OrderBook.TYPE):
                order = b.get_order(tick)
                print(self.rank, order)    
        elif self.rank == 1 and 25 <= tick < 50:
            for b in self.context.agents(OrderBook.TYPE):
                order = b.get_order(tick)
                print(self.rank, order)
        elif self.rank == 2 and 50 <= tick < 75:
            for b in self.context.agents(OrderBook.TYPE):
                order = b.get_order(tick)
                print(self.rank, order)
        elif self.rank == 3 and 75 <= tick < 100:
            for b in self.context.agents(OrderBook.TYPE):
                order = b.get_order(tick)
                print(self.rank, order)
    
    def at_end(self):
        print('simulation complete')

    def start(self):
        self.runner.execute()    
         
def run(params: Dict):
    global model
    model = Model(MPI.COMM_WORLD, params)
    model.start()


if __name__ == "__main__":
    parser = parameters.create_args_parser()
    args = parser.parse_args()
    params = parameters.init_params(args.parameters_file, args.parameters)
    run(params)

test.yaml

      random.seed: 42
stop.at: 100
orders.count: 1000

команда для запуска

      mpirun -n 4 python test2.py test.yaml

Я думаю, что агенты-призраки создаются не сself.context.request_agents(requests, create_agent). Правильно ли такое использование?

Ошибка:

      Traceback (most recent call last):
  File "test2.py", line 136, in <module>
    run(params)
  File "test2.py", line 129, in run
    model.start()
  File "test2.py", line 124, in start
    self.runner.execute()    
  File "/home/vinay/miniconda3/lib/python3.8/site-packages/repast4py/schedule.py", line 584, in execute
    self.schedule.execute()
  File "/home/vinay/miniconda3/lib/python3.8/site-packages/repast4py/schedule.py", line 406, in execute
    self.executing_group.execute(self.queue)
  File "/home/vinay/miniconda3/lib/python3.8/site-packages/repast4py/schedule.py", line 243, in execute
    interrupted = self.execute_evts(self.prioritized, queue)
  File "/home/vinay/miniconda3/lib/python3.8/site-packages/repast4py/schedule.py", line 223, in execute_evts
    evt()
  File "/home/vinay/miniconda3/lib/python3.8/site-packages/repast4py/schedule.py", line 60, in __call__
    self.evt()
  File "test2.py", line 108, in step
    for b in self.context.agents(OrderBook.TYPE):
  File "/home/vinay/miniconda3/lib/python3.8/site-packages/repast4py/context.py", line 358, in agents
    return self._agents_by_type[agent_type].values().__iter__()
KeyError: 0

Изменить: рабочее решение

я заменил

      for b in self.context.agents(OrderBook.TYPE):
order = b.get_order(tick)
print(self.rank, order)

с

      b  = self.context.ghost_agent((1,0,0))
order = b.get_order(tick)
print(self.rank, order)

1 ответ

Проблема здесь в том, что все чины должны позвонить. В вашем коде ранг 0 не вызывает его, и поэтому код зависает в ожидании, пока ранг 0 завершит вызов. В MPI это называется коллективной операцией. Это также упоминается в документах для

«Это коллективная операция, и все ранги должны вызывать ее, независимо от того, запрашиваются ли агенты этого ранга. Запрошенные агенты будут автоматически добавлены в качестве призраков к этому рангу»

Хотя это легко упустить из виду. Когда программа MPI зависает, это обычно происходит примерно так.

Я попробовал код с этим обновлением,

         if self.rank == 0:
        book = OrderBook(1, 0)
        self.context.add(book)
        requests = []
        # print(book.uid)

    else:
        requests = [((1,0,0), 0)]
    
    self.context.request_agents(requests, create_agent)

иrequest_agentsзвонок работает. Однако существует проблема с вызовом create_agent. Конструктор OrderBook принимает только 3 аргумента, а create_agent — 4.

Наконец, я не уверен, насколько хорошо кадр данных pandas будет собираться и передаваться между процессами. Это может сработать, но если вы получаете ошибки, попробуйте передать фрейм данных в виде списка списков или что-то в этом роде.

Другие вопросы по тегам