As the title says: when running the word2vec model provided by Paddle (https://github.com/PaddlePaddle/models/tree/develop/PaddleRec/word2vec) distributed on an MPI cluster, training works when is_sparse (the flag that controls how the skip-gram network's parameters are built) is left at its default of False, but fails when is_sparse=True is passed in, eventually aborting with an RPC error (FLAGS_rpc_deadline=5000000 has already been set):
```
W0918 13:25:01.122030 4096 parallel_executor.cc:333] The number of graph should be only one, but the current graph has 2 sub_graphs. If you want to see the nodes of the sub_graphs, you should use 'FLAGS_print_sub_graph_dir' to specify the output dir. NOTES: if you not do training, please don't pass loss_var_name.
2019-09-18 13:25:01,169-INFO: running data in ./train_data/xab
F0918 13:27:53.938683 8432 grpc_client.cc:418] FetchBarrierRPC name:[FETCH_BARRIER@RECV], ep:[10.76.57.40:62001], status:[-1] meets grpc error, error_code:14 error_message:Socket closed error_details:
*** Check failure stack trace: ***
    @     0x7f811a7a0c0d  google::LogMessage::Fail()
    @     0x7f811a7a46bc  google::LogMessage::SendToLog()
    @     0x7f811a7a0733  google::LogMessage::Flush()
    @     0x7f811a7a5bce  google::LogMessageFatal::~LogMessageFatal()
    @     0x7f811b3a5e0e  paddle::operators::distributed::GRPCClient::Proceed()
    @     0x7f81288e68a0  execute_native_thread_routine
    @     0x7f8132cfc1c3  start_thread
    @     0x7f813232412d  __clone
    @              (nil)  (unknown)
('corpus_size:', 269909100)
dict_size = 2699092 word_all_count = 269909100
CPU_NUM:5
```
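For context, `is_sparse` is the switch that makes the skip-gram embedding (and the NCE weights) use sparse, selected-rows gradient updates, which is exactly the traffic the distributed transpiler routes to the parameter servers over RPC. Below is a minimal sketch of the relevant layer call in the fluid 1.x API; the names here are illustrative, not the repo's exact net.py:

```
import paddle.fluid as fluid

def tiny_skip_gram(dict_size, embedding_size, is_sparse):
    # Integer word id -> embedding lookup. With is_sparse=True the gradient
    # becomes selected-rows (sparse) updates, which the distribute
    # transpiler sends to the parameter servers instead of dense tensors.
    word = fluid.layers.data(name='word', shape=[1], dtype='int64')
    emb = fluid.layers.embedding(
        input=word,
        size=[dict_size, embedding_size],
        param_attr='emb',
        is_sparse=is_sparse)
    return emb
```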
2 Answers
8ljdwjyq1#
Fixed by #19737
gfttwv5a2#
```
from __future__ import print_function
import argparse
import logging
import os
import time
import math
import random
import numpy as np
import paddle
import paddle.fluid as fluid
import six
import config
import reader
from net import skip_gram_word2vec

logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)
def parse_args():
    parser = argparse.ArgumentParser(
        description="PaddlePaddle Word2vec example")
    parser.add_argument(
        '--train_data_dir',
        type=str,
        default='./data/text',
        help="The path of the training dataset")
    parser.add_argument(
        '--base_lr',
        type=float,
        default=0.01,
        help="The learning rate (default: 0.01)")
    parser.add_argument(
        '--save_step',
        type=int,
        default=10000,
        help="Save the model every this many steps (default: 10000)")
    parser.add_argument(
        '--print_batch',
        type=int,
        default=1,
        help="Print statistics every this many batches (default: 1)")
    parser.add_argument(
        '--dict_path',
        type=str,
        default='./data/1-billion_dict',
        help="The path of the data dict")
    parser.add_argument(
        '--batch_size',
        type=int,
        default=1,
        help="The size of a mini-batch (default: 1)")
    parser.add_argument(
        '--num_passes',
        type=int,
        default=1,
        help="The number of passes to train (default: 1)")
    parser.add_argument(
        '--model_output_dir',
        type=str,
        default='models',
        help='The path where the model is stored (default: models)')
    parser.add_argument('--nce_num', type=int, default=5, help='nce_num')
    parser.add_argument(
        '--embedding_size',
        type=int,
        default=64,
        help='sparse feature hashing space for index processing')
    parser.add_argument('--is_mpi', type=int, default=0,
                        help='mpi train (default: 0)')
    parser.add_argument('--is_continous_training', type=int, default=0,
                        help='continuous training (default: 0)')
    parser.add_argument(
        '--is_sparse',
        action='store_true',
        required=False,
        default=False,
        help='whether embedding and nce use sparse updates (default: False)')
    parser.add_argument(
        '--with_speed',
        action='store_true',
        required=False,
        # NOTE: default=True combined with action='store_true' means this
        # switch can never be turned off from the command line.
        default=True,
        help='print speed or not (default: True)')
    return parser.parse_args()
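
# Note (added for clarity, not in the original post): because '--is_sparse'
# is declared with action='store_true', it is a boolean switch and must be
# passed with no value, e.g.
#     python train.py --is_sparse
# Passing "--is_sparse True" makes argparse reject "True" as an
# unrecognized extra argument.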

def convert_python_to_tensor(weight, batch_size, sample_reader):
    def reader():
        # Cumulative distribution over word frequencies, used to draw
        # weighted negative samples below.
        cs = np.array(weight).cumsum()
        result = [[], []]
        for sample in sample_reader():
            for i, fea in enumerate(sample):
                result[i].append(fea)
            if len(result[0]) == batch_size:
                tensor_result = []
                for tensor in result:
                    t = fluid.Tensor()
                    dat = np.array(tensor, dtype='int64')
                    if len(dat.shape) > 2:
                        dat = dat.reshape((dat.shape[0], dat.shape[2]))
                    elif len(dat.shape) == 1:
                        dat = dat.reshape((-1, 1))
                    t.set(dat, fluid.CPUPlace())
                    tensor_result.append(t)
                tt = fluid.Tensor()
                # Draw nce_num negative word ids from the weighted
                # distribution and tile them across the batch.
                neg_array = cs.searchsorted(np.random.sample(args.nce_num))
                neg_array = np.tile(neg_array, batch_size)
                tt.set(
                    neg_array.reshape((batch_size, args.nce_num)),
                    fluid.CPUPlace())
                tensor_result.append(tt)
                yield tensor_result
                result = [[], []]

    return reader
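
# Usage sketch (assumed wiring, not part of the original post): the
# generator returned above is typically handed to a fluid py_reader as a
# tensor provider, e.g.
#   py_reader.decorate_tensor_provider(
#       convert_python_to_tensor(weight, args.batch_size, sample_reader))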

def train_loop(args, exe, train_program, reader, py_reader, loss, trainer_id,
               weight, model_output_dir):
    # (body omitted in the original post)
    pass

def GetFileList(data_path):
    return os.listdir(data_path)

def train(args):
    # (body omitted in the original post)
    pass

if __name__ == '__main__':
    args = parse_args()
    train(args)
```
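The fatal log above comes from the gRPC client during the FETCH_BARRIER step of a synchronous trainer/pserver job, i.e. after the program has been split by the distribute transpiler. For reference, here is a minimal sketch of that split with the classic fluid 1.x transpiler; the endpoints and role handling are placeholders, since a real MPI job reads them from the scheduler's environment:

```
import paddle.fluid as fluid

# Placeholder endpoints; a real job gets these from the cluster scheduler.
pserver_endpoints = "10.76.57.40:62001,10.76.57.41:62001"
current_endpoint = "10.76.57.40:62001"
trainer_id, trainers = 0, 2

t = fluid.DistributeTranspiler()
t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)

role = "TRAINER"  # or "PSERVER", typically from PADDLE_TRAINING_ROLE
if role == "PSERVER":
    # Parameter server: run the pserver program and block on RPC requests.
    pserver_prog = t.get_pserver_program(current_endpoint)
    pserver_startup = t.get_startup_program(current_endpoint, pserver_prog)
else:
    # Trainer: run the transpiled trainer program, which sends gradients
    # (sparse selected-rows when is_sparse=True) to the pservers.
    trainer_prog = t.get_trainer_program()
```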