NDK开发套件的高效率设计
为了加速其高档DSP的网络化进程,TI公司结合其C6000系列芯片推出了TCP/IP NDK (Network Developer’s Kit)开发套件。
NDK主要的组件包括:(1)支持TCP/TP协议栈程序库。其中主要包含的库有:支持TCP/IP网络工具的库,支持TCP/IP协议栈与DSP/BIOS平台的库,网络控制以及线程调度的库(包括协议栈的初始化以及网络相关任务的调度)(2)示范程序。其中主要包括DHCP/Telnet客户端,HTTP/数据服务器示范等。(3)支持文档包括用户手册、程序员手册和平台适应手册。
NDK采用紧凑的设计方法,实现了用较少的资源耗费来支持TCP/IP。从实用效果看,NDK仅用200~250K程序空间和95K数据空间即可支持常规的TCP/IP服务,包括应用层的telnet、DHCP、HTTP等。为了最大限度地减少资源消耗,TI为其NDK采用了许多特殊的技巧,重要的有:(1)UDP socket和RAW socket不使用发送或接收缓冲区;(2)TCP socket使用发送缓冲区,接收缓冲区依配置文件而定;(3)低层驱动程序与协议栈之间通过指针传递数据,不对包进行复制拷贝;4、设置专门的线程清除存储器中的碎片和检查存储器泄露。因此,NDK很适合目前嵌入式系统的硬件环境,是实现DSP联网通信的重要支撑工具。
NDK的软件开发环境是TI的开发工具CCS(code composer studio)。它包含有实时操作系统DSP/BIOS和主机与目标板之间的实时数据交换软件RTDX。
2.2 NDK的配置和使用
在CCS下使用NDK需要在以下几点上做特别处理:
(1)设置DSP/BIOS
PRD设置主时钟。硬件抽象层的时钟驱动需要一个100ms启动一次的PRD函数作为主时钟,函数名是llTimerTick()。
HOOK为TCP/IP协议栈设置保存的空间。OS库的任务调度模块需要调用hook来保存和调用TCP/IP协议栈的环境变量指针,这两个hook函数是NDK_hookInit() 和 NDK_hookCreate()。
(2)包含文件和库文件
请注意编译时需要包含库文件和文件路径,一般默认为c:/ti/c6000/ndk/inc
(3)CCS工程编译时的链接顺序
CCS一般按照特定的顺序来链接目标函数和库文件,NDK是对这个链接顺序很敏感的,错误的顺序可以导致重复定义符号甚至不正确执行等错误。为避免这个情况,可以在CCS里选择Link Order"-> "build options对话框,将文件按照一定顺序添加并且将库文件添置到连接顺序的最后,推荐的顺序为:NETCTRL.LIB,HAL_xxx.LIB,NETTOOL.LIB,STACK.LIB和OS.LIB。
在初始化启动协议栈之前,要为其分配一块工作内存(SDRAM),命令是_mmBulkAllocSeg( EXTERN1 )。还要调用fdOpenSession()来初始化文件指针向量表,否则创建socket的时候将出现错误。
我们将发送/接收设置定义为一个任务,在创建任务句柄以前,我们应该用NC_SystemOpen()打开网络功能并进行设置,在系统关闭前也要进行相应的处理。
使用NDK提供的socket API函数需要注意下面一些问题:(1)NDK中对socket API 通过一个文件指针接口与操作系统相连接,因此要调用文件指针向量表初始化和关闭函数对文件系统进行相应操作。(2)NDK中并没有提供windows API中强大的select函数,但是可以用fdselect实现一些相应的工程。可以相互对应得API函数还有NDK中的fdclose 和标准的close, NDK中的fderror和标准的errno.(3) NDK提供了很多网络工具的支持的函数,比如和DNS相关的一些函数,可以代替标准API中的getpeername, gethostname等。另外还有关于IGMP的一些函数可以用来支持组播,但是只支持作为组播用户,不能支持作为组播服务器。
3.NDK传输UDP数据包效率测试及性能分析
3.1测试平台结构
我们研究了在NDK下CPU对UDP数据包发送接收的效率,这个测试分成两部分:一部分是测试从DM642向PC机发送UDP数据包时,在不同的传输速率和不同的L2 cache大小时的CPU占用率,另一部分是测试DM642接收从PC机发送来数据包时,在不同的传输速率和不同的L2 cache大小时的CPU占用率。我们所使用的工具是在CCS下的NDK提供的socket API函数和在visual studio下提供的winsocket API。图4是测试环境的示意图。
图4:NDK测试环境示意图
3.2 测试平台的配置和实现
由于接收和发送程序十分相似,我们仅以发送程序举例。创建发送数据的程序为一个任务,在DSP/BIOS中,任务对象就是被TSK模块管理的线程。TSK模块根据任务的优先级和当前的执行状态动态的调度。DSP/BIOS总共有15个任务优先级可以使用,并且提供了一组函数来操纵任务对象,包括建立、删除、设置任务对象。任何任务对象都处于下面几种状态之一:运行态,就绪态,阻塞态,终止态。
在这个工程中,我们在网络控制的程序中进行任务的创建,图5是创建任务的流程图:
图5:传输任务创建流程图
其中创建任务的语句为:TaskCreate( tsk_udp, "udp_video", 5, 0x1000, peer_addr , 12345, 12345 )。理论上,可以通过设置两个task的方法来增加数据传输的速率,但是注意这两个task应该用不同的端口进行传送。任务调度的应用程序为:
static void tsk_udp( IPN IPAddr, int PeerPort , int LocalPort)
{ ……
// 创建 socket
s = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
……
// 设置要绑定的地址端口属性
bzero( &sin1, sizeof(struct sockaddr_in) );
sin1.sin_family = AF_INET;
sin1.sin_len = sizeof( sin1 );
sin1.sin_port = htons(LocalPort);
//绑定IP地址和端口
if( bind( s, (PSA) &sin1, sizeof(sin1) ) < 0 )
{ goto exit_tsk;}
//设定目的地址端口属性
bzero( &sin1, sizeof(struct sockaddr_in) );
sin1.sin_family = AF_INET;
sin1.sin_len = sizeof( sin1 );
sin1.sin_addr.s_addr = IPAddr;
sin1.sin_port = htons(PeerPort);
……
// 分配工作缓冲区
if( !(pBuf = mmBulkAlloc( 1024 )) )
{goto exit_tsk;}
// 一下开始发送数据
for(;;)
{ // 填充发送数据的缓冲区
*(int*)pBuf=send_udp_count++
// 发送数据
if( sendto( s, pBuf, 1000, 0, &sin1, sizeof(sin1) ) < 0 )
{ goto exit_tsk;//break;}
// 清空数据区
mmZeroInit( pBuf, (uint)test );
//设置发送数据率
TaskSleep(8); // 1Mbit/s
}
……
}
测试里面有两个关键参数需要设置,一个是发送(接收)的数据率和DM642内部第二级缓存的大小。收发的数据率可以通过改变任务挂起的时间间隔长度来改变。系统函数TaskSleep(n)表示每隔n毫秒执行一次发送,我们设定每次发送1000 Byte的数据,这样TaskSleep(8)表示1Mbit/s的传输速率,TaskSleep(4)表示2Mbit/s的传输速率,以此类推。
而L2 cache大小的改变可以通过以下语句来设置:
CACHE_setL2Mode(CACHE_64KCACHE)表示设置了64K L2 Cache;CACHE_setL2Mode(CACHE_128KCACHE)表示设置了128K L2 Cache,以此类推。
3.3 测试结果和性能分析
我们在DM642评估版上,采用标准的recvfrom函数进行数据接收,以无连接的UDP协议与windows PC进行相互传输。对不同传输速率和不同大小的二级缓存下CPU的占用率进行了比较。
CPU占用率=空闲周期可完成的低优先级任务/ 执行传输任务时可完成的低优先级任务
其中接收和发送数据均设为每次1000 Byte,评测结果在下面四个图表里面显示出来。
传输速率(Mbit/s)
64KCache
128KCache
256KCache
0.4
0.29
0.21
0.19
0.8
0.58
0.45
0.38
2
1.1
1.02
0.96
4
2.64
2.26
1.88
8
5.11
4.38
3.64
16
9.86
8.25
6.89
表1:由DM642发送UDP数据包的CPU占用率(%)
图6:DM642发送UDP数据包CPU占用率比较图
传输速率(Mbit/s)
64KCache
128KCache
256KCache
0.4
0.2
0.13
0.14
0.8
0.35
0.28
0.27
2
0.82
0.7
0.67
4
1.62
1.34
1.34
8
3.65
2.69
2.68
表2:由DM642 UDP数据包的CPU占用率(%)
图7:DM642接收UDP数据包CPU占用率比较图
从上面的比较可以看出 DM642发送和接收数据包时的CPU占用率均随着网络传输速率的增加而提高,而且基本上呈线性关系。因为收发数据是对数据简单的搬移,它的复杂度是随着数据的增加而线性增长的,在高速缓存一定得情况下CPU的占用率线性增加。
而第二级缓存的大小对CPU的占用率也有影响,一般而言是L2 cache越大,CPU占用率越小,而且随着收发数据率的变大而显得更加明显,这个得益于DM642两级缓存的工作原理和强大的DMA功能。
L2 cache增大带来的另一个影响是CPU片内存储容量的减少,使得片内能放下的代码段和数据段就比较少,这样反而会减缓程序的运行速度,这在处理复杂的编解码程序,数据段和代码段比较多时尤为明显,这就需要程序员根据实际情况统筹安排合理配置。
TCP/IP Stack – UDP
Beta-song @ 2008-8-24
转载自:
http://www.diybl.com/course/6_system/linux/Linuxjs/2008829/138684.html
本文从流程上简要分析了UDP数据发送和接收数据包的过程,并没有深入协议细节。
UDP协议入口
net/ipv4/f_inet.c, UDP操作集
const struct proto_ops inet_dgram_ops = {
.family = PF_INET,
.owner = THIS_MODULE,
.release = inet_release,
.bind = inet_bind,
.connect = inet_dgram_connect,
.socketpair = sock_no_socketpair,
.accept = sock_no_accept,
.getname = inet_getname,
.poll = udp_poll,
.ioctl = inet_ioctl,
.listen = sock_no_listen,
.shutdown = inet_shutdown,
.setsockopt = sock_common_setsockopt,
.getsockopt = sock_common_getsockopt,
.sendmsg = inet_sendmsg,
.recvmsg = sock_common_recvmsg,
.mmap = sock_no_mmap,
.sendpage = inet_sendpage,
};
上述ops函数最终会对应到udp协议上的相关函数,比如inet_sendmsg在内部是通过如下调用实现的:sk->sk_prot->sendmsg(iocb, sk, msg, size),其中, struct proto *skc_prot指向的是如下的udp_prot。
net/ipv4/udp.c,UDP协议
struct proto udp_prot = {
.name = "UDP",
.owner = THIS_MODULE,
.close = udp_lib_close,
.connect = ip4_datagram_connect,
.disconnect = udp_disconnect,
.ioctl = udp_ioctl,
.destroy = udp_destroy_sock,
.setsockopt = udp_setsockopt,
.getsockopt = udp_getsockopt,
.sendmsg = udp_sendmsg,
.recvmsg = udp_recvmsg,
.sendpage = udp_sendpage,
.backlog_rcv = udp_queue_rcv_skb,
.hash = udp_lib_hash,
.unhash = udp_lib_unhash,
.get_port = udp_v4_get_port,
.obj_size = sizeof(struct udp_sock),
};
发送过程对应函数udp_sendmsg,接收函数对应udp_recvmsg,接下来进行分别分析。
发送过程
int udp_sendmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg, size_t len){
//第一步:检查是否有pending的帧,有的话则跳转到第四步进行发送
//第二步:获取路由,如果不存则通过ip_route_output_flow新建一个
rt = (struct rtable*)sk_dst_check(sk, 0); …………
//第三步:堵塞udp_sock来发送数据,防止当前包未发送完,又有新数据要发送
up->pending = AF_INET;…………
//第四步:收集数据并发送(数据有可能在iovec向量中,所以有必要先收集到一起)
// make one large IP datagram from many pieces of data.
//Each pieces will be holded on the socket until ip_push_pending_frames() is called
// ip_append_data 函数内部调用__skb_queue_tail(&sk->sk_write_queue, skb);将数据包排队在写队列上
err = ip_append_data(sk, getfrag, msg->msg_iov, ulen,
sizeof(struct udphdr), &ipc, rt,
corkreq ? msg->msg_flags|MSG_MORE : msg->msg_flags);
if (err)
udp_flush_pending_frames(sk); //出错了,扔掉所有当前包的数据,并关闭pending
else if (!corkreq)
err = udp_push_pending_frames(sk); //调用ip_push_pending_frames发送数据
else if (unlikely(skb_queue_empty(&sk->sk_write_queue)))
up->pending = 0; //取消pending,以便新数据可以进行发送
release_sock(sk);
}
int ip_push_pending_frames(struct sock *sk){
//第一步:从写队列中收集所有的数据
while ((tmp_skb = __skb_dequeue(&sk->sk_write_queue)) != NULL) {……}
//第二步:发送
err = NF_HOOK(PF_INET, NF_IP_LOCAL_OUT, skb, NULL,
skb->dst->dev, dst_output); //dst_output将数据从传输层送至网络层
}
static inline int dst_output(struct sk_buff *skb){
return skb->dst->output(skb); //本路由表项的发送函数是网络层的ip_output,
//继续调用ip_finish_output……
}
接收过程
Net/ipv4/udp.c
int udp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
size_t len, int noblock, int flags, int *addr_len){
skb = skb_recv_datagram(sk, flags, noblock, &err); //有数据就返回之,反之则阻塞
err = skb_copy_datagram_iovec(skb, sizeof(struct udphdr),
msg->msg_iov, copied); //拷贝数据到用户空间
}
struct sk_buff *skb_recv_datagram(struct sock *sk, unsigned flags, int noblock, int *err){
do {
skb = skb_dequeue(&sk->sk_receive_queue); //从sock接收队列中取出数据包
} while (!wait_for_packet(sk, err, &timeo)); //必要时会阻塞
}
static int wait_for_packet(struct sock *sk, int *err, long *timeo_p){
DEFINE_WAIT(wait); //当前进程加入等待队列
prepare_to_wait_exclusive(sk->sk_sleep, &wait, TASK_INTERRUPTIBLE);
if (!skb_queue_empty(&sk->sk_receive_queue))
goto out; //不用等待,因为有数据
*timeo_p = schedule_timeout(*timeo_p); //本进程休眠
out:
finish_wait(sk->sk_sleep, &wait); //将本进程从等待队列删除,结束等待
return error;
}
关于timeo_p
关于休眠时间的问题,是这样设置的:noblock ? 0 : sk->sk_rcvtimeo;
如果非阻塞调用,休眠时间是0,就是不阻塞;否则阻塞sk->sk_rcvtimeo长的时间。
在sock_init_data中对该时间设置为MAX_SCHEDULE_TIMEOUT,而该值定义为LONG_MAX,即永不超时。
当没有数据可读的时候,udp进程就会阻塞等待;以下是net core接收数据流程,并唤醒udp进程的过程。
Net/ipv4/ip_input.c
static inline int ip_local_deliver_finish(struct sk_buff *skb){
if ((ipprot = rcu_dereference(inet_protos[hash])) != NULL) {} //定位到具体的传输层协议
ret = ipprot->handler(skb); //调用其handler,由以下分析可知,该handler是udp_rcv
}
Net/ipv4/af_inet.c
static int __init inet_init(void){
(void)sock_register(&inet_family_ops);
if (inet_add_protocol(&udp_protocol, IPPROTO_UDP) < 0){}
}
static struct net_protocol udp_protocol = {
.handler = udp_rcv,
.err_handler = udp_err,
.no_policy = 1,
};
Net/ipv4/udp.c
int udp_rcv(struct sk_buff *skb)
à __udp4_lib_rcv(skb, udp_hash, IPPROTO_UDP);
à udp_queue_rcv_skb(sk, skb);
à sock_queue_rcv_skb(sk,skb);
à skb_queue_tail(&sk->sk_receive_queue, skb); //放到sock的接收队列
à sk->sk_data_ready(sk, skb_len); //向上通知数据准备好了
针对inet协议簇,sk_data_ready就是sock_def_readable,见如下分析:
static struct net_proto_family inet_family_ops = {
.family = PF_INET,
.create = inet_create, //调用sock_init_data(sock, sk);
.owner = THIS_MODULE,
};
void sock_init_data(struct socket *sock, struct sock *sk){
sk->sk_data_ready = sock_def_readable;
sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
}
static void sock_def_readable(struct sock *sk, int len){
read_lock(&sk->sk_callback_lock);
if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
wake_up_interruptible(sk->sk_sleep);
sk_wake_async(sk,1,POLL_IN);
read_unlock(&sk->sk_callback_lock);
}
可见,sock_def_readable唤醒等待在该socket上的udp进程,允许其继续读取数据。