Please note: in a few days this article will be moved into the album "Tracking analysis of TCP in the IPv4 kernel"
http://blog.chinaunix.net/u2/64681/showart_1432551.html
This article is part of that serialized series; to make sure readers get first-hand study material in time, please keep following the series. If you cannot find an article you browsed before, that is because I have reorganized the articles into categories, so please check the relevant album index. Thanks for your attention.
In the previous section we analyzed the code that takes a packet off the sock's receive queue. How the client hangs packets onto this queue will be covered later: after a client packet arrives at the server's network card, its journey upward is the same as the path we described for the three-way handshake of the client's connection request, except that different function routes are taken depending on the packet's purpose, which we also leave for later. Here we assume the packet has already been queued on the server's receive queue sk_receive_queue, so after dequeuing it some checks must be made. Let's continue with the code of tcp_recvmsg():
        if (copied >= target && !sk->sk_backlog.tail)
            break;

        if (copied) {
            if (sk->sk_err ||
                sk->sk_state == TCP_CLOSE ||
                (sk->sk_shutdown & RCV_SHUTDOWN) ||
                !timeo ||
                signal_pending(current) ||
                (flags & MSG_PEEK))
                break;
        } else {
            if (sock_flag(sk, SOCK_DONE))
                break;

            if (sk->sk_err) {
                copied = sock_error(sk);
                break;
            }

            if (sk->sk_shutdown & RCV_SHUTDOWN)
                break;

            if (sk->sk_state == TCP_CLOSE) {
                if (!sock_flag(sk, SOCK_DONE)) {
                    /* This occurs when user tries to read
                     * from never connected socket.
                     */
                    copied = -ENOTCONN;
                    break;
                }
                break;
            }

            if (!timeo) {
                copied = -EAGAIN;
                break;
            }

            if (signal_pending(current)) {
                copied = sock_intr_errno(timeo);
                break;
            }
        }

        tcp_cleanup_rbuf(sk, copied);

        if (!sysctl_tcp_low_latency && tp->ucopy.task == user_recv) {
            /* Install new reader */
            if (!user_recv && !(flags & (MSG_TRUNC | MSG_PEEK))) {
                user_recv = current;
                tp->ucopy.task = user_recv;
                tp->ucopy.iov = msg->msg_iov;
            }

            tp->ucopy.len = len;

            BUG_TRAP(tp->copied_seq == tp->rcv_nxt ||
                     (flags & (MSG_PEEK | MSG_TRUNC)));

            /* Ugly... If prequeue is not empty, we have to
             * process it before releasing socket, otherwise
             * order will be broken at second iteration.
             * More elegant solution is required!!!
             *
             * Look: we have the following (pseudo)queues:
             *
             * 1. packets in flight
             * 2. backlog
             * 3. prequeue
             * 4. receive_queue
             *
             * Each queue can be processed only if the next ones
             * are empty. At this point we have empty receive_queue.
             * But prequeue _can_ be not empty after 2nd iteration,
             * when we jumped to start of loop because backlog
             * processing added something to receive_queue.
             * We cannot release_sock(), because backlog contains
             * packets arrived _after_ prequeued ones.
             *
             * Shortly, algorithm is clear --- to process all
             * the queues in order. We could make it more directly,
             * requeueing packets from backlog to prequeue, if
             * is not empty. It is more elegant, but eats cycles,
             * unfortunately.
             */

            if (!skb_queue_empty(&tp->ucopy.prequeue))
                goto do_prequeue;

            /* __ Set realtime policy in scheduler __ */
        }

        if (copied >= target) {
            /* Do not sleep, just process backlog. */
            release_sock(sk);
            lock_sock(sk);
        } else
            sk_wait_data(sk, &timeo);
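The long comment above names four (pseudo)queues — packets in flight, backlog, prequeue, receive_queue — and states that a queue may only be processed once the ones after it are empty. A toy user-space model of that draining order (all names hypothetical; none of this is kernel code):

```c
#include <assert.h>

/* Toy model of the socket-side queues from the comment above.
 * Draining must honor the precedence: receive_queue is consumed
 * first, then prequeue, then backlog -- the backlog holds packets
 * that arrived after the prequeued ones, so it must go last. */
#define CAP 16

struct queue { int buf[CAP]; int len; };

static void enqueue(struct queue *q, int v)
{
    q->buf[q->len++] = v;
}

/* Drain all three queues into out[] in delivery order and
 * return how many packets the application would have seen. */
static int drain(struct queue *rcv, struct queue *pre, struct queue *back,
                 int *out)
{
    int n = 0;
    for (int i = 0; i < rcv->len; i++) out[n++] = rcv->buf[i];
    rcv->len = 0;
    for (int i = 0; i < pre->len; i++) out[n++] = pre->buf[i];
    pre->len = 0;
    for (int i = 0; i < back->len; i++) out[n++] = back->buf[i];
    back->len = 0;
    return n;
}
```

This is why the code above cannot simply call release_sock() while the prequeue is non-empty: doing so would process the backlog ahead of earlier prequeued packets and break ordering.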
The code above first checks whether the requested amount of data has already been received, then performs the socket-state checks, and then enters tcp_cleanup_rbuf():
void tcp_cleanup_rbuf(struct sock *sk, int copied)
{
    struct tcp_sock *tp = tcp_sk(sk);
    int time_to_ack = 0;

#if TCP_DEBUG
    struct sk_buff *skb = skb_peek(&sk->sk_receive_queue);

    BUG_TRAP(!skb || before(tp->copied_seq, TCP_SKB_CB(skb)->end_seq));
#endif

    if (inet_csk_ack_scheduled(sk)) {
        const struct inet_connection_sock *icsk = inet_csk(sk);
        /* Delayed ACKs frequently hit locked sockets during bulk
         * receive. */
        if (icsk->icsk_ack.blocked ||
            /* Once-per-two-segments ACK was not sent by tcp_input.c */
            tp->rcv_nxt - tp->rcv_wup > icsk->icsk_ack.rcv_mss ||
            /*
             * If this read emptied read buffer, we send ACK, if
             * connection is not bidirectional, user drained
             * receive buffer and there was a small segment
             * in queue.
             */
            (copied > 0 &&
             ((icsk->icsk_ack.pending & ICSK_ACK_PUSHED2) ||
              ((icsk->icsk_ack.pending & ICSK_ACK_PUSHED) &&
               !icsk->icsk_ack.pingpong)) &&
             !atomic_read(&sk->sk_rmem_alloc)))
            time_to_ack = 1;
    }

    /* We send an ACK if we can now advertise a non-zero window
     * which has been raised "significantly".
     *
     * Even if window raised up to infinity, do not send window open ACK
     * in states, where we will not receive more. It is useless.
     */
    if (copied > 0 && !time_to_ack && !(sk->sk_shutdown & RCV_SHUTDOWN)) {
        __u32 rcv_window_now = tcp_receive_window(tp);

        /* Optimize, __tcp_select_window() is not cheap. */
        if (2*rcv_window_now <= tp->window_clamp) {
            __u32 new_window = __tcp_select_window(sk);

            /* Send ACK now, if this read freed lots of space
             * in our buffer. Certainly, new_window is new window.
             * We can advertise it now, if it is not less than current one.
             * "Lots" means "at least twice" here.
             */
            if (new_window && new_window >= 2 * rcv_window_now)
                time_to_ack = 1;
        }
    }
    if (time_to_ack)
        tcp_send_ack(sk);
}
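The window test near the end of the function only sends a window-opening ACK when the read freed "lots" of space, meaning the newly selectable window is at least twice the currently advertised one, and the check is skipped entirely when the current window already exceeds half of window_clamp. A stand-alone sketch of that heuristic (a hypothetical helper, not a kernel function):

```c
#include <assert.h>
#include <stdint.h>

/* Sketch of the window-open ACK heuristic in tcp_cleanup_rbuf():
 * only bother computing a fresh window when the current one is at
 * most half of window_clamp, and ACK only if the newly selectable
 * window is non-zero and at least twice the advertised one. */
static int should_ack_window_open(uint32_t rcv_window_now,
                                  uint32_t window_clamp,
                                  uint32_t new_window)
{
    if (2 * rcv_window_now > window_clamp)
        return 0;               /* window already large; skip the check */
    return new_window != 0 && new_window >= 2 * rcv_window_now;
}
```

For example, with rcv_window_now = 1000, window_clamp = 4000 and new_window = 2500 the predicate fires; with new_window = 1500 it does not.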
This function peeks at the packet at the head of the receive queue again and checks whether an ACK reply is needed for it; if so, tcp_send_ack() ultimately sends an ACK packet back to the client's socket. We saw in
http://blog.chinaunix.net/u2/64681/showart.php?id=1415963
that tcp_send_ack() finishes its work by calling tcp_transmit_skb(). Continuing with the tcp_recvmsg() code listed above: it records the current process in the socket's user-copy structure and sets up the iovec array that describes the buffers used by the upper socket layer. If the server sock's prequeue is not empty, it must also be processed first, which is done by tcp_prequeue_process():
static void tcp_prequeue_process(struct sock *sk)
{
    struct sk_buff *skb;
    struct tcp_sock *tp = tcp_sk(sk);

    NET_INC_STATS_USER(LINUX_MIB_TCPPREQUEUED);

    /* RX process wants to run with disabled BHs, though it is not
     * necessary */
    local_bh_disable();
    while ((skb = __skb_dequeue(&tp->ucopy.prequeue)) != NULL)
        sk->sk_backlog_rcv(sk, skb);
    local_bh_enable();

    /* Clear memory counter. */
    tp->ucopy.memory = 0;
}
This function hands each packet to the sk_backlog_rcv hook function in the sock structure. In the earlier chapter on creating the server-side sock,
http://blog.chinaunix.net/u2/64681/showart.php?id=1360583
we saw
sk->sk_backlog_rcv = sk->sk_prot->backlog_rcv;
that is, the handler is attached through the sk_prot hook structure. Recall that in
http://blog.chinaunix.net/u2/64681/showart.php?id=1360583
sk_prot was set to point at the tcp_prot structure, a variable we also mentioned in the previous section; the relevant part is:
struct proto tcp_prot = {
    ...
    .backlog_rcv = tcp_v4_do_rcv,
    ...
};
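The wiring shown above — a protocol table carrying the handler, copied into the sock at creation time — is plain C function-pointer dispatch. A stripped-down sketch with hypothetical toy types (not the kernel's structures):

```c
#include <assert.h>

/* Minimal model of sk->sk_backlog_rcv = sk->sk_prot->backlog_rcv:
 * the protocol table carries the handler, and the socket caches a
 * copy of the pointer when it is initialized. */
struct toy_skb { int id; };

struct toy_proto {
    int (*backlog_rcv)(void *sk, struct toy_skb *skb);
};

struct toy_sock {
    const struct toy_proto *sk_prot;
    int (*sk_backlog_rcv)(void *sk, struct toy_skb *skb);
    int last_id;                       /* records what the handler saw */
};

/* Stands in for tcp_v4_do_rcv(). */
static int toy_v4_do_rcv(void *sk, struct toy_skb *skb)
{
    ((struct toy_sock *)sk)->last_id = skb->id;
    return 0;
}

static const struct toy_proto toy_tcp_prot = {
    .backlog_rcv = toy_v4_do_rcv,      /* mirrors .backlog_rcv = tcp_v4_do_rcv */
};

static void toy_sock_init(struct toy_sock *sk)
{
    sk->sk_prot = &toy_tcp_prot;
    sk->sk_backlog_rcv = sk->sk_prot->backlog_rcv;   /* the line quoted above */
    sk->last_id = 0;
}
```

Calling sk->sk_backlog_rcv(sk, skb) then lands in the protocol's handler without the caller knowing which protocol is in use — exactly how tcp_prequeue_process() and __release_sock() reach tcp_v4_do_rcv().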
tcp_v4_do_rcv() itself was already analyzed in section 16,
http://blog.chinaunix.net/u2/64681/showart.php?id=1656780
which also shows that the prequeue serves packet reception as well. Back in the tcp_recvmsg() code above: if the data received so far meets the request (copied >= target), release_sock() is called:
void release_sock(struct sock *sk)
{
    /*
     * The sk_lock has mutex_unlock() semantics:
     */
    mutex_release(&sk->sk_lock.dep_map, 1, _RET_IP_);

    spin_lock_bh(&sk->sk_lock.slock);
    if (sk->sk_backlog.tail)
        __release_sock(sk);
    sk->sk_lock.owned = 0;
    if (waitqueue_active(&sk->sk_lock.wq))
        wake_up(&sk->sk_lock.wq);
    spin_unlock_bh(&sk->sk_lock.slock);
}
Under the protection of the socket spinlock, the function checks the backlog queue and, if it is non-empty, executes __release_sock():
static void __release_sock(struct sock *sk)
{
    struct sk_buff *skb = sk->sk_backlog.head;

    do {
        sk->sk_backlog.head = sk->sk_backlog.tail = NULL;
        bh_unlock_sock(sk);

        do {
            struct sk_buff *next = skb->next;

            skb->next = NULL;
            sk->sk_backlog_rcv(sk, skb);

            /*
             * We are in process context here with softirqs
             * disabled, use cond_resched_softirq() to preempt.
             * This is safe to do because we've taken the backlog
             * queue private:
             */
            cond_resched_softirq();

            skb = next;
        } while (skb != NULL);

        bh_lock_sock(sk);
    } while ((skb = sk->sk_backlog.head) != NULL);
}
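The locking dance in __release_sock() — take the whole backlog private under the lock, process it with the lock dropped so new packets can keep arriving, then retake the lock and repeat if the list refilled — is a reusable pattern. In this user-space sketch the lock is modeled as a plain flag and all names are hypothetical:

```c
#include <assert.h>
#include <stddef.h>

/* Sketch of the __release_sock() drain pattern. The "lock" is a
 * plain flag here so the sketch stays self-contained; the assert in
 * toy_rcv() checks the key property: packets are processed with the
 * lock dropped, which is safe only because the list was detached. */
struct node { struct node *next; int val; };

static struct node *backlog_head;
static int lock_held;
static int drained_sum;

static void toy_rcv(struct node *skb)
{
    assert(!lock_held);        /* processing happens unlocked */
    drained_sum += skb->val;
}

static void backlog_drain(void)
{
    lock_held = 1;                       /* bh_lock_sock()   */
    while (backlog_head != NULL) {
        struct node *skb = backlog_head;
        backlog_head = NULL;             /* take the queue private */
        lock_held = 0;                   /* bh_unlock_sock() */
        while (skb != NULL) {
            struct node *next = skb->next;
            toy_rcv(skb);                /* sk->sk_backlog_rcv() */
            skb = next;
        }
        lock_held = 1;                   /* relock, recheck for new arrivals */
    }
    lock_held = 0;
}
```

The outer loop matters: between dropping and retaking the lock, a producer may have appended more packets, so the head pointer must be rechecked before returning.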
Here we see that this function also ends up calling tcp_v4_do_rcv(), analyzed above, to move the packets of the backlog queue onto the receive queue sk_receive_queue; we have walked through that process before, so we skip it here. If, on the other hand, the data received so far does not meet the request, sk_wait_data() is entered instead:
int sk_wait_data(struct sock *sk, long *timeo)
{
    int rc;
    DEFINE_WAIT(wait);

    prepare_to_wait(sk->sk_sleep, &wait, TASK_INTERRUPTIBLE);
    set_bit(SOCK_ASYNC_WAITDATA, &sk->sk_socket->flags);
    rc = sk_wait_event(sk, timeo, !skb_queue_empty(&sk->sk_receive_queue));
    clear_bit(SOCK_ASYNC_WAITDATA, &sk->sk_socket->flags);
    finish_wait(sk->sk_sleep, &wait);
    return rc;
}
This function builds a wait-queue entry for the current process, links it into the sock's wait queue sk_sleep, and goes into a timed sleep. When the timeout expires or the process is woken up, sk_wait_data() returns the remaining wait time and execution continues below. Wait queues and timer operations will be described in later chapters. Now let's continue with the tcp_recvmsg() code:
#ifdef CONFIG_NET_DMA
        tp->ucopy.wakeup = 0;
#endif

        if (user_recv) {
            int chunk;

            /* __ Restore normal policy in scheduler __ */

            if ((chunk = len - tp->ucopy.len) != 0) {
                NET_ADD_STATS_USER(LINUX_MIB_TCPDIRECTCOPYFROMBACKLOG, chunk);
                len -= chunk;
                copied += chunk;
            }

            if (tp->rcv_nxt == tp->copied_seq &&
                !skb_queue_empty(&tp->ucopy.prequeue)) {
do_prequeue:
                tcp_prequeue_process(sk);

                if ((chunk = len - tp->ucopy.len) != 0) {
                    NET_ADD_STATS_USER(LINUX_MIB_TCPDIRECTCOPYFROMPREQUEUE, chunk);
                    len -= chunk;
                    copied += chunk;
                }
            }
        }
        if ((flags & MSG_PEEK) && peek_seq != tp->copied_seq) {
            if (net_ratelimit())
                printk(KERN_DEBUG "TCP(%s:%d): Application bug, race in MSG_PEEK.\n",
                       current->comm, task_pid_nr(current));
            peek_seq = tp->copied_seq;
        }
        continue;

found_ok_skb:
        /* Ok so how much can we use? */
        used = skb->len - offset;
        if (len < used)
            used = len;

        /* Do we have urgent data here? */
        if (tp->urg_data) {
            u32 urg_offset = tp->urg_seq - *seq;
            if (urg_offset < used) {
                if (!urg_offset) {
                    if (!sock_flag(sk, SOCK_URGINLINE)) {
                        ++*seq;
                        offset++;
                        used--;
                        if (!used)
                            goto skip_copy;
                    }
                } else
                    used = urg_offset;
            }
        }

        if (!(flags & MSG_TRUNC)) {
#ifdef CONFIG_NET_DMA
            if (!tp->ucopy.dma_chan && tp->ucopy.pinned_list)
                tp->ucopy.dma_chan = get_softnet_dma();

            if (tp->ucopy.dma_chan) {
                tp->ucopy.dma_cookie = dma_skb_copy_datagram_iovec(
                    tp->ucopy.dma_chan, skb, offset,
                    msg->msg_iov, used,
                    tp->ucopy.pinned_list);

                if (tp->ucopy.dma_cookie < 0) {
                    printk(KERN_ALERT "dma_cookie < 0\n");

                    /* Exception. Bailout! */
                    if (!copied)
                        copied = -EFAULT;
                    break;
                }
                if ((offset + used) == skb->len)
                    copied_early = 1;
            } else
#endif
            {
                err = skb_copy_datagram_iovec(skb, offset,
                                              msg->msg_iov, used);
                if (err) {
                    /* Exception. Bailout! */
                    if (!copied)
                        copied = -EFAULT;
                    break;
                }
            }
        }

        *seq += used;
        copied += used;
        len -= used;

        tcp_rcv_space_adjust(sk);

skip_copy:
        if (tp->urg_data && after(tp->copied_seq, tp->urg_seq)) {
            tp->urg_data = 0;
            tcp_fast_path_check(sk);
        }
        if (used + offset < skb->len)
            continue;

        if (tcp_hdr(skb)->fin)
            goto found_fin_ok;
        if (!(flags & MSG_PEEK)) {
            sk_eat_skb(sk, skb, copied_early);
            copied_early = 0;
        }
        continue;

found_fin_ok:
        /* Process the FIN. */
        ++*seq;
        if (!(flags & MSG_PEEK)) {
            sk_eat_skb(sk, skb, copied_early);
            copied_early = 0;
        }
        break;
    } while (len > 0);

    if (user_recv) {
        if (!skb_queue_empty(&tp->ucopy.prequeue)) {
            int chunk;

            tp->ucopy.len = copied > 0 ? len : 0;

            tcp_prequeue_process(sk);

            if (copied > 0 && (chunk = len - tp->ucopy.len) != 0) {
                NET_ADD_STATS_USER(LINUX_MIB_TCPDIRECTCOPYFROMPREQUEUE, chunk);
                len -= chunk;
                copied += chunk;
            }
        }

        tp->ucopy.task = NULL;
        tp->ucopy.len = 0;
    }

#ifdef CONFIG_NET_DMA
    if (tp->ucopy.dma_chan) {
        dma_cookie_t done, used;

        dma_async_memcpy_issue_pending(tp->ucopy.dma_chan);

        while (dma_async_memcpy_complete(tp->ucopy.dma_chan,
                                         tp->ucopy.dma_cookie, &done,
                                         &used) == DMA_IN_PROGRESS) {
            /* do partial cleanup of sk_async_wait_queue */
            while ((skb = skb_peek(&sk->sk_async_wait_queue)) &&
                   (dma_async_is_complete(skb->dma_cookie, done,
                                          used) == DMA_SUCCESS)) {
                __skb_dequeue(&sk->sk_async_wait_queue);
                kfree_skb(skb);
            }
        }

        /* Safe to free early-copied skbs now */
        __skb_queue_purge(&sk->sk_async_wait_queue);
        dma_chan_put(tp->ucopy.dma_chan);
        tp->ucopy.dma_chan = NULL;
    }
    if (tp->ucopy.pinned_list) {
        dma_unpin_iovec_pages(tp->ucopy.pinned_list);
        tp->ucopy.pinned_list = NULL;
    }
#endif

    /* According to UNIX98, msg_name/msg_namelen are ignored
     * on connected socket. I was just happy when found this 8) --ANK
     */

    /* Clean up data we have read: This will do ACK frames. */
    tcp_cleanup_rbuf(sk, copied);

    TCP_CHECK_TIMER(sk);
    release_sock(sk);
    return copied;

out:
    TCP_CHECK_TIMER(sk);
    release_sock(sk);
    return err;

recv_urg:
    err = tcp_recv_urg(sk, timeo, msg, len, flags, addr_len);
    goto out;
}
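The urgent-data handling at the found_ok_skb label is pure offset arithmetic: truncate the copy at the urgent byte, or, when sitting exactly on it without SOCK_URGINLINE, skip one byte. A stand-alone sketch of that computation (a hypothetical helper, not in the kernel):

```c
#include <assert.h>
#include <stdint.h>

/* Given how many bytes we planned to copy (used) and where the urgent
 * byte sits relative to the current read position (urg_offset), mimic
 * the found_ok_skb logic: either truncate the copy at the urgent byte,
 * or -- when we are sitting exactly on it and it is not inline -- skip
 * one byte. Returns bytes to copy; *skip is set when a byte is skipped. */
static uint32_t clamp_at_urgent(uint32_t used, uint32_t urg_offset,
                                int urg_inline, int *skip)
{
    *skip = 0;
    if (urg_offset >= used)
        return used;            /* urgent byte lies beyond this chunk */
    if (urg_offset == 0) {
        if (!urg_inline) {
            *skip = 1;          /* step over the urgent byte */
            return used - 1;
        }
        return used;            /* inline: deliver it with the data */
    }
    return urg_offset;          /* copy only up to the urgent byte */
}
```

The skipped byte corresponds to the ++*seq / offset++ / used-- triple in the listing, which advances past the urgent byte so a later MSG_OOB read can fetch it separately.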
We have now listed all the remaining code. It first checks whether some data has already been copied and, if so, adjusts the "counters" len and copied. Next it checks whether all queued packets have been processed, which is done by comparing sequence numbers: tp->rcv_nxt == tp->copied_seq. If they have all been processed, the prequeue is checked; if packets are waiting there, the code at the do_prequeue label runs, which, as we saw above, calls tcp_prequeue_process() — in effect moving the prequeued packets onto sk_receive_queue to be handled on the next loop iteration. When execution reaches the found_ok_skb label, the code first computes how many bytes of the packet can be consumed (used), then checks the urg_data flag in the TCP sock structure and adjusts the usable length accordingly. Next it calls skb_copy_datagram_iovec(). Note the msghdr variable msg again: it is the buffer descriptor shared with the layer above the socket, i.e. the process, and the iovec entries inside it point at the data blocks — our user buffers. In
http://blog.chinaunix.net/u2/64681/showart.php?id=1333991
we listed the code of skb_copy_datagram_iovec(); it copies the data from the sk_buff into the iovec array (we can also call it a pointer array) that we prepared for reception — as we said, the iovec represents our buffers. After the copy, the relevant "counters" are updated and tcp_rcv_space_adjust() is entered, which adjusts the TCP sock's receive buffer space on every receive; its code is mostly arithmetic, so we skip it. Finally the function calls tcp_cleanup_rbuf(), now that the received data has been copied into the user-space msghdr buffers.
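The essence of skb_copy_datagram_iovec() — scattering one linear run of packet bytes across the user's iovec array element by element — can be sketched in user space as follows (a hypothetical helper; the real function also walks paged skb fragments and checks user-access faults):

```c
#include <assert.h>
#include <string.h>
#include <sys/uio.h>

/* Scatter len bytes from src across the iovec array, filling each
 * element in turn -- a user-space sketch of what
 * skb_copy_datagram_iovec() does with msg->msg_iov. Returns the
 * number of bytes actually copied (short if the iovec is too small). */
static size_t copy_to_iovec(const char *src, size_t len,
                            struct iovec *iov, int iovcnt)
{
    size_t done = 0;
    for (int i = 0; i < iovcnt && done < len; i++) {
        size_t chunk = len - done;
        if (chunk > iov[i].iov_len)
            chunk = iov[i].iov_len;
        memcpy(iov[i].iov_base, src + done, chunk);
        done += chunk;
    }
    return done;
}
```

With an iovec of a 3-byte and an 8-byte buffer, copying "hello world" puts "hel" in the first element and "lo world" in the second — the scatter behavior the msghdr interface exists for.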
tcp_cleanup_rbuf(), whose full listing we saw above, makes its decision by calling
static inline int inet_csk_ack_scheduled(const struct sock *sk)
{
    return inet_csk(sk)->icsk_ack.pending & ICSK_ACK_SCHED;
}
This inline helper checks whether an ACK is still scheduled to be sent to the client; the function then adjusts the receive "window" and, based on the result, calls tcp_send_ack() to send the ACK reply packet back to the client. We briefly described tcp_send_ack() in
http://blog.chinaunix.net/u2/64681/showart.php?id=1662181
where we saw that it not only allocates a new packet structure for the ACK but also calls tcp_transmit_skb() to send the packet to the client; tcp_transmit_skb() is covered in
http://blog.chinaunix.net/u2/64681/showart.php?id=1415963
We will not go through the remaining code of tcp_recvmsg() for now; the main parts have been analyzed carefully. If this receive process still seems unclear, please go back to
http://blog.chinaunix.net/u2/64681/showart.php?id=1351306
and
http://blog.chinaunix.net/u2/64681/showart.php?id=1333991
the two sections discussing the UDP and TCP receive paths of the AF_INET protocol family; the description there is more detailed and easier to follow, and it is the foundation of this article.
This article comes from the ChinaUnix blog; for the original, see: http://blog.chinaunix.net/u2/64681/showart_1671640.html