Данные не распределяются среди работников Tensorflow

Я написал распределенную программу TensorFlow с заданием 1 пс и 2 заданиями. Я ожидал, что пакеты данных будут распределены среди рабочих, но, похоже, это не так. Я вижу, что только один рабочий (task=0) выполняет всю работу, а другой бездействует. Не могли бы вы помочь мне найти проблему с этой программой:

    import math
    import tensorflow as tf
    from tensorflow.examples.tutorials.mnist import input_data

    # Flags for defining the tf.train.ClusterSpec
    tf.app.flags.DEFINE_string("ps_hosts", "",
                           "Comma-separated list of hostname:port pairs")
    tf.app.flags.DEFINE_string("worker_hosts", "",
                           "Comma-separated list of hostname:port pairs")
    tf.app.flags.DEFINE_string("master_hosts", "oser502110:2222",
                           "Comma-separated list of hostname:port pairs")

    # Flags for defining the tf.train.Server
    tf.app.flags.DEFINE_string("job_name", "", "One of 'ps', 'worker'")
    tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
    tf.app.flags.DEFINE_integer("hidden_units", 100,
                            "Number of units in the hidden layer of the NN")
    tf.app.flags.DEFINE_string("data_dir", "/tmp/mnist-data",
                           "Directory for storing mnist data")
    tf.app.flags.DEFINE_integer("batch_size", 100, "Training batch size")

    tf.app.flags.DEFINE_string("worker_grpc_url", None,
                    "Worker GRPC URL")

    FLAGS = tf.app.flags.FLAGS

    IMAGE_PIXELS = 28

def main(_):
    ps_hosts = FLAGS.ps_hosts.split(",")
    worker_hosts = FLAGS.worker_hosts.split(",")
    master_hosts = FLAGS.master_hosts.split(",")


    cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})

    # Create and start a server for the local task.
    server = tf.train.Server(cluster,
                             job_name=FLAGS.job_name,
                             task_index=FLAGS.task_index)

    if FLAGS.job_name == "ps":
        server.join()
    elif FLAGS.job_name == "worker":

        is_chief = (FLAGS.task_index == 0)
        if is_chief: tf.reset_default_graph()


# Assigns ops to the local worker by default.
        with tf.device(tf.train.replica_device_setter(
                worker_device="/job:worker/task:%d" % FLAGS.task_index,
                cluster=cluster)):

            # Variables of the hidden layer
            hid_w = tf.Variable(
                tf.truncated_normal([IMAGE_PIXELS * IMAGE_PIXELS, FLAGS.hidden_units],
                                    stddev=1.0 / IMAGE_PIXELS), name="hid_w")
            hid_b = tf.Variable(tf.zeros([FLAGS.hidden_units]), name="hid_b")

            # Variables of the softmax layer
            sm_w = tf.Variable(
                tf.truncated_normal([FLAGS.hidden_units, 10],
                                    stddev=1.0 / math.sqrt(FLAGS.hidden_units)),
                name="sm_w")
            sm_b = tf.Variable(tf.zeros([10]), name="sm_b")

            x = tf.placeholder(tf.float32, [None, IMAGE_PIXELS * IMAGE_PIXELS])
            y_ = tf.placeholder(tf.float32, [None, 10])

            hid_lin = tf.nn.xw_plus_b(x, hid_w, hid_b)
            hid = tf.nn.relu(hid_lin)

            y = tf.nn.softmax(tf.nn.xw_plus_b(hid, sm_w, sm_b))
            loss = -tf.reduce_sum(y_ * tf.log(tf.clip_by_value(y, 1e-10, 1.0)))

            global_step = tf.Variable(0, trainable=False)

            train_op = tf.train.AdagradOptimizer(0.01).minimize(
                loss, global_step=global_step)

            saver = tf.train.Saver()
            #summary_op = tf.merge_all_summaries()
            init_op = tf.initialize_all_variables()

        # Create a "supervisor", which oversees the training process.
        sv = tf.train.Supervisor(is_chief=is_chief,
                                 logdir="/tmp/train_logs",
                                 init_op=init_op,
                                 recovery_wait_secs=1,
                                 saver=saver,
                                 global_step=global_step,
                                 save_model_secs=600)
        if is_chief:
            print("Worker %d: Initializing session..." % FLAGS.task_index)
        else:
            print("Worker %d: Waiting for session to be initialized..." % FLAGS.task_index)

        mnist = input_data.read_data_sets(FLAGS.data_dir, one_hot=True)

        sess_config = tf.ConfigProto(allow_soft_placement=True, log_device_placement=True,
                                     device_filters=["/job:ps", "/job:worker/task:%d" % FLAGS.task_index])


        # The supervisor takes care of session initialization, restoring from
        # a checkpoint, and closing when done or an error occurs.
        with sv.prepare_or_wait_for_session(server.target, config=sess_config) as sess:
            print("Worker %d: Session initialization complete." % FLAGS.task_index)
            # Loop until the supervisor shuts down or 1000000 steps have completed.
            step = 0
            while not sv.should_stop() and step < 1000000:
                # Run a training step asynchronously.
                # See `tf.train.SyncReplicasOptimizer` for additional details on how to
                # perform *synchronous* training.

                batch_xs, batch_ys = mnist.train.next_batch(FLAGS.batch_size)
                print("FETCHING NEXT BATCH %d" % FLAGS.batch_size)
                train_feed = {x: batch_xs, y_: batch_ys}

                _, step = sess.run([train_op, global_step], feed_dict=train_feed)
                if step % 100 == 0:
                    print("Done step %d" % step)

        # Ask for all the services to stop.
        sv.stop()

if __name__ == "__main__":
    tf.app.run()

а вот логи от рабочих с заданием = 0:

2017-06-20 04: 50: 58.405431: I tenorflow/core/common_runtime/simple_placer.cc:841] Adagrad/ значение: (Const)/job: пс / реплика: 0 / задача: 0 / процессор: 0 truncated_normal / stddev: (Const): / job: worker / replica: 0 / task: 0 / gpu: 0
2017-06-20 04: 50: 58.405456: I tenorflow / core / common_runtime / simple_placer.cc: 841] truncated_normal / stddev: (Const)/job: worker / replica: 0 / task: 0 / gpu: 0 truncated_normal / mean: (Const): / job: worker / replica: 0 / task: 0 / gpu: 0
2017-06-20 04: 50: 58.405481: I tenorflow / core / common_runtime / simple_placer.cc: 841] truncated_normal / mean: (Const)/job: worker / replica: 0 / task: 0 / gpu: 0 truncated_normal / shape: (Const): / job: worker / replica: 0 / task: 0 / gpu: 0
2017-06-20 04: 50: 58.405506: I tenorflow/core/common_runtime/simple_placer.cc:841] truncated_normal/shape: (Const)/job:worker/replica:0/task:0/gpu:0 Worker 0: Инициализация сеанса завершена.
ПОЛУЧЕНИЕ СЛЕДУЮЩЕЙ ПАРТИИ 500
ВЫБОР СЛЕДУЮЩЕЙ ГРУППЫ 500 ПОСЛЕДОВАТЕЛЬНОСТИ СЛЕДУЮЩЕЙ ГРУППЫ 500 ВЫБОР СЛЕДУЮЩЕЙ ГРУППЫ 500 ВЫБОР СЛЕДУЮЩЕЙ ГРУППЫ 500 Выполнено, шаг 408800......

но из работника 2 (task=1) логи выглядят так:

2017-06-20 04: 51: 07.288600: I tenorflow / core / common_runtime / simple_placer.cc: 841] нули: (константные) / job: рабочий / реплика: 0 / task: 1 / gpu: 0 Adagrad / value: (Const): / job: ps / replica: 0 / task: 0 / cpu: 0
2017-06-20 04: 51: 07.288614: I tenorflow/core/common_runtime/simple_placer.cc:841] Adagrad/ значение: (Const)/job: пс / реплика: 0 / задача: 0 / процессор: 0 truncated_normal / stddev: (Const): / job: worker / replica: 0 / task: 1 / gpu: 0
2017-06-20 04: 51: 07.288639: I tenorflow / core / common_runtime / simple_placer.cc: 841] truncated_normal / stddev: (Const)/job: worker / replica: 0 / task: 1 / gpu: 0 truncated_normal / mean: (Const): / job: worker / replica: 0 / task: 1 / gpu: 0
2017-06-20 04: 51: 07.288664: I tenorflow/core/common_runtime/simple_placer.cc:841] truncated_normal/mean: (Const)/job:worker/replica:0/task:1/gpu:0 truncated_normal/shape: (Const): /job:worker/replica:0/task:1/gpu:0 2017-06-20 04:51:07.288689: I tenorflow / core / common_runtime / simple_placer.cc: 841] truncated_normal / shape: (Const) / работа: работник / реплика: 0 / задача: 1 / GPU: 0

Я ожидал похожие журналы от обоих рабочих. Пожалуйста, помогите мне понять это. Ждем вашей помощи.

1 ответ

Вам необходимо вручную выделить данные для каждого работника.

# Get the subset of data for this worker 

mnist = input_data.read_data_sets('/tmp/mnist_temp', one_hot=True)
num_old = mnist.train._num_examples
ids = list(range(task_index, mnist.train._num_examples, num_workers))
mnist.train._images = mnist.train._images[ids,]
mnist.train._labels = mnist.train._labels[ids,]
mnist.train._num_examples = mnist.train._images.shape[0]

print("subset of training examples ", mnist.train._num_examples,"/",num_old)
Другие вопросы по тегам