How do I use TensorFlow's ParameterServerStrategy distributed training to train on the MNIST data?

lvmkulzt · asked 12 months ago

I am trying to train on the MNIST dataset using ParameterServerStrategy. As a beginner, I find the documentation confusing, especially the "Clusters in the real world" section. This is the tutorial I am following: https://colab.research.google.com/github/tensorflow/docs/blob/master/site/en/tutorials/distribute/parameter_server_training.ipynb#scrollTo=zyby6M2Jqg6J&uniqifier=1 . So far I have this:

#this is mnist_setup.py

import os
import tensorflow as tf
import numpy as np

def mnist_dataset(batch_size):
  (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
  # The `x` arrays are in uint8 and have values in the [0, 255] range.
  # You need to convert them to float32 with values in the [0, 1] range.
  x_train = x_train / np.float32(255)
  y_train = y_train.astype(np.int64)
  train_dataset = tf.data.Dataset.from_tensor_slices(
      (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
  return train_dataset

def build_and_compile_model():
  model = tf.keras.Sequential([
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(10, activation=tf.nn.softmax)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(),
      metrics=['accuracy'])
  return model
#this is main.py

import os
import json

import tensorflow as tf
import mnist_setup

per_worker_batch_size = 64

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ['ip_of_deeplearning_VM:port'], #worker1
        "ps": ['ip_of_deeplearning_VM:port'], #worker2
        "chief": ['ip_of_deeplearning_VM:port'] #masterz
    },
    "task": {"type": "chief", "index": 0}
})

cluster_spec = tf.train.ClusterSpec({
    'ps':['ip_of_deeplearning_VM:port'], #worker2
    'worker': ['ip_of_deeplearning_VM:port'] #worker1
})

cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(cluster_spec, task_type="ps",task_id=0)

tf_config = json.loads(os.environ['TF_CONFIG'])
num_workers = len(tf_config['cluster']['worker'])

strategy = tf.distribute.experimental.ParameterServerStrategy(cluster_resolver)

global_batch_size = per_worker_batch_size * num_workers
multi_worker_dataset = mnist_setup.mnist_dataset(global_batch_size)

with strategy.scope():
  # Model building/compiling need to be within `strategy.scope()`.
  multi_worker_model = mnist_setup.build_and_compile_model()
    
print("chief gets called!")
result = multi_worker_model.fit(multi_worker_dataset, epochs=3)

I copied these files to the worker and ps VMs, changed the index in each copy, and ran main.py on all of them at the same time. I get a message saying that the server started at ip_address, but nothing else happens. Can someone tell me what I need to do to get this working?

sg3maiej 1#

After the servers start, the distribution strategy still has not been instantiated. The code below needs to be added after you have configured the worker and PS IP addresses.

# NUM_PS is the number of parameter servers in your cluster.
variable_partitioner = (
    tf.distribute.experimental.partitioners.MinSizePartitioner(
        min_shard_bytes=(256 << 10),
        max_shards=NUM_PS))

strategy = tf.distribute.ParameterServerStrategy(
    cluster_resolver,
    variable_partitioner=variable_partitioner)
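
Note that the strategy can only connect if each ps and worker VM is actually running a server. A minimal sketch of what each non-chief VM would run, following the tutorial's "Clusters in the real world" section (the file name start_server.py is my own, and each VM is assumed to have its own TF_CONFIG naming its task type and index):

# start_server.py -- run on each ps and worker VM.
# Assumes TF_CONFIG on this VM names its own "task" type and index.
import tensorflow as tf

cluster_resolver = tf.distribute.cluster_resolver.TFConfigClusterResolver()

server = tf.distribute.Server(
    cluster_resolver.cluster_spec(),
    job_name=cluster_resolver.task_type,
    task_index=cluster_resolver.task_id,
    protocol=cluster_resolver.rpc_layer or "grpc",
    start=True)

# Blocks forever: ps and worker processes do not exit on their own.
server.join()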

There is also a mismatch between the guide you linked and the code you have written. The guide's model outputs raw logits, with no softmax activation:

model = tf.keras.models.Sequential([tf.keras.layers.Dense(10)])
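
Your build_and_compile_model applies a softmax in the last layer while also compiling the loss with from_logits=True, so one of the two should go. A sketch of the function with the softmax removed to match the loss:

def build_and_compile_model():
  model = tf.keras.Sequential([
    tf.keras.layers.Flatten(),
    # No activation here: the layer outputs raw logits, which is
    # what `from_logits=True` in the loss below expects.
    tf.keras.layers.Dense(10)
  ])
  model.compile(
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer=tf.keras.optimizers.SGD(),
      metrics=['accuracy'])
  return model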

Once you have instantiated a GCP VM, SSH into it, install the latest CUDA/cuDNN dependencies, and copy over the code I provided.
Please copy all of the code into one file and run it in the terminal in one go.
I have written the corrected code in the gist given here. You can refer to it. I used Ubuntu.
The cluster coordinator is expected to terminate the PS processes when the job completes, and to handle failed workers by respawning the processes. The PS tasks will keep running after training finishes, so you will have to kill them manually.
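
One more detail, which comes from how Keras handles infinite datasets rather than from the guide: your mnist_dataset calls .repeat() with no count, so Model.fit on the chief has to be told how long an epoch is via steps_per_epoch. A sketch (the value 70 is an arbitrary example, not from the guide):

# Sketch: the chief's fit call with an explicit step count, since the
# dataset repeats indefinitely. 70 steps per epoch is an example value.
result = multi_worker_model.fit(
    multi_worker_dataset,
    epochs=3,
    steps_per_epoch=70)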
