Есть ли способ добавить данные в распределенный Tensorflow?

Я использую распределенный TensorFlow не для распространения по сети, а для распространения работы.

С распределенным TensorFlow мы получаем структуру для распределения работы и связи между работниками для получения статуса. Этот светлый взвешенный протокол связи, встроенное восстановление и выбор устройства для конкретной задачи заставляет меня попробовать использовать распределенный tenorFlow для параллельного построения нескольких микромоделей.

Так что в моем коде это то, что я делаю.

def main(_): 
    #some global data block
    a = np.arange(10).reshape((5, 2))
    with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/task:%d" %server.server_def.task_index,cluster=server.server_def.cluster)):
        #some ops to keep the cluster alive 
        var = tf.Variable(initial_value=10, name='var')
        op = tf.assign_add(var,10)
        xx = tf.placeholder("float")
        yy = tf.reduce_sum(xx)
        #start monitoring session 
    with tf.train.MonitoredTrainingSession(master=server.target,is_chief=is_chief) as mon_sess:
        mon_sess.run(op)
        #distribute data
        inputs = a[:,server.server_def.task_index]
        #start a local session in worker 
        sess = tf.Session()
        sum_value = sess.run(yy,feed_dict={xx:inputs})
        sess.close()

После того, как каждая рабочая работа завершена, я хочу добавить некоторую информацию к переменной в глобальной сети. (Поскольку мы не можем обновлять глобальные переменные, такие как a в приведенном выше примере, я хочу использовать mon_sess обновить глобальную сеть.

Я хочу добавить несколько тензоров (o / p каждого работника) и сделать chief читать и писать это. Есть ли способ сделать это?

И, пожалуйста, обновите, если вы видите какие-либо проблемы в вышеупомянутом подходе.

Спасибо,

1 ответ

Я устал от этого и смог получить информацию от местных работников в глобальную сеть

import tensorflow as tf
import numpy as np
import os
import time
def main(server, log_dir, context):
    #create a random array
    a = np.arange(10).reshape((5, 2))
    with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/task:%d" %server.server_def.task_index,cluster=server.server_def.cluster)):
        var = tf.Variable(initial_value=10, name='var')
        op = tf.assign_add(var,10)
        xx = tf.placeholder("float")
        yy = tf.reduce_sum(xx)
        concat_init = tf.Variable([0],dtype=tf.float32)
        sum_holder = tf.placeholder(tf.float32)
        concat_op = tf.concat([concat_init,sum_holder],0)
        assign_op = tf.assign(concat_init,concat_op,validate_shape=False)
    is_chief = server.server_def.task_index == 0
    with tf.train.MonitoredTrainingSession(master=server.target,is_chief=is_chief) as mon_sess:
        mon_sess.run(op)
        print (a)
        print ("reading my part")
        inputs = a[:,server.server_def.task_index]
        print(inputs)
        sess = tf.Session()
        sum_value = sess.run(yy,feed_dict={xx:inputs})
        print(sum_value)
        mon_sess.run(assign_op,feed_dict={sum_holder:[sum_value]})
        if is_chief:
            time.sleep(5)
            worker_sums = mon_sess.run(assign_op,feed_dict={sum_holder:[0]})
            print (worker_sums)
        sess.close()
        if is_chief:
            while True:
                pass
Другие вопросы по тегам