IPython кластер и PicklingError
Моя проблема, похоже, похожа на эту тему, хотя я думаю, что следую рекомендованному методу, я все еще получаю PicklingError. Когда я запускаю свой процесс локально без отправки в IPython Cluster Engine, функция работает нормально.
Я использую zipline с записной книжкой IPyhon, поэтому сначала создаю класс на основе zipline.TradingAlgorithm
Cell [1]
from IPython.parallel import Client
rc = Client()
lview = rc.load_balanced_view()
Сотовый [ 2 ]
%%px --local # This insures that the Class and modules exist on each engine
import zipline as zpl
import numpy as np
class Agent(zpl.TradingAlgorithm): # must define initialize and handle_data methods
def initialize(self):
self.valueHistory = None
pass
def handle_data(self, data):
for security in data.keys():
## Just randomly buy/sell/hold for each security
coinflip = np.random.random()
if coinflip < .25:
self.order(security,100)
elif coinflip > .75:
self.order(security,-100)
pass
Сотовый [ 3 ]
from zipline.utils.factory import load_from_yahoo
start = '2013-04-01'
end = '2013-06-01'
sidList = ['SPY','GOOG']
data = load_from_yahoo(stocks=sidList,start=start,end=end)
agentList = []
for i in range(3):
agentList.append(Agent())
def testSystem(agent,data):
results = agent.run(data) #-- This is how the zipline based class is executed
#-- next I'm just storing the final value of the test so I can plot later
agent.valueHistory.append(results['portfolio_value'][len(results['portfolio_value'])-1])
return agent
for i in range(10):
tasks = []
for agent in agentList:
#agent = testSystem(agent,data) ## On its own, this works!
#-- To Test, uncomment the above line and comment out the next two
tasks.append(lview.apply_async(testSystem,agent,data))
agentList = [ar.get() for ar in tasks]
for agent in agentList:
plot(agent.valueHistory)
Здесь выдается ошибка:
PicklingError Traceback (most recent call last)/Library/Python/2.7/site-packages/IPython/kernel/zmq/serialize.pyc in serialize_object(obj, buffer_threshold, item_threshold)
100 buffers.extend(_extract_buffers(cobj, buffer_threshold))
101
--> 102 buffers.insert(0, pickle.dumps(cobj,-1))
103 return buffers
104
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
Если я переопределить метод run() из zipline.TradingAlgorithm что-то вроде:
def run(self, data):
return 1
Попытка что-то вроде этого...
def run(self, data):
return zpl.TradingAlgorithm.run(self,data)
приводит к тому же PicklingError.
тогда срабатывает передача на двигатели, но очевидно, что смелость теста не выполнена. Поскольку run - это метод, внутренний для zipline.TradingAlgorithm, и я не знаю всего, что он делает, как бы мне убедиться, что он пропущен?
1 ответ
Похоже, что объект zipline TradingAlgorithm не подлежит расслаиванию после запуска:
import zipline as zpl
class Agent(zpl.TradingAlgorithm): # must define initialize and handle_data methods
def handle_data(self, data):
pass
agent = Agent()
pickle.dumps(agent)[:32] # ok
agent.run(data)
pickle.dumps(agent)[:32] # fails
Но это говорит мне о том, что вы должны создавать Агенты на движках и только передавать данные / результаты взад и вперед (в идеале, не передавать данные вообще или не более одного раза).
Минимизация передачи данных может выглядеть примерно так:
определить класс:
%%px
import zipline as zpl
import numpy as np
class Agent(zpl.TradingAlgorithm): # must define initialize and handle_data methods
def initialize(self):
self.valueHistory = []
def handle_data(self, data):
for security in data.keys():
## Just randomly buy/sell/hold for each security
coinflip = np.random.random()
if coinflip < .25:
self.order(security,100)
elif coinflip > .75:
self.order(security,-100)
загрузить данные
%%px
from zipline.utils.factory import load_from_yahoo
start = '2013-04-01'
end = '2013-06-01'
sidList = ['SPY','GOOG']
data = load_from_yahoo(stocks=sidList,start=start,end=end)
agent = Agent()
и запустите код:
def testSystem(agent, data):
results = agent.run(data) #-- This is how the zipline based class is executed
#-- next I'm just storing the final value of the test so I can plot later
agent.valueHistory.append(results['portfolio_value'][len(results['portfolio_value'])-1])
# create references to the remote agent / data objects
agent_ref = parallel.Reference('agent')
data_ref = parallel.Reference('data')
tasks = []
for i in range(10):
for j in range(len(rc)):
tasks.append(lview.apply_async(testSystem, agent_ref, data_ref))
# wait for the tasks to complete
[ t.get() for t in tasks ]
И составьте график результатов, никогда не выбирая самих агентов
%matplotlib inline
import matplotlib.pyplot as plt
for history in rc[:].apply_async(lambda : agent.valueHistory):
plt.plot(history)
Это не совсем тот код, которым вы поделились - три агента подпрыгивают взад-вперед на всех ваших движках, тогда как на каждого движка это имеет агент. Я не знаю достаточно о Zipline, чтобы сказать, полезно ли это для вас или нет.