Paddle: running the official word2vec distributed over MPI trains normally with is_sparse=False, but fails to train with is_sparse=True


As the title says: using the word2vec model provided by Paddle (https://github.com/PaddlePaddle/models/tree/develop/PaddleRec/word2vec) and running it distributed on an MPI cluster, training works when is_sparse (a parameter of the skip-gram network construction) is left at its default of False. When is_sparse=True is passed, training does not proceed and it finally fails with the RPC error below (FLAGS_rpc_deadline=5000000 has already been set):

W0918 13:25:01.122030  4096 parallel_executor.cc:333] The number of graph should be only one, but the current graph has 2 sub_graphs. If you want to see the nodes of the sub_graphs, you should use 'FLAGS_print_sub_graph_dir' to specify the output dir. NOTES: if you not do training, please don't pass loss_var_name.
2019-09-18 13:25:01,169-INFO: running data in ./train_data/xab
F0918 13:27:53.938683  8432 grpc_client.cc:418] FetchBarrierRPC name:[FETCH_BARRIER@RECV], ep:[10.76.57.40:62001], status:[-1] meets grpc error, error_code:14 error_message:Socket closed error_details:

***Check failure stack trace:***

    @     0x7f811a7a0c0d  google::LogMessage::Fail()
    @     0x7f811a7a46bc  google::LogMessage::SendToLog()
    @     0x7f811a7a0733  google::LogMessage::Flush()
    @     0x7f811a7a5bce  google::LogMessageFatal::~LogMessageFatal()
    @     0x7f811b3a5e0e  paddle::operators::distributed::GRPCClient::Proceed()
    @     0x7f81288e68a0  execute_native_thread_routine
    @     0x7f8132cfc1c3  start_thread
    @     0x7f813232412d  __clone
    @              (nil)  (unknown)
('corpus_size:', 269909100)
dict_size = 2699092 word_all_count = 269909100
CPU_NUM:5
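For reference, the is_sparse switch in this model is just forwarded to the embedding lookups of the skip-gram network. The snippet below is only a minimal sketch of that pattern using the old fluid API, with placeholder names (skip_gram_embedding, dict_size, embedding_size); the actual network is defined in net.py of the linked repo:

```python
import paddle.fluid as fluid

def skip_gram_embedding(dict_size, embedding_size, is_sparse=False):
    # int64 word ids fed from the py_reader
    input_word = fluid.layers.data(name='input_word', shape=[1], dtype='int64')
    # With is_sparse=True the lookup_table gradient becomes a SelectedRows
    # (sparse) update, which the distributed transpiler is meant to send to
    # the parameter server as a sparse RPC instead of a dense gradient.
    emb = fluid.layers.embedding(
        input=input_word,
        size=[dict_size, embedding_size],
        is_sparse=is_sparse,
        param_attr=fluid.ParamAttr(name='emb'))
    return emb
```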
gfttwv5a:

```python
from __future__ import print_function
import argparse
import logging
import os
import time
import math
import random
import numpy as np
import paddle
import paddle.fluid as fluid
import six

import config

import reader
from net import skip_gram_word2vec

logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)

def parse_args():
    parser = argparse.ArgumentParser(
        description="PaddlePaddle Word2vec example")
    parser.add_argument(
        '--train_data_dir',
        type=str,
        default='./data/text',
        help="The path of the training dataset")
    parser.add_argument(
        '--base_lr',
        type=float,
        default=0.01,
        help="The learning rate (default: 0.01)")
    parser.add_argument(
        '--save_step',
        type=int,
        default=10000,
        help="Save the model every this many steps (default: 10000)")
    parser.add_argument(
        '--print_batch',
        type=int,
        default=1,
        help="Log every this many batches (default: 1)")
    parser.add_argument(
        '--dict_path',
        type=str,
        default='./data/1-billion_dict',
        help="The path of the data dict")
    parser.add_argument(
        '--batch_size',
        type=int,
        default=1,
        help="The size of a mini-batch (default: 1)")
    parser.add_argument(
        '--num_passes',
        type=int,
        default=1,
        help="The number of passes to train (default: 1)")
    parser.add_argument(
        '--model_output_dir',
        type=str,
        default='models',
        help='The path where the model is stored (default: models)')
    parser.add_argument('--nce_num', type=int, default=5,
                        help='number of negative samples (default: 5)')
    parser.add_argument(
        '--embedding_size',
        type=int,
        default=64,
        help='dimension of the word embedding (default: 64)')
    parser.add_argument('--is_mpi', type=int, default=0,
                        help='mpi train (default: 0)')
    parser.add_argument('--is_continous_training', type=int, default=0,
                        help='continuous training (default: 0)')
    parser.add_argument(
        '--is_sparse',
        action='store_true',
        required=False,
        default=False,
        help='use sparse update for embedding and nce or not (default: False)')
    parser.add_argument(
        '--with_speed',
        action='store_true',
        required=False,
        default=True,
        help='print speed or not (default: True)')
    return parser.parse_args()

def convert_python_to_tensor(weight, batch_size, sample_reader):
    def __reader__():
        # Cumulative word-frequency distribution used to draw negative samples.
        cs = np.array(weight).cumsum()
        result = [[], []]
        for sample in sample_reader():
            for i, fea in enumerate(sample):
                result[i].append(fea)
            if len(result[0]) == batch_size:
                tensor_result = []
                for tensor in result:
                    t = fluid.Tensor()
                    dat = np.array(tensor, dtype='int64')
                    if len(dat.shape) > 2:
                        dat = dat.reshape((dat.shape[0], dat.shape[2]))
                    elif len(dat.shape) == 1:
                        dat = dat.reshape((-1, 1))
                    t.set(dat, fluid.CPUPlace())
                    tensor_result.append(t)
                tt = fluid.Tensor()
                # Draw nce_num negative word ids from the unigram^0.75 distribution
                # and reuse them for every sample in the batch.
                neg_array = cs.searchsorted(np.random.sample(args.nce_num))
                neg_array = np.tile(neg_array, batch_size)
                tt.set(
                    neg_array.reshape((batch_size, args.nce_num)),
                    fluid.CPUPlace())
                tensor_result.append(tt)
                yield tensor_result
                result = [[], []]

    return __reader__

def train_loop(args, exe, train_program, reader, py_reader, loss, trainer_id,
               weight, model_output_dir):

    py_reader.decorate_tensor_provider(
        convert_python_to_tensor(weight, args.batch_size, reader.train()))

    # place = fluid.CPUPlace()
    # exe = fluid.Executor(place)

    exe.run(fluid.default_startup_program())

    exec_strategy = fluid.ExecutionStrategy()
    exec_strategy.use_experimental_executor = True

    print("CPU_NUM:" + str(os.getenv("CPU_NUM")))
    exec_strategy.num_threads = int(os.getenv("CPU_NUM"))

    build_strategy = fluid.BuildStrategy()
    if int(os.getenv("CPU_NUM")) > 1:
        build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce

    train_exe = fluid.ParallelExecutor(
        use_cuda=False,
        loss_name=loss.name,
        main_program=train_program,
        build_strategy=build_strategy,
        exec_strategy=exec_strategy)

    for pass_id in range(args.num_passes):
        py_reader.start()
        time.sleep(10)
        epoch_start = time.time()
        batch_id = 0
        start = time.time()
        try:
            end = time.time()
            while True:
                loss_val = train_exe.run(fetch_list=[loss.name])
                loss_val = np.mean(loss_val)
                batch_time = time.time() - end
                if batch_id % args.print_batch == 0:
                    logger.info(
                        "TRAIN --> pass: {} batch: {} loss: {} reader queue:{} batch_time:{}".
                        format(pass_id, batch_id,
                               loss_val.mean(), py_reader.queue.size(), batch_time))

                if args.with_speed:
                    if batch_id % 100 == 0 and batch_id != 0:
                        elapsed = (time.time() - start)
                        start = time.time()
                        samples = 1001 * args.batch_size * int(
                            os.getenv("CPU_NUM"))
                        logger.info("Time used: {}, Samples/Sec: {}".format(
                            elapsed, samples / elapsed))

                if batch_id % args.save_step == 0 and batch_id != 0:
                    model_dir = model_output_dir + '/pass-' + str(
                        pass_id) + ('/batch-' + str(batch_id))
                    if trainer_id == 0:
                        # fluid.io.save_params(executor=exe, dirname=model_dir)
                        fluid.io.save_persistables(executor=exe, dirname=model_dir,
                                                   main_program=train_program)
                        print("model saved in %s" % model_dir)
                batch_id += 1

        except fluid.core.EOFException:
            py_reader.reset()
            epoch_end = time.time()
            logger.info("Epoch: {0}, Train total elapsed: {1} ".format(
                pass_id, epoch_end - epoch_start))
            model_dir = model_output_dir + '/pass-' + str(pass_id)
            if trainer_id == 0:
                fluid.io.save_params(executor=exe, dirname=model_dir)
                print("model saved in %s" % model_dir)

def GetFileList(data_path):
    return os.listdir(data_path)

def train(args):

    if args.is_mpi:
        train_path = "./train_data"
        dict_path = "thirdparty/node_dict"
        # test_path = "./test_data"
        # model_path = "./thirdparty/models"
        output_path = "./output"
    else:
        # model_path = config.local_model_path
        # test_path = config.local_test_path
        dict_path = args.dict_path
        train_path = args.train_data_dir
        output_path = args.model_output_dir

    if not os.path.isdir(train_path):
        os.mkdir(train_path)

    filelist = GetFileList(train_path)
    word2vec_reader = reader.Word2VecReader(dict_path, train_path,
                                            filelist, 0, 1)

    logger.info("dict_size: {}".format(word2vec_reader.dict_size))
    # Negative-sampling distribution: unigram frequency raised to the 0.75 power.
    np_power = np.power(np.array(word2vec_reader.id_frequencys), 0.75)
    id_frequencys_pow = np_power / np_power.sum()

    loss, py_reader = skip_gram_word2vec(
        word2vec_reader.dict_size,
        args.embedding_size,
        is_sparse=args.is_sparse,
        neg_num=args.nce_num)

    optimizer = fluid.optimizer.SGD(
        learning_rate=fluid.layers.exponential_decay(
            learning_rate=args.base_lr,
            decay_steps=100000,
            decay_rate=0.999,
            staircase=True))

    optimizer.minimize(loss)
    place = fluid.CPUPlace()
    exe = fluid.Executor(place)

    if not args.is_mpi:
        # do local training
        logger.info("run local training")
        main_program = fluid.default_main_program()
        train_loop(args, exe, main_program, word2vec_reader, py_reader, loss, 0,
                   id_frequencys_pow, output_path)

    else:
        # do distributed (pserver/trainer) training
        logger.info("run mpi training")
        training_role = os.getenv("TRAINING_ROLE", "TRAINER")
        port = os.getenv("PADDLE_PORT", "6174")
        pserver_ips = os.getenv("PADDLE_PSERVERS")  # ip,ip...
        eplist = []
        for ip in pserver_ips.split(","):
            eplist.append(':'.join([ip, port]))
        pserver_endpoints = ",".join(eplist)  # ip:port,ip:port...
        trainers = int(os.getenv("PADDLE_TRAINERS_NUM", "0"))
        current_endpoint = os.getenv("POD_IP") + ":" + port
        trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
        t = fluid.DistributeTranspiler()
        t.transpile(trainer_id, pservers=pserver_endpoints,
                    current_endpoint=current_endpoint, trainers=trainers)
        if training_role == "PSERVER":
            pserver_prog, pserver_startup = t.get_pserver_programs(
                current_endpoint)
            exe.run(pserver_startup)
            exe.run(pserver_prog)
        elif training_role == "TRAINER":
            train_loop(args, exe, t.get_trainer_program(), word2vec_reader,
                       py_reader, loss, trainer_id, id_frequencys_pow, output_path)

if __name__ == '__main__':
    args = parse_args()
    train(args)
```
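Not part of the original script, but when debugging this kind of FETCH_BARRIER / grpc failure it can help to look at which RPC ops the DistributeTranspiler actually put into the trainer program, to see which variables are sent/received and whether the embedding update really went out as a sparse update. A minimal sketch, assuming the op types (`send`, `recv`, `send_barrier`, `fetch_barrier`) and Operator properties of the old fluid API:

```python
import paddle.fluid as fluid

def dump_distributed_ops(program):
    # Print the RPC-related ops of a transpiled program so you can see
    # which variable names are sent to / fetched from the pservers.
    for op in program.global_block().ops:
        if op.type in ('send', 'recv', 'send_barrier', 'fetch_barrier'):
            print(op.type, op.input_arg_names, op.output_arg_names)

# Usage (inside the TRAINER branch, after t.transpile(...)):
# dump_distributed_ops(t.get_trainer_program())
```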
