Paddle distributed training fails with "var not in this block"

rfbsl7qr · posted 2021-11-30

I adapted my own training script from a distributed training sample in the Paddle docs (https://www.paddlepaddle.org.cn/documentation/docs/zh/1.5/user_guides/howto/training/cluster_quick_start.html). The code for one of the trainers is shown below. At runtime it fails with "ValueError: var image not in this block". What is causing this, and how can I fix it? Thanks!

import os
import time

import matplotlib
import numpy as np
import paddle
import paddle.fluid as fluid

from reader_creator import reader_creator

matplotlib.use('agg')

y_dim = 40
z_dim = 100
img_dir = './att_data'
list_file = './data/att_data/trainer.list'
output_dir = './out'
img_size = 64
crop_type = 'Centor'
crop_size = 64

batch_size = 4
attack_target = 1
fake_target = 3
EPOCH_NUM = 200
USE_GPU = False

def get_params(program, prefix):
    all_params = program.global_block().all_parameters()
    return [t.name for t in all_params if t.name.startswith(prefix)]

def Discriminator(images, name='D'):
    with fluid.unique_name.guard(name + '/'):
        y = fluid.layers.reshape(x=images, shape=[-1, 1, img_size, img_size])

        # First conv + pooling layer
        y = fluid.nets.simple_img_conv_pool(input=y, filter_size=(5, 5), num_filters=32, pool_size=(3, 3),
                                               pool_stride=(3, 3), act='tanh')
        # Second conv + pooling layer
        y = fluid.nets.simple_img_conv_pool(input=y, filter_size=(5, 5), num_filters=64, pool_size=(2, 2),
                                            pool_stride=(2, 2), act='tanh')
        # Third conv + pooling layer
        y = fluid.nets.simple_img_conv_pool(input=y, filter_size=(5, 5), num_filters=128, pool_size=(2, 2),
                                            pool_stride=(2, 2), act='tanh')
        # Densely Connected Layer
        y = fluid.layers.reshape(x=y, shape=[-1, 128 * 4])
        y = fluid.layers.fc(input=y, size=400, act='tanh')

        # Readout Layer
        y = fluid.layers.fc(input=y, size=40, act='softmax')
    return y

train_d_real = fluid.Program()
startup = fluid.Program()

with fluid.program_guard(train_d_real, startup):
    real_image = fluid.layers.data('image', shape=[1, img_size, img_size])
    label = fluid.layers.data(
        name='label', shape=[-1, y_dim], dtype='float32')
    p_real = Discriminator(real_image)
    real_cost = fluid.layers.sigmoid_cross_entropy_with_logits(p_real, label)
    real_avg_cost = fluid.layers.mean(real_cost)
    d_params = get_params(train_d_real, 'D')
    optimizer = fluid.optimizer.AdamOptimizer(learning_rate=2e-4)
    optimizer.minimize(real_avg_cost, parameter_list=d_params)

reader_cre = reader_creator(image_dir=img_dir, list_filename=list_file)
reader = reader_creator.make_reader(reader_cre, image_size=img_size,
                                    crop_type=crop_type, crop_size=crop_size, return_label=True)
face_generator = paddle.batch(paddle.reader.shuffle(reader, 30000), batch_size=2*batch_size)

def train():
    place = fluid.CPUPlace()
    if USE_GPU:
        place = fluid.CUDAPlace(0)
    exe = fluid.Executor(place)

    # fetch distributed training environment setting
    training_role = os.getenv("PADDLE_TRAINING_ROLE", "TRAINER")
    port = os.getenv("PADDLE_PSERVER_PORT", "6666")
    pserver_ips = os.getenv("PADDLE_PSERVER_IPS", "127.0.0.1")
    trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
    eplist = []
    for ip in pserver_ips.split(","):
        eplist.append(':'.join([ip, port]))
    pserver_endpoints = ",".join(eplist)
    trainers = int(os.getenv("PADDLE_TRAINERS", "2"))
    current_endpoint = os.getenv("PADDLE_CURRENT_IP", "127.0.0.1") + ":" + port

    t = fluid.DistributeTranspiler()
    t.transpile(
        trainer_id=trainer_id,
        pservers=pserver_endpoints,
        trainers=trainers,
        sync_mode=True,
        startup_program=startup
    )

    if training_role == "PSERVER":
        pserver_prog = t.get_pserver_program(current_endpoint)
        startup_prog = t.get_startup_program(current_endpoint, pserver_prog)
        exe.run(startup_prog)
        exe.run(pserver_prog)
    elif training_role == "TRAINER":
        trainer_prog = t.get_trainer_program()
        exe.run(fluid.default_startup_program())

        for pass_id in range(EPOCH_NUM):
            start_time = time.time()
            for i, batch_data in enumerate(face_generator()):
                if len(batch_data) != 2*batch_size:
                    print('len(batch_data)=%d, batch_size=%d' % (len(batch_data), batch_size))
                    continue
                data = []
                for index, ele in enumerate(batch_data):
                    if index % trainers == trainer_id:
                        data.append(ele)
                real_image = np.array(list(map(lambda x: x[0], data))).reshape(
                    -1, 64 * 64).astype('float32')
                real_batch_labels = np.array(list(map(lambda x: x[1], data))).reshape(-1, 1).astype('float32')
                real_batch_labels_size = np.size(real_batch_labels, axis=0)  # number of labels in this batch
                assert real_batch_labels_size == batch_size, 'real_batch_labels_size != batch_size'
                real_labels = np.zeros(shape=[real_batch_labels_size, y_dim], dtype='float32')
                for i in range(real_batch_labels_size):
                    real_labels[i][int(real_batch_labels[i][0])] = 1.0

                r_real = exe.run(program=trainer_prog, fetch_list=[real_avg_cost],
                                 feed={'image': np.array(real_image), 'label': real_labels})

            print("Pass:%d,real_acg_cost:%f," % (pass_id, r_real[0][0]))

            end_time = time.time()
            one_pass_time = end_time - start_time
            print("This pass has taken %fs" % (one_pass_time))
        # destroy the resources of the current trainer node on the pserver node
        exe.close()
    else:
        raise AssertionError("PADDLE_TRAINING_ROLE should be one of [TRAINER, PSERVER]")

train()

gev0vcfq #1

Try removing the with fluid.program_guard block?


qgzx9mmu #2

After removing with fluid.program_guard the code does run, but it does not seem to produce the expected results. I also have a few questions about Paddle distributed training:

1. Does the pserver code have to be identical to the trainer code (apart from settings such as training_role)?
2. Suppose I have a scenario with one pserver S and two trainers A and B, where the two trainers do not train in exactly the same way. Specifically, A and B both train a shared network N; in addition, while training N, B also uses N's parameters to train an extra network N_b of its own. Can this be done with Paddle distributed training? If so, how should the code for S, A, and B be organized (for example, must S match A, and what is the relationship between S and B)?
3. Are there any other documents or code examples on distributed training that I could refer to? I have searched for a while and found very little. An explanation of how DistributeTranspiler.get_pserver_program and get_trainer_program work (i.e. how a single piece of code is split into the server and trainer roles) would be even better!

Thanks a lot!


w9apscun #3

If you keep with fluid.program_guard, please pass train_d_real to the program argument of transpile. Otherwise, transpile defaults to using fluid.default_main_program.
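
For reference, a minimal sketch of that change against the trainer code above; everything stays as in the original transpile call except that the program is now named explicitly:

t = fluid.DistributeTranspiler()
t.transpile(
    trainer_id=trainer_id,
    program=train_d_real,          # the program built under program_guard above
    pservers=pserver_endpoints,
    trainers=trainers,
    sync_mode=True,
    startup_program=startup
)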


e37o9pze #4

Does "pass train_d_real to the program argument of transpile" amount to specifying that the main_program at that point is the program given by transpile's program argument?

Also, for question 2 that I described above: suppose trainer A contains two blocks, with fluid.program_guard(pro_1) and with fluid.program_guard(pro_2). Which pro should I pass to transpile's program argument in that case?


2jcobegt #5

Pass the prog that needs to communicate with the pserver, i.e. the program corresponding to network N in your question 2.
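
A rough sketch of what that could look like on the trainer side (prog_N, the program of the shared network N, and prog_Nb, B's private program, are placeholder names for this discussion, not code from the post above): only the shared program goes through the transpiler, the private one is run locally as-is.

t = fluid.DistributeTranspiler()
t.transpile(
    trainer_id=trainer_id,
    program=prog_N,                # shared network N: this is what talks to the pserver
    pservers=pserver_endpoints,
    trainers=trainers,
    startup_program=startup
)
trainer_prog_N = t.get_trainer_program()
# prog_Nb (B's extra network) stays a plain local program, e.g.
# exe.run(program=prog_Nb, feed=..., fetch_list=...)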


jhkqcmku #6

What if both prog_1 and prog_2 need to communicate with the pserver?

Also, suppose another trainer B has prog_1 and prog_2 (the same two programs as in trainer A), but B additionally has its own prog_3. Which program should trainer B pass as the program argument of transpile in that case?

Those are two separate questions; I'd appreciate answers to both. Thanks!


r1zhe5dt #7

Regarding "what if both prog_1 and prog_2 need to communicate with the pserver": do you mean that prog_1 and prog_2 both communicate with the same pserver, or that each communicates with its own pserver?

If there are multiple programs, you need to wrap the program that is to be transpiled in a program guard.
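
Read literally, that suggestion would look roughly like the following (prog_1 and startup_1 are placeholders from this discussion, not tested code); with no explicit program argument, transpile picks up whatever program is active under the guard:

with fluid.program_guard(prog_1, startup_1):
    t = fluid.DistributeTranspiler()
    t.transpile(
        trainer_id=trainer_id,
        pservers=pserver_endpoints,
        trainers=trainers,
        startup_program=startup_1
    )
    trainer_prog_1 = t.get_trainer_program()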


mcvgt66p #8

1. I mean that prog_1 and prog_2 both communicate with the same pserver.
2. As stated in 1, both programs need to communicate with the pserver. After wrapping them with program guard, which prog should be the program argument of transpile?
