注意收发数据时,数据要先放到缓存重排然后再进行收发。
接收数据时,kcp会先把数据放到rcv_buf中进行缓存,注意rcv_buf中的数据是已经按sn排好序的,但是不一定完整连续,因为传输过程中可能有丢失,rcv_queue是从rcv_buf中拷贝出的连续完整的数据分片。用户调用ikcp_recv()时是从rcv_que中读取数据。还要注意一点,ikcp_recv中的len一定要是整个数据的长度,要一次性读完,这和recvfrom的size是一样的。
与接收过程类似,只不过顺序相反,用户调用ikcp_send()只是将数据放到了snd_buf中,不过会自动进行数据分片,使得数据大小不超过mtu。然后,kcp会通过流量控制和拥塞控制将数据放到snd_buf中再进行发送。
注意区分一下,不断调用ikcp_send(),数据会累积在snd_queue中,如果对方没有接收,对方的数据则会累积在对方的rcv_buf中。
说了这么多,其实kcp底层调用的,还是recvfrom()和sendto(),这一点不要忘记。
snd_buf中存放的是已发送但未收到对方ack的数据包和未发送但可发送的数据包,如果收到了ack,相应的数据就会从snd_buf中删除。如上图中,8包已收到ack,从snd_buf删除,9、10包已发送但未收到ack。这里提一下,kcp的确认机制有两种,一种是una,与tcp的ack类似,代表una之前的包全部收到;还有一种是单独ack,只对某一个单独的包进行确认。kcp优先检测una。
kcp的重传机制体现了其较于tcp而言,灵活的地方。一是超时重传时间,可以自己设置,tcp是超时一次就将RTO×2,kcp可以×1.5,缩短了重传等待时间;二是建立了快速重传机制,如果一个数据包之后的指定个数据包已收到ack,那么即便未到该包的rto,也立即重传。取3包未收到ack,而4包和5包已经收到了ack,如果指定的是跳过2个包就认为包丢失,那么此时3包就立即重传,无论是否经过了3包的RTO。此外,kcp还可以选择是否设置延迟ack,超时只重传丢失包,采用非退让流控等,这些都使kcp的传输效率得以提高。
KCP只是简单的算法实现,并没有涉及到任何的底层调用。我们只需要在UDP系统调用的时候,注册KCP回调函数,即可使用。所以可以将它理解为一个应用层协议。
KCP为了实现选择性重传(ARQ),会维护一个接收窗口(滑动窗口)。如果收到有序数据会将其放到接收队列,以待应用层消费。如果存在包丢失,会判断。超过设置的次数,会让其选择重传对应的包。
其实就是通过一个rcv_nxt(接收窗口当前偏移)来判断当前需要接受的数据包。如果收到的包在窗口范围,但是不是rcv_nxt。先保存,等包连续之后才会将连续的数据包放入到接受队列供应用层消费。
同样网络不好的情况,KCP也会实现拥塞控制,限制发送端的包。
首先在分析之前我们应该去github看一下使用方法。其实很简单,初始化kcp对象,然后实现回调函数,其实就是实现自己底层UDP系统调用。每次我们通过KCP发包的时候,他都会调用这个回调。
UDP收到包之后调用ikcp_input函数,即可。我们最终只需要通过ikcp_send和ikcp_recv收发数据。
在看代码前,先看看kcp数据包的结构,Segement。
struct IKCPSEG
{
struct IQUEUEHEAD node;
IUINT32 conv; //会话编号,两方一致才会通信
IUINT32 cmd; //指令类型,四种下面会说
IUINT32 frg; //分片编号 倒数第几个seg。主要就是用来合并一块被分段的数据。
IUINT32 wnd; //自己可用窗口大小
IUINT32 ts;
IUINT32 sn; //编号 确认编号或者报文编号
IUINT32 una; //代表编号前面的所有报都收到了的标志
IUINT32 len;
IUINT32 resendts; //重传的时间戳。超过当前时间重发这个包
IUINT32 rto; //超时重传时间,根据网络去定
IUINT32 fastack; //快速重传机制,记录被跳过的次数,超过次数进行快速重传
IUINT32 xmit; //重传次数
char data[1]; //数据内容
};
Kcp就是通过数据包的这些字段,实现稳定通信,针对不同的点可以去做优化。从上面的字段,也可以看出kcp借助UNA和ACK实现了选择性重传。
这个方法,首先会判断kcp流。并尝试将包追加到前一段,如果可能的话。否则进行分片传输。
if (len <= (int)kcp->mss) count = 1;
else count = (len + kcp->mss - 1) / kcp->mss;
if (count >= (int)IKCP_WND_RCV) return -2;
if (count == 0) count = 1;
// fragment
for (i = 0; i < count; i++) {
int size = len > (int)kcp->mss ? (int)kcp->mss : len;
seg = ikcp_segment_new(kcp, size);
assert(seg);
if (seg == NULL) {
return -2;
}
if (buffer && len > 0) {
memcpy(seg->data, buffer, size);
}
seg->len = size;
seg->frg = (kcp->stream == 0)? (count - i - 1) : 0;
iqueue_init(&seg->node);
iqueue_add_tail(&seg->node, &kcp->snd_queue);
kcp->nsnd_que++;
if (buffer) {
buffer += size;
}
len -= size;
}
return 0;
**上面的代码逻辑中count其实就是包的分片数。然后循环,创建segment,segment的数据结构主要就是保存了分片的数据包信息。**比如eg->frg保存当前分片的编号。完事之后调用iqueue_add_tail方法将segment传入到发送队列。这些方法通过宏定义实现。其实就是链表操作。队列是一个双向链表。逻辑很简单。
那么这一步之时将数据分片放入到队列。具体发送逻辑在哪实现呢,继续往下看。
我们看一下回调的逻辑,其实就是ikcp_output方法,这个方法会在ikcp_flush 中调用。也就是ikcp_output做的是最终的数据发送。那是如何驱动的呢?我先来看看这个方法。
1、这个方法首先发送ack。遍历所有ack。调用ikcp_output 方法发送。
count = kcp->ackcount;
for (i = 0; i < count; i++) {
size = (int)(ptr - buffer);
if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
ikcp_output(kcp, buffer, size);
ptr = buffer;
}
ikcp_ack_get(kcp, i, &seg.sn, &seg.ts);
ptr = ikcp_encode_seg(ptr, &seg);
}
kcp->ackcount = 0;
**2、判断当前是否需要进行窗口探测,因为如果窗口为0,是不能发数据,所以必须进行窗口探测才行。探测结束之后,如果需要,设置探测窗口时间。发送探测窗口的请求或者窗口恢复的请求。主要就是请求对端窗口大小,以及告知远端窗口大小。**完事之后将结果放入seg中。
if (kcp->rmt_wnd == 0) {
if (kcp->probe_wait == 0) {
kcp->probe_wait = IKCP_PROBE_INIT;
kcp->ts_probe = kcp->current + kcp->probe_wait;
}
else {
if (_itimediff(kcp->current, kcp->ts_probe) >= 0) {
if (kcp->probe_wait < IKCP_PROBE_INIT)
kcp->probe_wait = IKCP_PROBE_INIT;
kcp->probe_wait += kcp->probe_wait / 2;
if (kcp->probe_wait > IKCP_PROBE_LIMIT)
kcp->probe_wait = IKCP_PROBE_LIMIT;
kcp->ts_probe = kcp->current + kcp->probe_wait;
kcp->probe |= IKCP_ASK_SEND;
}
}
} else {
kcp->ts_probe = 0;
kcp->probe_wait = 0;
}
3、计算本次发送可用的窗口大小,由多个因素决定,KCP有选择性配置。可以选择不结合流控窗口。
**4、将发送队列中的消息放到发送缓冲区,其实就是发送窗口。也就是说所有发送后的数据都会在这个缓存区。发送数据之前,还需要设置对应的重传次数和间隔。*这个逻辑就比较简单了,其实就从发送窗口队列拿出一个seg。然后设置对应的参数。并且更新缓冲队列。以及缓冲队列的大小。如果设置nodelay,重传时间有2 变成1.5。
while (_itimediff(kcp->snd_nxt, kcp->snd_una + cwnd) < 0) {
IKCPSEG *newseg;
if (iqueue_is_empty(&kcp->snd_queue)) break;
newseg = iqueue_entry(kcp->snd_queue.next, IKCPSEG, node);
iqueue_del(&newseg->node);
iqueue_add_tail(&newseg->node, &kcp->snd_buf);
kcp->nsnd_que--;
kcp->nsnd_buf++;
newseg->conv = kcp->conv;
newseg->cmd = IKCP_CMD_PUSH;
newseg->wnd = seg.wnd;
newseg->ts = current;
newseg->sn = kcp->snd_nxt++;
newseg->una = kcp->rcv_nxt;
newseg->resendts = current;
newseg->rto = kcp->rx_rto;
newseg->fastack = 0;
newseg->xmit = 0;
}
5、遍历发送窗口队列。判断是否有需要发送的数据(包括重新传输的)。其实就是拿到对应的segment,然后根据信息进行逻辑判断是否需要重新传输。或者需要发送。判断结束之后进行重新传输。
逻辑也很简单:
其实lost和change是用来更新窗口大小的字段。并且两个更新算法不一样。
if (segment->xmit == 0) {
needsend = 1;
segment->xmit++;
segment->rto = kcp->rx_rto;
segment->resendts = current + segment->rto + rtomin;
}
else if (_itimediff(current, segment->resendts) >= 0) {
needsend = 1;
segment->xmit++;
kcp->xmit++;
if (kcp->nodelay == 0) {
segment->rto += kcp->rx_rto;
} else {
segment->rto += kcp->rx_rto / 2;
}
segment->resendts = current + segment->rto;
//记录包丢失
lost = 1;
}
else if (segment->fastack >= resent) {
if ((int)segment->xmit <= kcp->fastlimit ||
kcp->fastlimit <= 0) {
needsend = 1;
segment->xmit++;
segment->fastack = 0;
segment->resendts = current + segment->rto;
//用来标示发生了快速重传
change++;
}
}
基本上所有的快速重传和超时重传的逻辑都在这个方法中。如果出现超时重传(丢包),就会进入慢启动,拥塞窗口减半,滑动窗口变为1。如果发生了快速重传,也会更新拥塞窗口。具体算法可看代码。
其实就是在ikcp_update方法中就行调用,这个方法需要应用层反复调用,一般可以为10ms和100ms,时间将决定数据发送的实时性。也就是说他会定时刷新判断发送窗口队列的数据或者需要重传的数据,并通过底层UDP进行数据发送。这个方法没有什么逻辑。
void ikcp_update(ikcpcb *kcp, IUINT32 current)
{
IINT32 slap;
kcp->current = current;
if (kcp->updated == 0) {
kcp->updated = 1;
kcp->ts_flush = kcp->current;
}
slap = _itimediff(kcp->current, kcp->ts_flush);
if (slap >= 10000 || slap < -10000) {
kcp->ts_flush = kcp->current;
slap = 0;
}
if (slap >= 0) {
kcp->ts_flush += kcp->interval;
if (_itimediff(kcp->current, kcp->ts_flush) >= 0) {
kcp->ts_flush = kcp->current + kcp->interval;
}
ikcp_flush(kcp);
}
}
这个方法是在底层UDP接收到网络数据之后调用。其实就是解析对应的数据。在KCP中,主要有四种报文格式,ACK报文、数据报文、探测window报文、响应窗口报文四种。
**1、首先就是解析对应的头部数据,大概是24字节。**然后根据字段去调用ikcp_parse_una和ikcp_shrink_buf方法。前者是解析una,确定已经发出去的数据包,有哪些对方接收到了。如果收到了直接重接受窗口移除。后者是更新kcp的send_una。send_una代表之前的包已经确定收到。
data = ikcp_decode8u(data, &cmd);
data = ikcp_decode8u(data, &frg);
data = ikcp_decode16u(data, &wnd);
data = ikcp_decode32u(data, &ts);
data = ikcp_decode32u(data, &sn);
data = ikcp_decode32u(data, &una);
data = ikcp_decode32u(data, &len);
size -= IKCP_OVERHEAD;
kcp->rmt_wnd = wnd;
ikcp_parse_una(kcp, una);
ikcp_shrink_buf(kcp);
**2、如果是ACK指令,其实就是做了一些处理。**ikcp_update_ack主要就是更新kcp的一些参数,包括rtt以及rto, 首先ikcp_parse_ack方法主要就是根据sn,去移除发送队列中对应的segment。然后就是更新maxack以及时间,并且记录日志
if (cmd == IKCP_CMD_ACK) {
if (_itimediff(kcp->current, ts) >= 0) {
ikcp_update_ack(kcp, _itimediff(kcp->current, ts));
}
ikcp_parse_ack(kcp, sn);
//根据snd队列去更新una
ikcp_shrink_buf(kcp);
if (flag == 0) {
flag = 1;
maxack = sn;
latest_ts = ts;
} else {
if (_itimediff(sn, maxack) > 0) {
#ifndef IKCP_FASTACK_CONSERVE
//记录最大ACK
maxack = sn;
latest_ts = ts;
#else
if (_itimediff(ts, latest_ts) > 0) {
maxack = sn;
latest_ts = ts;
}
#endif
}
}
//打印日志
}
**3、如果收到的是数据包,这个逻辑其实很简单,就是检测数据,并将有效的数据放到接受队列,首先就是判断数据包是否有效,如果是,构造一个segment。将数据放入。,然后调用ikcp_parse_data方法。这个方法逻辑也比较简单,其实就是判断是否有效,如果已经被接收过的话,就丢弃,否则根据sn(编号)插入到接收队列。**如果是询问窗口大小的包。这个其实就做个标记,因为每个kcp的header都有win大小。剩下的操作就是根据网络状况更新拥塞以及窗口大小了。
else if (cmd == IKCP_CMD_PUSH) {
if (ikcp_canlog(kcp, IKCP_LOG_IN_DATA)) {
ikcp_log(kcp, IKCP_LOG_IN_DATA,
"input psh: sn=%lu ts=%lu", (unsigned long)sn, (unsigned long)ts);
}
if (_itimediff(sn, kcp->rcv_nxt + kcp->rcv_wnd) < 0) {
ikcp_ack_push(kcp, sn, ts);
if (_itimediff(sn, kcp->rcv_nxt) >= 0) {
seg = ikcp_segment_new(kcp, len);
seg->conv = conv;
seg->cmd = cmd;
seg->frg = frg;
seg->wnd = wnd;
seg->ts = ts;
seg->sn = sn;
seg->una = una;
seg->len = len;
if (len > 0) {
memcpy(seg->data, data, len);
}
ikcp_parse_data(kcp, seg);
}
}
}
丢包率高的网络环境下KCP的优点才会显示出来。如果不丢包,那么TCP和KCP的效率不会差别很大,可能就是少了连接建立/关闭而已。一般来讲,在公网上传输的都可以使用,特别是对实时性要求较高的程序,如LOL。
看了kcp的实现,其实发现和传输层的TCP差不多,只不过进行一下微调和可控。比如牺牲流控保证数据包的实时传输。或者加速重传等等。还有通过una和ack实现选择性重传。总的来说用于游戏帧同步或者数据实时传输领域还是有一定的优势。
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/qq_44918090/article/details/125155861
内容来源于网络,如有侵权,请联系作者删除!