无从下手,不知道服务器怎么配合才能运行起来,注意事项也无从知晓。默认协议,心跳机制,登录过程等基础问题无从下手
尝试去看源代码,运行sample 调试起来,全部靠猜的坚持到现在。调试了几个星期,发现自己连心跳的机制都还不清楚,可怕的是无从下手呀。
ux6nzvsh1#
stn(signaling transfer network)是mars的核心模块,它负责网络请求任务的处理。在该模块启动的时候会创建消息队列,客户端程序员调用API创建网络任务加入消息队列, 当stn线程发现任务队列有任务了就唤醒,然后 NetCore::StartTask (推荐断点。。)根据任务类型(task.channel_select )选择长链接还是短链接。
NetCore::StartTask
任务是网络请求的抽象,比如点击朋友圈获取数据这个网络请求是走哪个通道(长链/短链),优先级是多少,如果获取失败重试多少次等等。为了代码清晰,下面默认USE_LONG_LINK宏已配置)
以shortlink为例子,用户主动发送的消息基本都是短链接。 短链接由短链接管理器创建(ShortLinkTaskManager::StartTask),它会把任务(Task)包装成一个TaskProfile,TaskProfile会记录启动时间等。 然后放入任务列表(lst_cmd),后面 __RunOnStartTask 迭代处理这个任务列表:
__RunOnStartTask
void ShortLinkTaskManager::__RunOnStartTask() { std::list<TaskProfile>::iterator first = lst_cmd_.begin(); std::list<TaskProfile>::iterator last = lst_cmd_.end(); ... // 迭代处理任务 while (first != last) { std::list<TaskProfile>::iterator next = first; ++next; // 如果该任务已经有线程执行了,就处理下一个 if (first->running_id) { ++sent_count; first = next; continue; } //重试间隔 ... // 检查是否具有发送任务的权限 ... //雪崩检测 ... // 发送网络请求之前记录一些时间信息等 ... // 创建ShortLInk ShortLinkInterface* worker = ShortLinkChannelFactory::Create(MessageQueue::Handler2Queue(asyncreg_.Get()), net_source_, first->task, first->use_proxy); // 设置回调,这些回调会在合适的位置被mars主动调用,具体位置参见后面 worker->OnSend.set(boost::bind(&ShortLinkTaskManager::__OnSend, this, _1), AYNC_HANDLER); worker->OnRecv.set(boost::bind(&ShortLinkTaskManager::__OnRecv, this, _1, _2, _3), AYNC_HANDLER); worker->OnResponse.set(boost::bind(&ShortLinkTaskManager::__OnResponse, this, _1, _2, _3, _4, _5, _6, _7), AYNC_HANDLER); first->running_id = (intptr_t)worker; // 创建ShortLink失败,则处理下一个任务 if (!first->running_id) { first = next; continue; } // 设置网络数据上报函数 worker->func_network_report.set(fun_notify_network_err_); // 发送请求 worker->SendRequest(bufreq, buffer_extension); // 然后处理下一个任务 ++sent_count; first = next; } }
在迭代过程中,对于每个网络任务,短链接工厂(ShortLinkChannelFactory::Create)创建一个短链接,它关联了一个线程,短链接的生命周期就是这个线程的生命周期。短链接线程先服务器握手( __RunConnect )然后发送请求,等待回复( __RunReadWrite )。有个问题是步骤比较固定,比如用户想发完请求打个日志,或者收到回复写一下数据库。这种情况下用户可以传入回调函数,mars会在合适的位置执行这些回调,一个比较完整的短链接生命周期如下:
__RunConnect
__RunReadWrite
线程启动 -> socket连接服务器 -> OnSend回调 -> 数据打包 socket发送请求 -> socket接受回复 -> OnRecv回调 -> 数据解包 -> OnResponse回调 -> 线程结束
最后,线程做什么我们知道了,可是线程还没有启动。翻翻上面代码, worker->SendRequest 会启动线程。
worker->SendRequest
长链接主要用于服务器向客户端推送的消息,比如一个不提醒的群聊收到群友的消息就会走长链接。前面套路都是一样的:stn获取消息队列的任务,net_core选择使用长链接处理任务,长链接管理器启动长链接任务。这一步短链接管理器是创建一个短链接,但是长链接管理器没有,它复用一条长链接。原因是mars目前只支持一条长链接和任意数量短链接。
长链接第一步也是服务器握手,第二步和短链接稍有不同;因为短链接是一次性任务,第二步开线程发个请求等回复就完了,长链接要求持续性的发送和接受任务,所以是一个whiletrue循环:
void LongLink::__RunReadWrite(SOCKET _sock, ErrCmdType& _errtype, int& _errcode, ConnectProfile& _profile) { // 空指令闹钟(心跳闹钟) Alarm alarmnoopinterval(boost::bind(&LongLink::__OnAlarm, this), false); // 超时闹钟(心跳超时闹钟) Alarm alarmnooptimeout(boost::bind(&LongLink::__OnAlarm, this), false); ... while (true) { // 如果空指令闹钟响过(OnAlarm调用过) if (!alarmnoopinterval.IsWaiting()) { if (first_noop_sent && alarmnoopinterval.Status() != Alarm::kOnAlarm) { xassert2(false, "noop interval alarm not running"); } if(first_noop_sent && alarmnoopinterval.Status() == Alarm::kOnAlarm) { __NotifySmartHeartbeatJudgeDozeStyle(); } xgroup2_define(noop_xlog); uint64_t last_noop_interval = alarmnoopinterval.After(); uint64_t last_noop_actual_interval = (alarmnoopinterval.Status() == Alarm::kOnAlarm) ? alarmnoopinterval.ElapseTime() : 0; bool has_late_toomuch = (last_noop_actual_interval >= (15*60*1000)); // 发送心跳,开启超时闹钟,确保心跳在限定时间内被发送&&收到答复 if (__NoopReq(noop_xlog, alarmnooptimeout, has_late_toomuch)) { nooping = true; __NotifySmartHeartbeatHeartReq(_profile, last_noop_interval, last_noop_actual_interval); } first_noop_sent = true; // 重置空指令闹钟 uint64_t noop_interval = __GetNextHeartbeatInterval(); alarmnoopinterval.Cancel(); alarmnoopinterval.Start((int)noop_interval); } // 错误判断 ... // 如果有待写的数据缓冲(lstsenddata_) if (sel.Write_FD_ISSET(_sock) && !lstsenddata_.empty()) { // 用writeev一次写完缓冲 ssize_t writelen = writev(_sock, vecwrite, (int)lstsenddata_.size()); ... // 重置心跳闹钟,这意味着假如之前有个心跳闹钟,但是现在与服务 // 器通信了,之前那个心跳闹钟就不算了 unsigned long long noop_interval = __GetNextHeartbeatInterval(); alarmnoopinterval.Cancel(); alarmnoopinterval.Start((int)noop_interval); GetSignalOnNetworkDataChange()(XLOGGER_TAG, writelen, 0); // 写完调用OnSend回调并清理待写缓冲(lstsenddata_) while (it != lstsenddata_.end() && 0 < writelen) { if (0 == it->second->Pos() && OnSend) OnSend(it->first.taskid); ... it = lstsenddata_.erase(it); } } lock.unlock(); // 如果socket可读 if (sel.Read_FD_ISSET(_sock)) { bufrecv.AllocWrite(64 * 1024, false); // 那就读 ssize_t recvlen = recv(_sock, bufrecv.PosPtr(), 64 * 1024, 0); ... // 解析读到的数据包 while (0 < bufrecv.Length()) { int unpackret = longlink_unpack(bufrecv, cmdid, taskid, packlen, body, extension, tracker_.get()); ... // 解析完就调用OnRecv回调 if (LONGLINK_UNPACK_STREAM_PACKAGE == unpackret) { if (OnRecv) OnRecv(taskid, packlen, packlen); } // 如果是心跳包顺便取消心跳超时闹钟,并调用OnResponse回调 else if (!__NoopResp(cmdid, taskid, stream_resp.stream, stream_resp.extension, alarmnooptimeout, nooping, _profile)) { if (OnResponse) OnResponse(kEctOK, 0, cmdid, taskid, stream_resp.stream, stream_resp.extension, _profile); sent_taskids.erase(taskid); } } } } End: // 错误处理,清理资源,退出等 ... }
Longlink做三件事:1)视情况发送心跳2)如果数据缓冲有数据就发送3)如果socket可读就读取。第一个心跳是通过空指令闹钟完成的,它每隔一段时间响一次,然后重置等待下一次响应,另外空指令闹钟响的时候会发送心跳包并开启超时闹钟(alarmnooptimeout),该闹钟用于确保心跳在超时范围内从客户端发送&&得到服务器响应。
同样,长链接完整流程是:
线程启动 -> socket连接服务器 -> 死循环{ 空指令闹钟响过 -> NoopReq发送心跳,开启超时闹钟 -> 重置空指令闹钟 socket可写 -> 写 -> OnSend回调 socket读 -> 读 -> 解包 -> OnRecv回调 -> 解包完毕 -> 如果是心跳包{ 取消超时闹钟 ,OnResponse回调 } }
kdfy810k2#
@kelthuzadx 非常感谢,我昨天看到了心跳机制明白了心跳机制,可以查看 #638 。
现在问题在长连接的鉴权了,我计划鉴权的逻辑为
获取响应
鉴权成功,进入正常逻辑
鉴权失败,提示用户token 已经失效,请重新登录登录使用的是http 完成
现在卡在这个问题上。
ikfrs5lh3#
参考 https://segmentfault.com/a/1190000008964528,写的很详细
5cnsuln74#
代码里心跳包不叫心跳,叫noop。。真是奇葩
4条答案
按热度按时间ux6nzvsh1#
我读了些长链短链的源码,希望对您有所帮助。。
mars::stn
stn(signaling transfer network)是mars的核心模块,它负责网络请求任务的处理。在该模块启动的时候会创建消息队列,客户端程序员调用API创建网络任务加入消息队列, 当stn线程发现任务队列有任务了就唤醒,然后
NetCore::StartTask
(推荐断点。。)根据任务类型(task.channel_select )选择长链接还是短链接。任务是网络请求的抽象,比如点击朋友圈获取数据这个网络请求是走哪个通道(长链/短链),优先级是多少,如果获取失败重试多少次等等。为了代码清晰,下面默认USE_LONG_LINK宏已配置)
短链接
以shortlink为例子,用户主动发送的消息基本都是短链接。 短链接由短链接管理器创建(ShortLinkTaskManager::StartTask),它会把任务(Task)包装成一个TaskProfile,TaskProfile会记录启动时间等。 然后放入任务列表(lst_cmd),后面
__RunOnStartTask
迭代处理这个任务列表:在迭代过程中,对于每个网络任务,短链接工厂(ShortLinkChannelFactory::Create)创建一个短链接,它关联了一个线程,短链接的生命周期就是这个线程的生命周期。短链接线程先服务器握手(
__RunConnect
)然后发送请求,等待回复(__RunReadWrite
)。有个问题是步骤比较固定,比如用户想发完请求打个日志,或者收到回复写一下数据库。这种情况下用户可以传入回调函数,mars会在合适的位置执行这些回调,一个比较完整的短链接生命周期如下:
最后,线程做什么我们知道了,可是线程还没有启动。翻翻上面代码,
worker->SendRequest
会启动线程。长链接
长链接主要用于服务器向客户端推送的消息,比如一个不提醒的群聊收到群友的消息就会走长链接。前面套路都是一样的:stn获取消息队列的任务,net_core选择使用长链接处理任务,长链接管理器启动长链接任务。这一步短链接管理器是创建一个短链接,但是长链接管理器没有,它复用一条长链接。原因是mars目前只支持一条长链接和任意数量短链接。
长链接第一步也是服务器握手,第二步和短链接稍有不同;因为短链接是一次性任务,第二步开线程发个请求等回复就完了,长链接要求持续性的发送和接受任务,所以是一个whiletrue循环:
Longlink做三件事:1)视情况发送心跳2)如果数据缓冲有数据就发送3)如果socket可读就读取。第一个心跳是通过空指令闹钟完成的,它每隔一段时间响一次,然后重置等待下一次响应,另外空指令闹钟响的时候会发送心跳包并开启超时闹钟(alarmnooptimeout),该闹钟用于确保心跳在超时范围内从客户端发送&&得到服务器响应。
同样,长链接完整流程是:
kdfy810k2#
@kelthuzadx 非常感谢,我昨天看到了心跳机制明白了心跳机制,可以查看 #638 。
现在问题在长连接的鉴权了,我计划鉴权的逻辑为
获取响应
鉴权成功,进入正常逻辑
鉴权失败,提示用户token 已经失效,请重新登录
登录使用的是http 完成
现在卡在这个问题上。
ikfrs5lh3#
参考 https://segmentfault.com/a/1190000008964528,写的很详细
5cnsuln74#
代码里心跳包不叫心跳,叫noop。。真是奇葩