2017大国工匠视频:在Vovida的基础上实现自己的SIP协议栈

来源:百度文库 编辑:偶看新闻 时间:2024/04/29 11:44:21

在Vovida的基础上实现自己的SIP协议栈(一)

卢政 2003/08/01
写在前面的话

  不少通讯方面的同好已经读了我在去年岁末撰写的《如何用OpenH323开发自己的H.323协议栈》,大都给予了很高的评价,甚至可以说是好评如潮,说来惭愧,我只不过把十几个人的工作进行了整理和归纳而已,事实上我自己的代码只有很少的一部分(主要在H.245/H.235部分),后来很多朋友向我索要RTH323的测试版本一直未果,我在这里说明一下,由于该软件的使用和二次开发的权利已经被某欧洲公司所买断,所以我已经无权发布测试代码,如有不便敬请大家原谅.

  在我们开发RTH323之际,我已经开始注意SIP协议了,并且根据RFC2543设计了不少实验代码,因为当时的开发有一个H.323-SIP的翻译网关的需求,不过后来这个计划又取消了,也从这个时候开始我逐渐对SIP有了比较浓厚的兴趣,只是并没有做什么实际的工作,仅仅初步了解了一下协议的整体构造.

  直到去年年底,我接触了Vovida的开放原码的SIP系统--Vocal以后,我决定开始系统的了解SIP的整个构造,我个人认为Vocal是一个非常典型的SIP系统,里面包含了所有构造电信级呼叫中心,区域网关以及中继网关的所有内容,而且代码清晰,比较易于改造;于是我花费了大概6个月的时间阅读了整个Vocal系统的代码,并在很多重要的地方做了标注.正好在今年的5月份,我的公司有构造一个大型的Voice/Video IP企业呼叫中心的计划,我就把我对Vocal的研究成果提交给公司,方案得到通过,现在我的公司正在和国内某个知名大学合作准备在现有Vocal基础上改造一个企业级的视频电话呼叫平台

  不过我个人而言,对这个计划不是非常的满意,由于时间和金钱上的限制,大部分的电讯级补充服务第一阶段没有实现的,只可能在第二阶段去实现该计划,时间可能要持续很长,所以我也很希望有其他的开发公司参与完成这个方面的开发,当然我们也可以为有兴趣的公司提供技术咨询或者展开合作。

  我写这本文的目的在于公布我个人对Vocal这个开放原代码系统的一些研究成果,当然里面有很多地方没有说得非常清楚,本身要用文字来阐述程序的设计思想就是一个非常困难的事情,所以我在里面大量的绘制了很多图表,来帮助读者阅读本文,这次公布的是有关UA端的内容,后续在本文中将介绍Vocal系统中的Provision Server; Marshal Server; Redirect Server; HeartBeat Server:Policy Server:CDR Server:Network Manager:Feature Server:以及各种协议的Translator,读者需要具备对SIP,H.323,MGCP,QoS的基本了解,以及对Java,XML, Call Processing Language,C++的知识,在以下章节中不会对这些基本的知识做太详细的介绍。

  预计本文全部刊登完毕大概需近一年的时间,我希望有软件开发公司来和我合作完成这篇文章或者是共同从事Vocal系统的二次开发工作。

  从这半年的阅读Vocal的过程中我体会到Vocal系统的商业应用价值非常大,有人在论坛上和我讨论:改造别人的应用平台的难度和重新开发一个应用的难度哪个更大,双方都各置一词,就我个人觉得,大型的商业软件如果按照开放原代码来进行改造,特别是一些基础平台(例如操作系统),速度肯定是比从零开始要快很多的,RTH323就是一个成功的范例,这也是国内很多软件厂商的基本运行模式,特别对于一些人员,技术,资金都不是非常充裕的公司这可能也是唯一的方案,但是开放原代码的软件大部分的效率非常低下,而且代码冗余,注释比较少,可读性非常差,甚至还有一些致命错误(Vocal中的Feature Server中就有这样的错误,往往可以造成系统崩溃)所以这样做的前提条件就是能把握住工作的重点,能及时发现并排除问题,这样才可能把开放原代码改造成高效率的商业应用。

目 录

一.楔子
二.H.323和SIP之间的差异
三.本文的主要内容
1.User Agent的简介
2.UA部分主要程序部分的介绍
2.1 主程序:\SIP\UA\UA.cxx
2.2 创建一个User Agent的实体
2. 3 HeartLessProxy的创建
2.4 让User Agent Run起来
2. 5 HeartLessProxy Run方法的实现
2. 5. 1 WorkerThread的Run方法
2.5.1.1 processSipEvent
2. 5. 1. 2 processUaDeviceEvent
2.5.1.3 processUaDigitEvent
2. 5. 2 SipThread的Run方法
2. 6 在User Agent中的四个重要实例的Run方法
2. 6. 1 媒体设备启动
2.6.2 启动RTP线程,用于对RTP/RTCP包的接收和发送管理;
2. 6. 3 合法用户列表的获取(Redirection Server专用)
2. 6. 4 监测线程:
2. 6. 5 自动呼叫
3.开始一个呼叫和等待对方呼叫:
3. 1 系统创建StateIdle状态:
3. 2 开始一个呼叫:
3. 2. 1 OpStartCall主程序部分:
3. 2. 2 取得键盘的事件
3. 2. 3 状态机(State)对各个操作(Operator)的处理过程:
3. 2. 4 开始一个呼叫所经历的各种操作(Operator)
3. 2. 5 如何进入待机状态(Idle状态)
3. 2. 6 如何开始拨号并且开始一个呼叫:
3. 2. 6. 1 OpStartDialTone本地发送拨号音;
3. 2. 6. 2 OpAddDigit输入电话号码开始拨号:
3. 2. 6. 3 OpStopDialTone;
3. 2. 6. 4 OpInviteUrl建立一个INVITE消息并且发送到被叫;
3.2.7 进入Trying状态
3. 2. 7. 1 OpStartTimer启动每个事件的定时器:
3. 2. 7. 2 挂机事件的检测机制
3. 2. 7. 3 OpStartRingbackTone向被叫进行铃声回放。
3.2.7.4 OpReDirect进行重定向服务的操作
3.2.7.5 授权检查
3.2.7.6 OpFarEndAnswered处理接收到的OK回应
3.2.7.7 在Vocal中如何实现RSVP资源预留协议
3.2.8用户处于通话的StateInCall状态:
3. 2. 8. 1 OpStartAudioDuplex主叫打开RTP通道
3. 2. 8. 2 处理RTP/RTCP包:
3.2.8.3 ACK消息的处理过程OpAck
3. 2. 8. 4 OpConfTargetOk多方会议检测:
3.2.9 呼叫等待
3. 2. 9. 1 呼叫等待的详细描述:
3. 2. 9. 2 操作之间存在的竞争
3. 2. 9. 3 呼叫中所涉及模块介绍
3.3 等待对方的呼叫
3.3.1 OpRing等待对方的振铃消息
3. 3. 2 OpStartRinging开始响铃
3. 3. 3 OpRingingInvite处理又一个INVITE消息(呼叫等待)
3. 3. 4 OpAnswerCall被叫打开媒体通道开始通讯
3.3.5 回到StateInCall状态
4.如何在改造现有的终端使之能传递视频流。
4.1一个H.261+的Codec的基本构造
4. 2 增加视频能力所需要做的工作

一.引言

  在各种的IP网络多媒体通讯协议中,当前在市场上占据主流位置的应当算是ITU的H.323和IETF的SIP两个协议,目前在单纯的话音市场,MGCP协议由于有大规模的用户扩容能力应用也正呈现上升的趋势,在2000年以前市场上占主流的主要是H.323协议,然而SIP协议由于它避免了复杂的原语(ASN.1)分析,它的应用也在2000年以后也得到高速的普及,甚至有超过H.323的趋势,成为H.323最有力的竞争对手,当然,由于SIP协议的一些固有缺陷(下面将会详细介绍这些缺陷),这种情况在未来的几年可能不会出现,不过对于中等规模的多媒体通讯业务(每小时接入60000门)应用而言,采用SIP不失为一个方便,快捷的开发策略。

  在各种的VOIP开放原码的开发项目中,Vovida的基于SIP协议的VoCAL(Vovida Open CommunucAtion Library)不仅仅是在基于SIP的开放原代码协议栈中是最为庞大而且完善的,甚至在所有的原码开放的多媒体通讯协议栈中同样也是完善而且全面的,目前发布的VOCAL1.4.0主要支持RFC2543,据称在新版本的Vocal1.5.0将支持RFC3261协议;Vocal提供了基本的SIP呼叫控制和切换,例如:用户注册和登记,呼叫初始化,修改呼叫特性,或者重新定义呼叫特性,终止呼叫;以及一些用户的基本呼叫特性:例如呼叫前转,呼叫等待,呼叫阻塞,呼叫转移,语音邮件等等。

  对于一个Vocal系统的用户而言,Vocal同样为其提供了以下的一些能力:

1. 通过Web来配置整个Vocal系统;
2. 使用SNMP网管来检测整个系统和呼叫组网的状态;
3. 可以定义一个用户的呼叫特性列表(相当于H.323系列中的H.450补充协议部分);
4. 授权检查;
5. 广告信息;
6. 基于RSVP的简单QoS保证。

  同时,VOCAL也提供了详细的文档和SDK包以给用户做二次开发,用户可以在C++,以及Call Processing Language(CPL),Java Telephony API上开发自己的应用。

二.H.323和SIP之间的差异:

  这一节和本文的内容似乎没有太大的关系,不过笔者认为作为市场主流的H.323和SIP之间需要做一个相关性的比较,以免使很多读者在选择协议的时候陷入歧途。

  虽然Vocal在SIP的应用上已经可以算是一个成功的实例,所以目前单纯以Vocal作为主体开发SIP的多媒体通讯系统从理论上是可行的,但是事实上目前所有的VOIP商业系统都是以H.323为主体,兼容SIP协议,似乎还没有一个厂商在实际上支持SIP(Cisco好象有类似的产品,不过应用前景似乎不是非常明朗),首先从市场上来看,H.323的系统已经有大量的投资,应用也非常普遍,SIP相对比较新,似乎不够成熟;从市场上来看,越来越多的附加服务将成为应用的主流,SIP领域相对来说比H.323能够提供更多,更灵活的服务,而且在信令的互通性上有更加多的优势,当然H.323也能够保证其他解决方案之间的互用性;但是,目前MGCP协议已经得到了大量的工业支持,简单的终端和更加复杂完善的呼叫控制方式让它得到了更多的应用,很可能会成为SIP的潜在竞争者。

  其次我们从ITU和IETF的条款保证上来看,IETF所制定的草案一开始都阐述一个让人失望的观点:本草案作为参考资料是不合适的,除非在"制定和完善中",这样在协议成熟和完全理解期间必然会把一些工作引入误区,特别是某些协议被更新和升级期间。相对而言,作为官方实体的ITU制定的协议一旦实施就不再轻易的改变,因此在发布协议前,已经对协议作了很长时间的互通性测试。

  最后从技术上来看,SIP和H.323在技术实现上有很大的不同:

  a.开发速度:SIP当然的优于H.323协议太简单了,不过如果H.323原语部分可以比较好的解析的话,事实上两者开发速度相差不大。

  b.多播:在这个方面IETF具有优势,有非常强大的应用经验的,SIP已经设计在很多多播的骨干网络上,h.323v1,v2要使用多单播同时进行的方式才能完成,不过H.323V3版本多播的支持就已经非常不错了。

  c.地址的运用上SIP使用Url上的机制非常灵活,这样可以让SIP以一种非常灵活的方式重定向到非SIP服务器上去,被另外一个SIP呼叫的SIP终端也能重定向到某个网页或者是电子邮件地址。对于H.323而言,命名的机制就非常混乱了,从ASN.1的文件我们可以看到有h323-ID,url-ID,transport-ID,email-ID,partynumber等等。

  d.对于SIP而言,所有的消息都采用文本编码,所以SIP消息非常简单,这样在开发的时候简单的网络检测就可以调试,反观H.323协议采用了PER或者BER的二进制编码方式,信令不是非常直观。

  e.系统资源的消耗上,SIP可以说是开销惊人,每次服务器发出通告的时候,都需要建立一个监听套接字,这样的结果势必造成大量的闲置套接字,假设在建立一个完整的Proxy/Register/RTP Gateway/三者和而为一的园区出口网关的时候,资源上势必会非常的紧张,这个是不能不予以考虑的问题。相反H.323在打开逻辑通道的情况下(OpenLogicalChannel消息)只建立一个套接字。

f. SIP没有会议控制能力,所以仅仅只能做到点对点的媒体通讯,而H.323一开始就考虑了会议功能,其中还包含了H.332会议控制协议。(Vocal提供了一个Conferencing Server可以做普通的会议控制)。

g. 基于无线的网络而言,H.323有很大优势,由于信令采用了二进制编码,所以比较适合手持设备实现,而SIP由于采用了文本方式就没有这样的能力。

三.本文的主要内容

  和RTH323的介绍一样,我在下面将会尽量详细的分析一下Vocal的整个原代码,当然不可能做到完全系统地向大家展示Vocal的精妙之处,其实我自己对这个协议栈还是有相当多不了解的地方,希望大家能对我们的研究提出宝贵意见,后续的文章将会以连载的方式对Vocal进行介绍,顺序如下:

3.1 Vovida User Agent:
3.2 Vovida Provision Server:
3.3 Marshal Server;
3.4 Redirect Server;
3.5 HeartBeat Server:
3.6 Policy Server:
3.7 CDR Server:
3.8 Network Manager:
3.9 Feature Server:
3.10 Translator Server:

  我们现在可以开始进入Vovida的第一个实体的介绍--User Agent


(点击放大)

User Agent

1.User Agent的简介:
  User Agent是描述一个普通的用户终端,用户代理,以下都简称UA端。本身来说UA端的代码在Linux或者是Windows上都可以编译运行。在Vocal中资料最详细是User Agent的介绍了,有关UA描述的所有的代码部分部分集中在\SIP\UA目录下面,SIP的Stack软件主要集中在\SIP\SIPSTACK,SIP消息和状态的基类描述主要集中在\SIP\BASE;大家如果对SIP的状态和命令不是非常熟悉的话,可以进入\SIP\UA\目录下浏览以下的几个线图:

1. UaOverView.gif:
  对于UA中全部的主要类的关系描述,主要是展现了一些比较重要的基类。
2. UaSimpleStatesComplet.gif
  UA端的一个简单的呼叫和应答的全部命令和状态的交互示意图。
3. Ua-States.gif
  UA状态迁移的示意图。
  另外在UA中我们会把基本的SIPStack的一些调用做一下详细的介绍,所以篇幅可能会比较长。

下面是一些所使用到的SIP基本的类的介绍:
  HeartlessProxy :创建了一个容纳呼叫的"容器",和SIP的消息堆栈,以及WorkThread和SipThread(用于对SIP消息的队列处理),由HeartlessProxy::Run()方法调用这写线程的Run方法,使他们启动。该类的初始化是用一个Builder的基本类对它进行实例化。

  BasicProxy:由源自HeartlessProxy,它让系统使用HeartBeat机制,在这个类中创建了三个HeartBeat类型的线程:HeartbeatTxThread,HeartbeatRxThread,HouseKeepingThread,不过暂时在Ua中都没有应用到,一般是用在HeartBeat Server中(注:HeartBeat的机制就是指在Vocal的Server集群通过多播端口向HeartBeat Server发送HeartBeat数据报,如果在指定的时间内没有收到该数据报,那么认为该服务器处于Down状态,由HeartbeatServer发送状态消息到SNMP网管,同时启动备份设备,这个机制类似于BGP,EIGRP协议中的后备路由方案)。

  SipThread:SipThread源自ThreadIf(Thread Interface),主要作用是接收并且在sipstack对所收到的SIP消息排队,并且对接收的SipMsg(SIP消息)产生相应的Sip本地处理事件SipEvent,并且把他们放置在一个Fifo队列中等待处理,SipThread::thread()是循环处理的线程。

  WorkerThread:和SipThread一样,它也是源自ThreadIf,主要作用在于接受并且列队处理上面SipThread收到的Sip本地处理事件SipEvent。

  Builder:是一个基本类,它由WorkerThread调用,在这个类中包含了针对用户代理的CallContainer(包含所收到的各种呼叫信息)类的指针,从代码上看Builder在HeartLessProxy/BasicProxy被创建的时候创建。

  Feature: Feature是一个状态(State)容器,用于装载各种状态,从所有的状态上来说,Feature是所有State的集合,Feature,State,Operator之间的关系是一种容器包含的关系,在Feature::process()中会调用State::process()来完成各个状态的处理,它会返回下一个在容器内要处理的状态(Sate)。.

  State: State是Operator的集合容器, State::process() 调用 Operator::process().和上面所描述的一样Operator返回在容器内的下一个操作(Operator)。

  Operator : Operator则是一个基本类,Operator::process() 是一个虚函数,他需要其他的Operator子类对它进行实例化,它实质上也是描述各种操作的一个基本类。

  SipProxyEvent: 这个是一个基本类,用于描述各种SIP的事件信息,包括各种SIP消息和各种本地的设备消息,同时它包含了SIP消息的输出队列指针,使用的时候,可以把他载入输出的FIFO中。

  SipEvent: 从SipProxyEvent上继承,用于描述各种SIP消息,使用中当SipThread收到一个SipMsg的时候创建一个SipEvent,同样SipEvent也会安装在输出的FIFO中(Outputfifo)。

  DeviceEvnet: 从SipProxyEvent上继承,用于描述本地设备事件。

  TimerEvent:从SipProxyEvent上继承,在设定的时钟超时的时候产生该事件。

  CallContainer: 是CallInfo类的容器类。

  CallInfo: 是一个基本类,用于对呼叫的各种信息的描述集合,任何一个SipProxyEvent都包含了一个CallInfo。

  CallProcessingQueue: 一个用于装载各种SipProxyEvent消息的FiFO队列,在构造HeartLessProxy的它被创建,WorkerThread对他中间的消息进行排队处理。

  FeatureThread:被Marshal Server调用用于发送接受subscribe/Notify消息对,得到合法的用户的列表和呼叫特性(在后面介绍marshal和Feature Server的时候会详细介绍)。

  ResGwDeviece: 所有设备的基本类,用于描述所有的设备,当然它中间的很多属性需要具体的设备进行实例化。


(点击放大)

下面介绍一下在UA端所使用到的基本类:

  UserAgent: 用于描述一个基本的用户代理,它通过Run方法启动以后打开了用户端的RTP通道和设备媒体设备处理进程,并且启动了HeartlessProxy的Run方法,开始启动SIP消息处理线程。

  DeviceThread: 用于处理各种媒体设备,以及输入输出设备的线程,将收到的设备消息放在CallProcessingQueue队列中。

  RTPThread: 用于处理RTP/RTCP会话。

  SubScribemanager: 用于MS端发送subscribe消息到RS端,以及接收Notify消息,处理用户的呼叫特性列表。

  UaCallContainer: 继承CallContainer类,主要在Ua端使用,定义了UA呼叫的各种信息的集合。

  SipTransceiver: SIP消息的发送和接收器的描述,包含有一个接收缓冲队列和发送缓冲队列。

  UaBuilder: Builder类在Ua端的描述,继承了Builder的各种描述,这个是UA端的一个重要的类,它负责构建各种SIP消息事件,并且在这里包含了UA的注册和各种状态机的初始化和实例化过程。

  UaConfiguration: CFG文件的描述类。

  UaCallInfo:属于CallInfo的子类,包含了UA的各种工作状态,可以让不同状态下的所有所有的Call操作(发送和接收),使用一个相同的状态机。

  RegisterManager:用于跟踪处理UA端的注册。

  LoadGenThread:检测线程,用于大量的呼叫时候对系统的检测.


(点击放大)

  其实这几个线图对UA的描述还是非常粗糙,如果大家对UA端的代码没有任何的阅读的话,看他们是完全看不懂的,这些只能是一些内部开发人员专用文档而已,下面我们开始对UA原代码部分做详细的介绍:

2.UA部分主要程序部分的介绍:

2.1 主程序:\SIP\UA\UA.cxx
  主程序部分主要是根据CFG文件中的定义建立本地的呼叫和等待接收进程。

main( int argc, char* argv[] )
{
… …
//是否把当前的UA设置为守护进程。由CFG文件确定。
if ( UaCommandLine::instance()->getBoolOpt( "daemon" ) )
{
// TODO set cpLog to use syslog
assert( Daemon() >= 0 );
}
… …
if (UaCommandLine::instance( ) -> getIntOpt( "retransmit" ))
{
SipTransceiver::reTransOn();
}
else
{
SipTransceiver::reTransOff();
}

if (UaCommandLine::instance()->getIntOpt("retransinitial") != 500 ||
UaCommandLine::instance()->getIntOpt("retransmax") != 4000 )
{
SipTransceiver::setRetransTime(
UaCommandLine::instance()->getIntOpt("retransinitial"),
UaCommandLine::instance()->getIntOpt("retransmax")
);
}
//这里是打开配置文件,我们在这里使用的配置文件暂时定为Ua1001.cfg(以下均相同)
const string cfgStr = UaCommandLine::instance()->getStringOpt( "cfgfile" );
FILE *cfgFile = fopen( cfgStr.c_str(), "r");
if ( cfgFile == 0 )
{
cerr << "can not open " << cfgStr << endl;
cerr << "Usage: " << argv[0] << " " << appUsage << endl;
exit( 0 );
}
else
{
fclose( cfgFile );
UaConfiguration::instance( cfgStr );
}
// if the config file has a log level, do something about it
if(UaConfiguration::instance()->getLogFilename() != "")
{
int retval = cpLogOpen(
UaConfiguration::instance()->getLogFilename().c_str());
if(retval == 0)
{
cpLog(LOG_ALERT, "Could not open %s",
UaConfiguration::instance()->getLogFilename().c_str());
}
}
//创建一个Uabuilder类,它在类状态图中的位置可以参考Uaoverview.gif中它的位置,
Sptr < UaBuilder > uaBuilder = new UaBuilder;
//在这里创建一个用户代理,我们要确定它在本地的侦听和发送的端口,我们同样从CFG文
//件中得到。
UserAgent ua( uaBuilder, Data( UaConfiguration::instance()->getLocalSipPort() ).convertInt() );
ua.run();
if ( UaCommandLine::instance()->getBoolOpt( "voicemail" ) )
{
//下面的两个Run我们暂时不定义,在SNMP网管的时候将介绍HearterBeat的时候在做//详细的阐述(所谓的HeartBeat技术是在多播口上定时发送heartbeat消息,以通知目前//端点的状态,类似于BGP协议中的Hello消息。
#if defined(HAS_VOICEMAIL)
cpLog( LOG_DEBUG, "UA is running as voicemail front end" );
if( !UaCommandLine::instance()->getBoolOpt("no_heartbeat") )
{
#if defined(HAS_HEARTBEAT)
// Create and start heartbeat transmit thread
Sptr < HeartbeatTxThread > heartbeatThread
= new HeartbeatTxThread(sipPort,
500,
(const char*)"226.2.2.5",
6000);
heartbeatThread->run();
heartbeatThread->join();
#endif
}
#else
cpLog( LOG_ERR, "UA is NOT compiled to run as voicemail front end" );
#endif
}
//加入UA端到本地的运行队列里面
ua.join();
return 0;
} // ua main()

  看完了UA的主程序,我们可以看到,目前在UA端的部分的主要工作就是创建一个User Agent的实体UserAgent然后调用Run方法让它运行,那么我们看一下UserAgent这个关键类的一些基本情况:


(点击放大)

2.2 创建一个User Agent的实体:

  在创建一个User Agent的实体的同时还有一个非常重要的实体HeartLesProxy,用于处理SIP的各种消息,并且开启后台工作线程;它的创建过程我们稍后做详细介绍。

UserAgent::UserAgent( Sptr uaBuilder, unsigned short sipPort, Data appName )
: HeartLessProxy( uaBuilder, sipPort, appName )
{
const char* useDevice = "NULL"; // default to NULL_HARDWARE
… …
//用什么声音设备?在这里我们暂时定为Sound Card好了,如果要是用Quicknet来集成
//各种压缩算法当然更好,不过价格也高了一些,下面都以Sound Card作为标准介绍。
if( UaCommandLine::instance()->getBoolOpt( "soundcard" ) )
{
useDevice = "SOUNDCARD";
}
//LOAD GENERATION是一个检测线程,可以在屏幕上打印各种命令的往复消息,以及系统的各//种统的状态。
if(! UaConfiguration::instance()->getLoadGenOn())
{
// Create devices only if load gen is turned OFF.
//设备实例化,并且向带入设备名称以及所要处理的消息队列,在实例化的过程中会//打开一个声卡设备,并且将这个声卡设备绑定两个输入的命令,输出命令的FIFO
//队列当中,(inputQ,和outputQ)详细可以参看SoundCardDevice的建构函数,它//阐述了如何绑定这两个队列inputQ,outputQ,并且初始化ResGwDevice(所有的声音//设备的父类)。
UaDevice::instance( useDevice, myCallProcessingQueue );
if( ! (strcmp(useDevice, "NONE") == 0) )
{
cpLog( LOG_DEBUG, "Create RTP Thread" );
//创建一个RTP包处理线程用于对RTP Packet的处理
rtpThread = new RtpThread( UaDevice::instance() );
assert( rtpThread != 0 );
}
//这里调用了sound Card的设备消息处理线程的创立,用于处理与声卡设备相关的各种消//息.
deviceThread = new DeviceThread( UaDevice::instance() );
assert( deviceThread != 0 );

cpLog( LOG_DEBUG, "Create SubscribeManager" );
Sptr subManager = new SubscribeManager( mySipStack );

if ( UaConfiguration::instance()->getSubscribeOn() )
{
cpLog( LOG_DEBUG, "Create Feature Thread" );
//这里建立一个向FS发送消息的线程,关于这个部分的内容在Feature //Server的部分再做详细介绍.
featureThread = new FeatureThread( subManager );
assert( featureThread != 0 );
uaBuilder->setSubscribeManager( subManager );
}
}
else
{
… …
}

// 是否打开重传机制?
if (UaCommandLine::instance( ) -> getBoolOpt( "retransmit" ) )
{
SipTransceiver::reTransOn();
}
else
{
SipTransceiver::reTransOff();
}

// 定义接收代理服务器(Proxy)发出的消息所储存的容器
myCallContainer = new UaCallContainer;
assert( myCallContainer != 0 );
//绑定容器到用户端
uaBuilder->setCallContainer( myCallContainer );
//设置SIP的消息堆栈
uaBuilder->setSipStack( mySipStack );
//开始向注册服务器发送注册(Register)消息。
uaBuilder->startRegistration();
}
2.3 HeartLessProxy的创建:
HeartLessProxy
(
const Sptr < Builder > builder,
unsigned short defaultSipPort,
Data applName,
bool filterOn,
bool nat,
SipAppContext aContext
)
{
myCallContainer = new CallContainer;

myBuilder = builder;
myBuilder->setCallContainer(myCallContainer);
//这里创建了一个消息的输出队列,在前面的创建一个UserAgent的实体的过程中已经
//阐述过会把它绑定到相关的设备上去
myCallProcessingQueue = new Fifo < Sptr < SipProxyEvent > >;
//这里创建一个WorkThread线程在该线程中的myBuilder->process(nextEvent)
//检查消息队列myFifo中的返回消息(调用Uabuilder->process进行检查),从而
//得到返回的消息。
//很明显,这里新创建了一个 myWorkerThread工作线程,我们等一下就会看到如何把它Run
//起来
myWorkerThread = new WorkerThread(myCallProcessingQueue, myBuilder);

//创建一个SIP消息收发器的实体,在这个实体的构建里主要是把收发SIP消息的TCP/UDP
//的收发通道创建(SipUdpConnection和SipUdpConnection)。同时会构造一个SNMP的
//SipAgent.他的主要作用是向SNMP网关发送SNMP消息,描述网络的运行状态
if ( filterOn == true )
{
mySipStack = new SipTransceiverFilter(applName, defaultSipPort, nat, aContext);
}
else
{
mySipStack = new SipTransceiver(applName, defaultSipPort, nat, aContext);
}
myBuilder->setSipStack(mySipStack);
//创建一个SIP消息的解析线程 。
mySipThread = new SipThread(mySipStack, myCallProcessingQueue);

… …
}

2.4 让User Agent Run起来:

  构建User Agent的工作已经完毕,现在应该让调用它的Run方法了;从下面的程序可以看到,Run方法的调用,让整个程序进入一种"Idle"的状态,等待命令输入和状态的产生,这个过程我们可以看到在Ua.CXX的Main程序中调用(ua.run())。

Void UserAgent::run()
{
//调用HeartLessProxy的Run方法,稍后做详细的介绍
HeartLessProxy::run();
… …
deviceThread->run(); //调用SoundcardDevice::hardwareMain(0)
… …
rtpThread->run();//调用SoundCardDevice::processRTP()进行RTP流的处理
… …
//在这里向FS发送队列(myQ = new Fifo < Sptr < SubscribeMsg > >)中的各种消息,不
//过在Ua1001.cfg中,参数Subscribe_on设置为OFF所以,本章我们对FS暂不予以分析,
//在最后一章详细分析FS的时候回着重分析它.
featureThread->run();//调用subscribeManager::subscribeMain()
… …
//后台监测线程开启.
loadGenThread->run();//调用LoadGenMonitor::lgMain()

// User TimerEvent to kick start the load generator
… …
} // UserAgent::run

在Vovida的基础上实现自己的SIP协议栈(二)

卢政 2003/08/04
2. 5 HeartLessProxy Run方法的实现

HeartLessProxy::run()
{
myWorkerThread->run();
mySipThread->run();
}
通过上面可以看到有两个Run方法的调用,第一个是WorkThread的Run方法,它的主要作用是处理UaBuilder的Process方法,主要用来处理Sptr < Fifo < Sptr < SipProxyEvent > > > myFifo中的各种事件,前面已经详细的介绍了SipProxyEvent类的作用,这个类已经在前面介绍了,其实简单的说,它就是一个本地的各种事件的集合。
现在我们来看一下两个Run方法的实现:
2.5.1 WorkerThread的Run方法:
UaBuilder::process( const Sptr < SipProxyEvent > nextEvent )
{
//处理以下的四种事件
/// SipEvent
Sptr < SipEvent > sipEvent;
sipEvent.dynamicCast( nextEvent );
if ( sipEvent != 0 )
{
//处理本地的SIP的事件,包括对状态机的设置和命令/状态队列返回的操作在下面将//对它做详细的介绍
if( processSipEvent( sipEvent ) )
{
return;
}
//向消息队列myCallContainer中插入相应的事件信息。
sendEvent( nextEvent );
return;
}

/// UaDeviceEvent
Sptr < UaDeviceEvent > uaDeviceEvent;
uaDeviceEvent.dynamicCast( nextEvent );
if ( uaDeviceEvent != 0 )
{
//处理本地的设备事件,最主要的就是处理摘机信号;
if( processUaDeviceEvent( uaDeviceEvent ) )
{
return;
}
sendEvent( nextEvent );
return;
}

/// UaDigitEvent
Sptr < UaDigitTimerEvent > uaDigitEvent;
uaDigitEvent.dynamicCast( nextEvent );
if ( uaDigitEvent != 0 )
{

//处理在规定的时间间隔(Kickstart)主动呼叫事件的触发。
if( processUaDigitEvent( uaDigitEvent ) )
{
return;
}
sendEvent( nextEvent );
return;
}

/// UaTimerEvent
Sptr < UaTimerEvent > uaTimerEvent;
uaTimerEvent.dynamicCast( nextEvent );
if ( uaTimerEvent != 0 )
{
//在各种SIP命令的回应产生了超时事件后,系统的事件触发。例如:
//在StateTrying()中addEntryOperator( new OpStartTimer )在myEntryOperators队列中加入
//该Operator(指一个操作,例如呼叫或者是进入等待),这里我们这个Operator在时间到达
//以后户会被OpTimeout::process的方法检测到(isTimeout(event)进行检测,对StateTrying
//整个状态进行检测,也就是Trying事件),最后如果UaTimerEvent事件被触发,那么,//就会调用:stateMachine->findState( "StateError" )这个状态,进入错误状态,实施错误的
//处理机制,同时向myEntryOperators队列中加入一个新的Operator--OpStartErrorTone,
//从而被processUaTimerEvent过程扑捉到,最后通过SendEvent发送到执行队列里去。
if( processUaTimerEvent( uaTimerEvent ) )
{
return;
}
sendEvent( nextEvent );
return;
}
assert( 0 );

}

2.5.1.1 processSipEvent
  顾名思义,processSipEvent方法是对队列中的SIP消息进行处理,我们来看下面的程序:
bool UaBuilder::processSipEvent( const Sptr < SipEvent > sipEvent )
{
Sptr < StatusMsg > statusMsg;
statusMsg.dynamicCast( sipEvent->getSipMsg() );
// 检验是否为返回的状态码(主要是对Notify,Subscribe,Register三种状态进行单独处理)
//下面做详细介绍
if ( statusMsg != 0 )
{
if( handleStatusMsg( sipEvent ) )
{
return true;
}
}
//在这里表示接收到一个SIP的消息,
//检验是否为一个SIP的消息而不是一个状态(例如是否为Invite命令)
/// Let's check the call info, now
callId = sipEvent->getSipCallLeg()->getCallId();
callInfo = calls->findCall( callId );
if ( callInfo == 0 )
{
//下面分成两种状况进行讨论,一种是接受到Invite的消息,一种是接收到一个普通的
//命令,例如
Sptr < InviteMsg > inviteMsg;
inviteMsg.dynamicCast( sipEvent->getSipMsg() );
if ( inviteMsg == 0 )
{
//如果大家在这里有什么奇怪的话没有必要,为什么除了inviteMsg以外的所有的消
//息都不处理呢?其实这些消息都在SipThread这个程序中处理了,在Ua这个大状态
//机中所有的状态都是以Invite这个消息作为启动的。每一个INVITE启动一个系列的//消息和状态。
return true;
}
else
{
//收到一个Invite消息,这个时候我们就要进入相应的处理机制中了;
callInfo = calls->newCall
( sipEvent->getSipCallLeg()->getCallId() );
assert( callInfo != 0 );
callInfo->setFeature( stateMachine );
//如果进入的状态是自动呼叫(Auto Call)或者是自动应答(Auto Answer)状态(这
//两种状态的确定要在CFG文件中体现)
if ( UaConfiguration::instance()->getLoadGenOn() )
{
/// Assume this is a new call...
/// Also assume that we are not in use.
callInfo->setState( stateMachine->findState( "StateAutoIdle" ) );

//StateAutoIdle这个状态是一个自动应答和自动呼叫(按照呼叫列表)时候的状态,这里我
//们不做介绍,它本身和手动呼叫是非常相似的。
}
else // LoadGen is off
{
//下面这个程序会进入等待远端SIP事件和本地呼叫事件的状态StateIdle
if( handleCallWaiting( callInfo ) )
{
cpLog( LOG_ERR, "Returned from handleCallWaiting\n" );
return true;
}
}
} // lots of brackets!
}
return false;
} /// UaBuilder::processSipEvent

handleStatusMsg在做什么?

  前面我们已经作了简单的介绍,这个函数的主要目的是在处理Rgister,Notify,和Subscribe等几个状态,并且分别调用他们的处理机;
  Rgister调用它的处理机:
  handleRegistrationResponse他的主要作用是处理返回的各种Rgister状态,例如200,4XX或者是100等状态,另外它还负责在作为Mashal Server的时候转发各种状态时候,重新设定Expire的值;另外要注意的是在Register中增加了一个新的返回--Trying这个是非常合理的,特别是大型网络中,对服务器端的性能判定很有效,所以使用协议栈的同志能好好利用这个机制;另外如果发挥的值是401/407状态(未授权),还需要调用authenticateMessage做相应的处理,以返回的(401/407)状态中所带的密钥加密新的Rgister消息,发送给Register服务器重新进行授权判定;有兴趣的可以看看BaseAuthentication中的addAuthorization函数。在介绍UaMarshal和Redirect Server的时候会着重讨论这个问题。

  注明:Subscribe的处理机在Feature Server章节里面在再详细介绍)。

2.5.1.2 processUaDeviceEvent

  前面说了,processUaDeviceEvent主要是用来处理本地的设备事件,最主要就是处理摘机信号,在这里程序的流程我就不详细的列出,不过我们从主要的程序主体部分可以看出:

  在uaDeviceEvent->type == DeviceEventHookUp也就是检测了摘机以后,程序会采取某些必要的方式取得CallID(主要是通过CFG文件),最后让程序进入状态机的StateIdle状态,这个状态是接收和发送消息的初始状态,我们可以在后面将会重点介绍这个状态;

2.5.1.3 processUaDigitEvent

  也是主要通过判定CFG文件中的LoadGen_On的参数是On或者是Off来决定是否进入StateAutoIdle状态,或者是StateAutoRS状态(自动通过Marshal Server进行中转所有的SIP的消息和状态,在Marshal Server的时候会做详细的介绍)。

2.5.1.4 processUaTimerEvent

  这个的流程也实在没有什么好说的,前面也有了一定的介绍,如果大家对这些还有不明白的话,可以看一下SIP协议中Trying过程的走势,主要是对超时处理部分的介绍,就会明白(按照前面所说的UaBuilder::Process中关于SIP命令消息超时的介绍部分)。

2.5.2 SipThread的Run方法:

Void SipThread::thread()
{
… …
while ( true )
{
try
{
//接收所发送的消息,并且准备置入相关的队列中;
Sptr < SipMsgQueue > sipRcv( mySipStack->receive(1000) );

if ( sipRcv != 0 )
{
Sptr < SipMsg > sipMsg = sipRcv->back();

if ( sipMsg != 0 )
{
//根据本地的地址来检查是否发生了路由环路
if ( discardMessage(sipMsg) )
{
continue;
}
// 在这里的myOutputFifo就是 myCallProcessingQueue(异地输入消息的队
//列),在Workthread构建的时候会把这个队列带入作为处理参量
Sptr < SipEvent > nextEvent = new SipEvent(myOutputFifo);
if ( nextEvent != 0 )
{
//以下就是把新收到的消息载入队列当中。
nextEvent->setSipReceive(sipRcv);
nextEvent->setSipStack(mySipStack);
if(myCallLegHistory) nextEvent->setCallLeg();
myOutputFifo->add(nextEvent);
}
}
}
else
{
… …
}
}

catch ( VException& v)
{
… …
}
catch ( ... )
{
… …
}

if ( isShutdown() == true )
{
return;
}
}
}

2.5.2.1 SIP消息的接收/发送缓冲技术

a. 负责接收的主要程序体:
Sptr < SipMsgQueue > sipRcv( mySipStack->receive(1000) );这个方法就是利用SipTransceiver的receive方法接收SIP的消息;
Sptr < SipMsgQueue > SipTransceiver::receive(int timeOut)
{
Sptr < SipMsgQueue > msgQPtr = 0;
//以下是设立超时参数,如果发生超时,那么就让该命令无效;
timeval start, now;

if ( timeOut >= 0 )
{
gettimeofday(&start, 0);
}

while (msgQPtr == 0)
{
int timePassed = 0;
if ( timeOut >= 0 )
{
gettimeofday(&now, 0);

timePassed = ( now.tv_sec - start.tv_sec ) * 1000
+ ( now.tv_usec - start.tv_usec ) / 1000;

if (timePassed >= timeOut)
{
return 0;
}
}
recvdMsgsFifo.block(timeOut);

if ( !recvdMsgsFifo.messageAvailable() )
{
continue;
}

SipMsgContainer *msgPtr = recvdMsgsFifo.getNext();
if ( msgPtr == 0)
{
assert(0);
cpLog(LOG_CRIT, "received NULL");
continue;
}
#if 1
if ( natOn == true)
{
//这里是一个非常有意思的地方,虽然再程序主体中将它设定为False,也就是我们就
//不能采用NAT转换了,不过我还是想介绍一下,它主要是用在如果UA是一个标准
//的网关,或者是路由器设备的情况之下,在这个时候,它主要做各个消息包的转
//译工作,把路由(Via List)改成下一跳的IP地址和端口地址;
SipVia natVia = msgPtr->msg.in->getVia(0);
LocalScopeAllocator lo;
string addr1 = natVia.getHost().getData(lo);
string addr2 = msgPtr->msg.in->getReceivedIPName().getData(lo);
NetworkAddress netaddr1(addr1);
NetworkAddress netaddr2(addr2);
if ( netaddr1.getHostName() != netaddr2.getHostName())
{
natVia.setReceivedhost(msgPtr->msg.in->getReceivedIPName());
natVia.setReceivedport(msgPtr->msg.in->getReceivedIPPort());
//remove the first item from the via list
msgPtr->msg.in->removeVia(0);
//insert natvia in the vector via list
msgPtr->msg.in->setVia(natVia, 0);
}
}
#endif
//---NAT
/* *********************************************************************/


SipMsgQueue *msgQ = 0;
Sptr sipPtr = msgPtr->msg.in;

if(msgPtr->msg.in->getType() == SIP_STATUS)
//这两个是处理返回消息队列的函数,下面将重点介绍
msgQ = sentRequestDB.processRecv(msgPtr);
else
msgQ = sentResponseDB.processRecv(msgPtr);
//更新SNMP命令队列,并向SNMP网管中心发送接收的消息队列;
if(msgQ)
{
msgQPtr = msgQ;

//need to have snmpDetails for this.
if (sipAgent != 0)
{
updateSnmpData(sipPtr, INS);
}
}
else if(msgPtr->msg.in != 0)
{
send(msgPtr);
}
else if(msgPtr->msg.out.length())
{
send(msgPtr);
}
else
… …
}
}

b.描述接收/发送SIP消息队列的主要类:
  SipSentRequestDB:: processRecv和SipSentRequestDB::processSend是一对相互的方法,
另外还有SipSentResponseDB:: processRecv和SipSentResponseDB::processSend是用来记忆状态/消息的发送和接受的,在这里和Request的结构基本相同,就不做累述了;前者处理发送的SIP消息队列,后者处理接收的SIP消息队列,为了实现高效率的处理SIP的队列,在程序中大量采用了HASH表的方法,由于这个部分的程序非常的多,我不想一一把他们罗列出来,在这里就做一下简单的一个浏览:

  HASH队列的抽象:在这里有三个用于表示HASH表的类:
  SipTransLevel1Node,SipTransLevel2Node,SipTransLevel3Node;

  第一个是表的入口,它的组成由:目的地址 NameAddress源地址 From以及CallID三个部分叠加而成;

  第二个是表的索引,包括CSeq和Via 路由表

  第三个就是具体的消息对了,也就是一个呼叫命令组的列表;详见下图:


(点击放大)

  我们下面一一个简单的例子来描述一下一个INVITE消息的处理过程:


(点击放大)

A. 接收到一个Invite Message/发送一个180状态的情况的情况:
  1. 在UDP通道收到一个INVITE消息
  2. 创建了一个InvMsg,同时发送到SipSentResponseDB中做备份,我们要检查在这里有没有重复的副本;
  3.如果没有重复,那么InvMsg就放入RecvFifo中,准备让应用层进行处理;
  4.应用层通过SipTransciever接收到了InvMsg并且做出相应的处理;
  5.应用层产生了180回应到SipSentResponseDB中备份,
  6.180在SndFifo中排队,并且调用SipTransceiver中的SendReply方法回送消息

B.从对方接收到一个100(Trying)状态的作为向对方发送Invite消息回应的情况:
  1. 在UDP通道收到一个INVITE的状态;
  2. 创建了一个StatusMsg,同时发送到SipSentResquestDB中做备份,我们要检查在这里有没有重复的副本;
  3.如果没有重复,那么StatusMsg就放入RecvFifo中,准备让应用层进行处理;
  4.应用层通过SipTransciever接收到了StatusMsg并且做出相应的处理;
  5.应用层产生了ACK回应到SipSentResquestDB中备份,
  6.180在SndFifo中排队,并且调用SipTransceiver中的SendAsync方法回送ACK消息,

c.在存在一个呼叫重新定向的情况:
  *我们下面来看一个更加复杂一点的情况:


(点击放大)

1>SipSendtRequestDB::processSend方法:

  我们可以做一个很简单的举例,大家就对这两个方法有比较深入的了解了,可以以上面的Diagram1来做一个很好的例子比如,Marshal Server开始发送一个Invite的消息,由SipSendtRequestDB::processSend来进行处理,同时并且把这个消息装入SipMsgContainer中,然后消息被插入到SipTransactionList队列中:

  topNode->findOrInsert(id)->val->findOrInsert(id)
  最后放在SipTransLevel1Node,SipTransLevel2Node,SipTransLevel3Node形成一个新的节点。

2>SipSentRequestDB:: processRecv方法:
  例如我们接收了一个回应100 Trying这个回应的处理自然落在下面的这个部分:
int statusCode = response->getStatusLine().getStatusCode();
if((statusCode < 200) ||
((SipTransceiver::myAppContext == APP_CONTEXT_PROXY) &&
(statusCode == 200) &&
(response->getCSeq().getMethod() == INVITE_METHOD) ) )
… …
retVal = new SipMsgQueue;
retVal->push_back(msgContainer->msg.in)

  单纯的把消息队列返回上面的应用层;
  后续的180(Ringing)也是如此直接返回应用层;
  但是到了接受到200(OK),那么处理的方式就大不一样了因为OK以后命令交互阶段已经告一段落,那么我们通过SipTransactionGC::instance()-> collect的后台方法处理(Thread线程),根据Delay的时间的变化:如invCleanupDelay等等,删除当前的一些队列中消息所占用的内存(垃圾处理),(具体处理机制可以参看SipTransactionGC::thread()这个后台处理掉一些孤独的消息,例如有Request没有Response的等等,并且根据各个消息所占用的Delay时间来释放他们);

  但是如果没有收到200呢?假设我们收到了302(呼叫转移)呢?(例如在上面Diagram 1中所表现的那样)

答案在这里:
else if(response->getStatusLine().getStatusCode() >= 200)
{
if(level3Node->val->msgs.response)//这里是检验在消息队列中是否有应答
//产生,也就是Diagram 1中的Second Phase的情况,(第二个Invite消息)
{
SipTransactionList::SipTransListNode *
curr = 0;
if(level3Node->val->myKey == INVITE_METHOD)
{
curr = level2Node->val->level3.getLast();
while(curr)
{
// look for the ACK message
if(curr->val->myKey == ACK_METHOD &&
curr->val->msgs.request)
{
cpLog(DEBUG_NEW_STACK,"duplicate message: %s",
msgContainer->msg.out.logData());
//通过第一个ACK来复制第二个ACK,使用上二者完全相同,
msgContainer->msg.in = 0;
msgContainer->msg.out =
curr->val->msgs.request->msg.out;
msgContainer->msg.type
= curr->val->msgs.request->msg.type;
msgContainer->msg.transport =
curr->val->msgs.request->msg.transport;
msgContainer->msg.netAddr =
curr->val->msgs.request->msg.netAddr;

msgContainer->retransCount = FILTER_RETRANS_COUNT;

break;
}
curr = level2Node->val->level3.getPrev(curr);
}

  很明显复制一个ACK消息准备进行下一个新的Invite的发送,当然这个是要在有ACK发送以后才可以进行,如果没有那么我们可以假定ACK正处在Processing状态;

if(!curr)
{
msgContainer->msg.in = 0;
msgContainer->msg.out = "";
msgContainer->retransCount = 0;
}
}
else
… …

  在这个else下面所表示的处理机制是在第一个Message发送出去以后回应大于200的情况,也就是在Diagram 1中First Phase的情况,也就是发出第一个302的情况,在下面有一行语句:

  msgContainer->msg.out=msgContainer->msg.in->encode()

  它的主要目的是用于形成ACK应答,

  另外后面介绍Marshal Server的时候向异地发送Invite的时候返回4XX的回应,一般都是4XX等恶名招著的Response不会有其他的,本地一般采取的处理就是向应用层汇报,并且消除Hash队列里的所有驻留的消息。

  大家可以根据上面介绍的方法实验一下其他的情况,基本上都是合适的.
目前来说这个处理机制并不使最优的,特别是在服务器的状态,某些情况事实上并没有
一个具体的处理方法:例如4XX的回应,可能会造成超时等待过长。

2.6 在User Agent中的四个重要实例的Run方法:
  HeartLessProxy的两个Run方法都介绍完毕了,现在我们来看下面将要启动的四个Run过程:

2.6.1 媒体设备启动
  DeviceThread->run(); //调用SoundcardDevice::hardwareMain(0)

  第一个是调用Sound Card的处理进程,它最主要的用处是返回各种按键的处理信息,他的具体的作用可以参看程序,和具体的操作手册,非常的简单易懂,不用详细介绍,不过要注意的一点是,在程序中,启动按键事件的检测是通过RTP/RTCP的事件触发的,(很明显,例如在通话的时候按下z表示挂机,必须是在有RTP/RTCP事件),说简单了,没有设备,键盘事件无法触发。

2.6.2 启动RTP线程,用于对RTP/RTCP包的接收和发送管理;
  rtpThread->run //调用SoundCardDevice::processRTP()

  参看RtpThread实例化的过程可以看出,实际上就是调用SoundCardDevice的processRTP过程。

SoundCardDevice::processRTP ()
{
… …
if (audioStack == 0)
{
… …
return;
}

bool bNothingDo = true;

RtpSessionState sessionState = audioStack->getSessionState();

if ( sessionState == rtp_session_undefined )
{
deviceMutex.unlock();
… …
return;
}
if( sessionState == rtp_session_recvonly ||
sessionState == rtp_session_sendrecv )
{
// audioStack就是RtpSession,在这里它是在构建这个声音设备的时候,就创建它了。
//这里表示从一个创建好的RTP会话中接收一帧数据,
inRtpPkt = audioStack->receive();
if( inRtpPkt )
{
//这里的声卡目前只能接受一种压缩方式PCM,所以只能解析这一种最常用的,
if( inRtpPkt->getPayloadType() != rtpPayloadPCMU ||
//RTP的采样频率是否为要求的频率,例如为20ms
inRtpPkt->getPayloadUsage() != NETWORK_RTP_RATE )
{
cpLog(LOG_ERR,"Received from RTP stack incorrect payload type");
}
//将数据输出到声卡,
writeToSoundCard( (unsigned char*) inRtpPkt->getPayloadLoc(),
inRtpPkt->getPayloadUsage() );
bNothingDo = false;

… …
}
}

// 这里是发送一帧数据;
if( sessionState == rtp_session_sendonly ||
sessionState == rtp_session_sendrecv )
{
int cc;
if( audioStack->getRtcpTran() )
{ //如果有发送零声的情况,例如零声回送被叫端,这里在OpRing里通过//sendRemoteRingback过程来实现向远端回送零声(sendRingback=True)
if( sendRingback )
{
cc = getRingbackTone( dataBuffer, RESID_RTP_RATE );
#ifdef WIN32
Sleep(15);
#endif
}
else
{//从声卡中读入一帧数据,按照cfg文件中规定的采样标准
cc = readFromSoundCard( dataBuffer, RESID_RTP_RATE );
}
if ((cc > 0) && audioStack)
{//将这帧数据(毛数据,未压缩的作成RTP包发送出去);
audioStack->transmitRaw( (char*)dataBuffer, cc );
bNothingDo = false;
}
… …
}
}
… …
deviceMutex.unlock();

return;
}

2.6.3 合法用户列表的获取(Redirection Server专用)

  第三个过程是featureThread->run,,这个过程主要是用在向重定向服务器(Redirection Server)和Provisioning Server中的Feature线程,它实质上是调用 subscribe -Manager->subscribeMain,主体程序部分是向Provisioning Server发送Subscribe消息,在这个循环中会反复的发送SubScribe消息到Provision Server中去,稍后我们要介绍的UaBuilder::handleStatusMsg(UaBuilder::processSipEvent中)过程会将会处理从Provision Server 返回的Notify消息,关于Subscribe/Notify消息对的介绍我们可以参看在Vocal中的相关介绍,它的作用范围是在一个普通的UA向Marshal Server进行注册或者是证实的时候,Marshal Server同时向Redirection Server发出Register消息,并且由Redirection Server向Provisioning Server发送Subscribe消息,对用户列表进行检测;我们可以举一个例子来说明这个过程:



(点击放大)


我们来看Diagram.7

1> 在A阶段当启动Redirection Server(RS)的时候,RS向Provisioning Server(PS)发送SubScribe消息,取得合法的用户列表;
2> 在B阶段,UA端向Marshal Server发送Register消息,以确认自己是否在合法用户列表内;
3> 在C阶段,RS将通过Subscribe/Notify命令对把该用户的呼叫特性列表(呼叫等待,呼叫转接,语音邮件,呼叫前转,禁止呼叫等信息)得到该用户的呼叫特性;
我们在Redirection Server这一章内将详细介绍Subscribe/Notify命令对。

2.6.4 监测线程:
  一个调用的RUN方法loadGenThread->run是一个监测线程,检查各种回应和请求消息,并记录在LOG文件中。

2.6.5 自动呼叫
  在loadGenThread->run后面的程序实现了一个自动在预定时间内发送INVITE消息的过程,大家有兴趣可以参看OpAutoCall类,当在UserAgent::Run()中通过检测Cfg文件,通过setLoadGenSignalType(LoadGenStartCall)设定了一个公共变量以后,我们可以发现系统将自动进入OpAutoCall操作,并且启动INVITE开始呼叫。


(点击放大)

  好了,通过上面的介绍后我们需要知道如何让系统进入Idle状态,在这个状态中系统处于一种"等待"的状态,接收本地的命令输入,和远端的消息;这个状态是所有后续状态的一个初始阶段,在上述程序中我们可以在processSipEvent过程中找到handleCallWaiting子程序,就在该过程中让系统进入Idle状态;见下面的程序:
… …
if ( UaConfiguration::instance()->getLoadGenOn() )
{
callInfo->setState
( stateMachine->findState( "StateAutoIdle" ) );
}
else // LoadGen is off
{
if( handleCallWaiting( callInfo ) )
{
return true;
}
… …

在Vovida的基础上实现自己的SIP协议栈(三)

卢政 2003/08/05
3.开始一个呼叫和等待对方呼叫:

3.1 系统创建StateIdle状态:

StateIdle::StateIdle()
{
addOperator( new OpStartCall );
addOperator( new OpRing );
addOperator( new OpOnHook ); // bizarre case
}

  注意:所有的状态StateIdle,以及下面要介绍的StateInCall(大概有几十个左右)等等都是State的子类,所以他们统统都继承了State的所有方法,在State中通过addOperator的方法来增加"操作"例如:开始呼叫是OpStartCall,挂机是OpOnHook等等,在这个状态"容器"里通收集所需要的操作,并且通过State::Process方法调用执行他们;

  我们可以注意到在UaBuilder::process中的 SendEvent提供了一种把系统收到的各种消息(在UaCallInfo中汇集的,无论是本地硬件消息或者是异地接收的消息)传递到状态机(UaStateMachine)的方法, 并且分配调用当前状态的处理过程。

… …
UaBuilder::sendEvent( const Sptr < SipProxyEvent > nextEvent )
{
nextEvent->setCallInfo ( callInfo, calls );
stateMachine->process( nextEvent );//Here we use state machine to run elementin //operator queue
return;
}

  UaStateMachine状态机的作用主要是在运行UaCallInfo(由前面详细叙述的UaBuilder::process来装入各种事件(SipProxyEvent))容器中的各种操作,上面这句就是调用Sate(各种State的基类)中的process方法。

  我们下面来归纳的看一下如何让设备的状态进入等待命令的idle状态所遍列的类和方法:

UaBuilder->process() -UaBuilder->handleCallWaiting--> addOperator(new OpXXX)-->UaBuilder->sendEvent()-->UaStateMachine->process-->Sate->process()-->-->OpXXX->process()

3.2 开始一个呼叫:
  从上面的介绍我们可以知道,系统在初始化以后会进入Idle状态,如果用户使电话进入了摘机(Offhook),那么这个事件会发送到本地处理队列,这样OpstartDialing会检测到这个时间并且把系统放置在Dialing 状态中;这时用户输入被叫的电话号码,或者是Url,在Dialing状态中所有的事件被OpAddDigit操作处理,但是这个操作并不会让当前的状态从Dialing离开,而是维持到拨号完毕。

  当用户拨号完毕以后,这个时候会产生一个Dialing Complete事件,我们知道这个事件是由OpInviteURL产生,这个操作负责把INVITE消息发送到被叫端,并且让系统陷入相应的状态机中,这个时刻系统进入了Trying状态

  一旦用户进入Trying状态,主叫将会等待被叫摘起话筒的过程,如果被叫摘机,那么将会发送一个200的消息给主叫,主叫端的OpFarEndAnswered操作将会处理这个消息,并且让系统进入InCall状态,主叫/被叫之间将打开RTP/RTCP通道,开始通讯,当用户挂机以后,进入OpTermiateCall状态,并且互相发送Bye消息,中断通话,系统最终返回Idle状态。

3.2.1 OpStartCall主程序部分:
  这里我们可以看出在idle状态中首先经历的是OpStartCall操作,这个操作目的是开始呼叫
OpStartCall::process( const Sptr < SipProxyEvent > event )

{
Sptr < UaDeviceEvent > deviceEvent;
deviceEvent.dynamicCast( event );
if ( deviceEvent == 0 )
{
return 0;
}
if ( deviceEvent->type != DeviceEventHookUp )//这里是检测是否开始进行呼叫
{
return 0;
}
//如果该过程接收到一个DeviceEventHookUp事件,那么开始这个事件的处理过程
Sptr < UaStateMachine > stateMachine;
stateMachine.dynamicCast( event->getCallInfo()->getFeature() );
assert( stateMachine != 0 );
return stateMachine->findState( "StateDialing" );
//如果开始呼叫,那么我们转入StateDialing状态
}

3.2.2 取得键盘的事件
  调用deviceThread->run()(SoundCardDevice::process())来实现对键盘事件的检测,判断是什么事件--摘机(OffHook)挂机(OnHook),是否进行了拨号等等,在这个程序中所有的键盘全部解释为一个事件,这个程序很简单,我就不在这里做介绍了。

3.2.3 状态机(State)对各个操作(Operator)的处理过程
  我们回头再来看一下这个state::process()的处理核心机制,这个是所有的状态的处理机,各种状态将各自的操作序列(各个OPXXX)装入处理机中调用各自的process方法

首先我先解释三种操作(OPXXX)队列
  1. MyEntryOperators State的入口队列,例如OpStartDialTone,OpStartCall将会位于这个处理队列中
  2. MyOperators State中各种执行状态的集合
  3. MyExitOperators 退出状态的集合,例如OpStopBusyTone,OpStartRinging将要位于这个队列中

  这三个队列中包含了几乎全部的OPXXX系列操作符,用于管理和维护这些操作符,并且调动他们执行。

State::process(const Sptr < SipProxyEvent > event)
{
… …
Sptr < State > currentState = event->getCallInfo()->getState();
if ( currentState != PROXY_CONTINUE
&& currentState != this )
… …
Sptr < State > nextState = PROXY_CONTINUE;
for ( OperatorIter iter = myOperators.begin();
iter != myOperators.end();
iter++
)
{
Sptr < State > newState = (*iter)->process(event);//调用各个OPXXX的process处理机
//制
if( newState == PROXY_DONE_WITH_EVENT )
{//在Marshall server的时候有时侯server端会给状态机赋于这种状态
//一旦进入这种状态,那么程序将处于退出当前状态的执行阶段,进入下一个阶段
break;
}
else if( newState != PROXY_CONTINUE )
{ assert( nextState == PROXY_CONTINUE );
nextState = newState;
}
}
if( nextState != PROXY_CONTINUE )
{
processExit(event);//退出当前状态,进入下一个状态中
event->getCallInfo()->setState(nextState);
nextState->processEntry(event);//进入下一个状态。
}
return ( nextState );
}

3.2.4 开始一个呼叫所经历的各种操作(Operator):
  下图表示了从摘机到通话完毕的各个状态之间程序各种类之间的迁移过程,当然这个图还仅仅是略图,它只是表示了了一个终端简单的主动呼叫,摘机,通话,挂机的过程,如果协议栈软件应用于Marshal或者是Redirection Server的话,那么采用的协议流程和下面又有一些不一样了,以后会对这些做详细介绍。

  在本图中粗体的部分表示加入MyEntryOperator队列中的操作符
  正体的部分表示加入MyOperator队列中的操作符
  斜体的部分表示加入MyExitOperator队列中的操作符


(点击放大)


  我们根据SIP协议来定义一下一个发起呼叫或者是等待接收一个呼叫的状态迁移过程:
(根据Vocal中提供的Ua Simple State Transfer状态图)


(点击放大)


我们下面来对照这些状态一个个的进行介绍:

  从上面的程序中可以看到,在系统所有的状态初始化完毕以后,执行了内部方法handleCallWaiting系统进入一个StateIdle状态这个时候系统在等待本地呼叫和远端异地的呼叫:
我们前面已经知道了在UaBuilder::process中通过Send的方法向消息队列myCallContainer中发送相应的事件信息这里然后在stateMachine->process( nextEvent )通过状态机中的Process方法调用State::Process方法对这个队列进行中的SipProxyEvent处理,换句话来说,也就是我们把所有的从Uabuilder::Process提取的状态通过Send的方法发送到状态机中,由状态机调用各个状态的process方法,对各个状态(StateXXX)进行处理,这里我们可以参看一下下面的Idle状态的调用方法
3.2.5 如何进入待机状态(Idle状态)

  大家看一下,在deviceThread->run();(调用SoundcardDevice::hardwareMain(0))来检检测键盘事件,这个时候在下面程序中得到DeviceEventHookUp键盘事件,表示摘机,同时由在Uabuilder::Process()过程中的ProcessUaDeviceEvent中解释该事件,并且进入StateIdle状态。
… …
case 'a': // offhook
hookStateOffhook = true;
playDialTone = true;
event->type = DeviceEventHookUp;
break;
… …
这样创建了StateIdle:
StateIdle::StateIdle()
{
addOperator( new OpStartCall );
addOperator( new OpRing );
addOperator( new OpOnHook ); // bizarre case
}

  按照上面的加入MyOperator队列的方法将操作符加入队列中去,并且运行各个操作符的Process方法:
  OpStartCall为主动开始一个呼叫过程;
  OpRing为被动得等待远端的一个呼叫;

3.2.6 如何开始拨号并且开始一个呼叫:
  在OpStartCall中主要让用户的状态机陷入下一个状态中StateDialing这个操作是非常简单的直接进入拨号状态:

stateMachine->findState( "StateDialing" )
StateDialing::StateDialing()
{
addEntryOperator( new OpStartDialTone );
addOperator( new OpAddDigit );
addOperator( new OpStopDialTone );
addOperator( new OpInviteUrl );
addOperator( new OpDialError );
addOperator( new OpOnHook );
}

  这里是加入操作队列准备开始执行的操作符

3.2.6.1 OpStartDialTone:
  第一个操作符是OpStartDialTone顾名思义这个操作符是为本地提供拨号音的操作,不过在这里出现了一个本地设备事件的处理队列:

… …
Sptr < UaHardwareEvent > signal = new UaHardwareEvent( UaDevice::getDeviceQueue() );
signal->type = HardwareSignalType;
signal->signalOrRequest.signal = DeviceSignalDialToneStart;//设置本次操作的操作类型
UaDevice::getDeviceQueue()->add( signal );//在处理队列中加入本次需要处理的操作。
… …

处理本次事件的程序部分在声卡描述这个类里:处理顺序如下:

SoundCardDevice::hardwareMain-->ResGwDevice::processSessionMsg( myQ->getNext() )
--> … …case HardwareSignalType:
if( msg->signalOrRequest.signal == DeviceSignalFwding )
… …
provideSignal((msg->signalOrRequest).signal)
--> …… case DeviceSignalDialToneStart:
……. provideDialToneStart();

  到了这个程序就非常简单了provideDialToneStart主要提供了处理本地放送拨号音的过程,打开设备发送声音。在程序中大量使用了HardwareMsg MyQ这个队列,目的主要上建立各个线程(例如上面所说介绍的OpStartDialTone操作),和本地设备之间的处理消息传递的通道

3.2.6.2 输入电话号码开始拨号:
  OpAddDigit 的主要作用是把在摘机以后输入的十进制的IP电话号码或者是URL放在DigitCollector队列中以便后续的拨号程序使用,这两种方式在程序中都适用,程序中会相应的Url 改变成十进制的IP电话号码放入Dial这个字符串中;请注意这里模拟了一个拨号等待超时的方法(调动UaDigitTimerEvent类)。

3.2.6.3 OpStopDialTone主要是用于关掉拨号音。

3.2.6.4 OpInviteUrl目的是用在建立一个INVITE命令并且向被叫发送;
  我们现在来看一下他的程序结构,而在此之前我们来看一下一个普通的SIP呼叫/应答/的过程



  这里我们先来看第一个Invite命令的发送:
  我们知道一个SIP的基本命令包括以下几个基本字段:
  From:请求的发起方,to:请求的接收方,Call-ID请求标识,
  Cseq:命令的序列号,Via:路由列表,Contact:后续通讯地址;
  一个基本的INVITE命令如下所示,注意这个INVITE的构造方式也适用于Marshal, Feature等其他需要发送/转发INVITE的设备上:

SIP Headers
-----------------------------------------------------------------
sip-req: INVITE sip:93831073@192.168.36.180 SIP/2.0 [192.168.6.20:50753-
>192.168.36.180:5060]
Header: Via: SIP/2.0/UDP 192.168.6.20:5060
Header: From: sip:5120@192.168.6.20
Header: To: [sip:93831073@192.168.36.180]
Header: Call-ID: c2943000-23e062-2e278-2e323931@192.168.6.20
Header: CSeq: 100 INVITE
Header: Expires: 180
Header: User-Agent: Cisco IP Phone/ Rev. 1/ SIP enabled
Header: Accept: application/sdp
Header: Contact: sip:5120@192.168.6.20:5060
Header: Content-Type: application/sdp
Header: Content-Length: 218
-----------------------------------------------------------------
SDP Headers
-----------------------------------------------------------------
Header: v=0
Header: o=CiscoSystemsSIP-IPPhone-UserAgent 21012 9466 IN IP4 192.168.6.20
Header: s=SIP Call
Header: c=IN IP4 192.168.6.20
Header: t=0 0
Header: m=audio 25776 RTP/AVP 0 101
Header: a=rtpmap:0 pcmu/8000
Header: a=rtpmap:101 telephone-event/8000
Header: a=fmtp:101 0-11

const Sptr < State >
OpInviteUrl::process( const Sptr < SipProxyEvent > event )
{

Sptr < UaDigitTimerEvent > timerEvent;
timerEvent.dynamicCast( event );
if ( timerEvent == 0 )
{
return 0;
}

... ...
//根据DigitCollector中的内容创建一个合法的URL
toUrl = new SipUrl( digitCollector->getUrl() );

if ( dial_phone == digitCollector->getDialMethod() )
{
… …
toUrl->setUserValue( toUrl->getUserValue(), dial_phone );
}
... ...
//Proxy_Server表示的是被叫方代理服务器的地址
string proxyServer = UaConfiguration::instance()->getProxyServer();
//如果对方的SIP地址不完整的话,那么就要用被叫方的代理服务器
if ( toUrl->getHost().length() <= 0 && proxyServer.length() )
{
NetworkAddress na( proxyServer );
//形成一个具体的被叫的SIP地址,并设置端口号码,例如//93831073@192.168.36.180:5060
toUrl->setHost( na.getIpName() );
if( na.getPort() > 0 )
{

toUrl->setPort( na.getPort() );
}
}

... ...
proxyUrl = new SipUrl( "sip:" + proxyServer );
}
... ...

//定义本地接收异地的SIP消息的接收端口;我们根据Cfg文件暂时定为5000
string sipPort = UaConfiguration::instance()->getLocalSipPort();
//根据目的SIP地址和接收端口构造一个SIP的Invite命令,同时通过函数:
//setInviteDetails构造一个Via头的部分内容,以及From,to,Cseq, contact
InviteMsg msg( toUrl, atoi( sipPort.c_str() ) );
//设置Call-ID
msg.setCallId( *(UaDevice::instance()->getCallId()) );
//设置请求头启始行,例如: sip-req: INVITE sip:93831073@192.168.36.180 SIP/2.0 [192.168.6.20:50753->192.168.36.180:5060]
//如果存在代理服务器的话,那么根据RFC3261中的7.1项目来构造请求头部启始//行,这里的可能只包括9383107的IP电话NUM而没有代理服务器部分。
if ( proxyServer.length() > 0 )
{
SipRequestLine reqLine = msg.getRequestLine();
Sptr< BaseUrl > baseUrl = reqLine.getUrl();
assert( baseUrl != 0 );
if( baseUrl->getType() == TEL_URL )
{
... ...
}
// Assume we have a SIP_URL
Sptr< SipUrl > reqUrl;
reqUrl.dynamicCast( baseUrl );
assert( reqUrl != 0 );
//设置被叫端的代理服务器SIp地址以及端口号
reqUrl->setHost( proxyUrl->getHost() );
reqUrl->setPort( proxyUrl->getPort() );

if(UaConfiguration::instance()->getSipTransport() == "TCP")
{
reqUrl->setTransportParam( Data("tcp"));
}
reqLine.setUrl( reqUrl );
//最后完整的设置一个被叫端的SIP地址到请求头部。
msg.setRequestLine( reqLine );
}
//设置From头部
SipFrom from = msg.getFrom();
//设置显示的用户名到From条目中去
from.setDisplayName( Data( UaConfiguration::instance()->getDisplayName()));
Sptr< BaseUrl > baseUrl = from.getUrl();
assert( baseUrl != 0 );
... ...}
// Assume we have a SIP_URL
Sptr< SipUrl > fromUrl;
fromUrl.dynamicCast( baseUrl );
assert( fromUrl != 0 );
//设置用户名
fromUrl->setUserValue( Data( UaConfiguration::instance()->getUserName() ),
"phone" );
from.setUrl( fromUrl );
msg.setFrom( from );

//设置路由表,首先取出当前的INVITE消息中的路由表,设置传输协议路由表中//的其他信息在setInviteDetails已经做了设定(主机名,端口名等)
SipVia via = msg.getVia();
msg.removeVia();
via.setTransport( UaConfiguration::instance()->getSipTransport() );
msg.setVia( via );
//设置Contact头部,这里如果是发送一个Invite的消息(开始呼叫一个远端的主
//机)那么采用的头部的Contact地址当然本主机的地址。
// Set Contact: header
Sptr< SipUrl > myUrl = new SipUrl;
myUrl->setUserValue( UaConfiguration::instance()->getUserName(), "phone" );
myUrl->setHost( Data( theSystem.gethostAddress() ) );
myUrl->setPort( atoi( UaConfiguration::instance()
->getLocalSipPort().c_str() ) );
if(UaConfiguration::instance()->getSipTransport() == "TCP")
{
myUrl->setTransportParam( Data("tcp"));
}
SipContact me;
me.setUrl( myUrl );
msg.setNumContact( 0 ); // Clear
msg.setContact( me );

Sptr < UaCallInfo > call;
call.dynamicCast( event->getCallInfo() );
assert( call != 0 );
//设置SDP
addSdpToMsg(msg,
//设置每个毫秒的RTP包的个数;
UaConfiguration::instance()->getNetworkRtpRate(),
//设置RTP的端口
UaDevice::instance()->getRtpPort());

Sptr sipSdp;
sipSdp.dynamicCast ( msg.getContentData( 0 ) );

if ( sipSdp != 0 )
{
call->setLocalSdp( new SipSdp( *sipSdp ) );
int tmp;
}
//保存Invite消息在UaCallInfo队列中,以便在Ring Back状态的时候可对状态进
//行检测,看其是否为当前的INVITE详见后续的OPStartRingBackTone中对
//getRingInvite的方法调用,在出现两个INVITE(例如呼叫转移或者是SDP不适配)
//等情况那么,如何做到选定合适的SDP。
call->setRingInvite( new InviteMsg( msg ) );

//发送INVITE,timerEvent在这里是用来做发送超时的检测,调用UaOperator::
//StarTimer开始当前的记时,并且在时间TimeOut以前发送当前的INVITE命令。

timerEvent->getSipStack()->sendAsync( msg );
call->setContactMsg(msg);
Sptr < UaStateMachine > stateMachine;
stateMachine.dynamicCast( event->getCallInfo()->getFeature() );
assert( stateMachine != 0 );
//转入StateTrying状态。
return stateMachine->findState( "StateTrying" );
}

3.2.7 进入Trying状态:
  我们知道在这个状态机中,发送出一个INVITE消息以后,我们将让系统进入等待远端发送Trying命令,首先我们来看进入的操作状态:

3.2.7.1 OpStartTimer启动每个事件的定时器:
  addEntryOperator( new OpStartTimer );
  这里通过UaOperator::setTimer的方式来启动UaTimerEvent这个事件定时器,目的是设置后续每个事件的超时操作,唤起超时处理。

3.2.7.2 挂机事件的检测机制
  addOperator( new OpOnHook );
  调动一个线程来检测挂机事件,这里不做累述;

3.2.7.3 OpStartRingbackTone向被叫进行铃声回放。
  addOperator( new OpStartRingbackTone );
震铃回放的实现,也就是接收到对方回传的180/183振铃消息

OpStartRingbackTone::process( const Sptr < SipProxyEvent > event )
{
Sptr < SipMsg > sipMsg = sipEvent->getSipMsg();
Sptr < StatusMsg > msg;
msg.dynamicCast( sipMsg );
… …
Sptr < UaCallInfo > call;
call.dynamicCast( event->getCallInfo() );
//检验消息状态是否为180/183
if ( msg->getStatusLine().getStatusCode() != 180 &&
msg->getStatusLine().getStatusCode() != 183 )
{
… …
};
//消除定时
if ( cancelTimer(event) )
Sptr < Contact > contact = call->findContact( *msg );
… …
int status = contact->getStatus();
if ( status == 180 || status == 183 )
{
… …
}
bool remoteRingback = true;
contact->update( *msg );
//这里是确定是哪一个Invite消息得到的回应,正如在2.5所说的,Invite消息发送//的时候保存Invite消息在UaCallInfo队列中,以便在Ring Back状态的时候可对
//状态进行检测,看其是否为当前的INVITE,在出现两个INVITE(例如呼叫转移
//或者是SDP不适配)等情况那么,并且做到选定合适的SDP。
Sptr < SipSdp > localSdp;
//取得引起震铃的INVITE消息
Sptr < InviteMsg > inviteMsg = call->getRingInvite();
//取出当前的Contact字段中所引发的INVITE消息,在OpInviteUrl::process中会定义这//个当前的SipCOntact字段。
InviteMsg invMsg = call->getContact()->getInviteMsg();
//两者相比较,检验是否引起震铃的INVITE消息和当前Contact字段中的INVITE消息
//是否相同
if ( *inviteMsg == invMsg )
{
//从UaCallInfo中取出相应的SDP
localSdp = call->getLocalSdp();
}
else
{
… … //从UaCallInfo中取出相应的SDP
localSdp = call->getLocal2Sdp();
}

int rtpPacketSize = UaConfiguration::instance()->getNetworkRtpRate();
//从被叫端回送的Ring消息中取得相应的SDP,如果没有的话,那么就不接收异地来的//振铃。一般情况下,是没有振铃回送的,太花费网络资源,而且价值也不是很大。
Sptr remoteSdp;
remoteSdp.dynamicCast( sipMsg->getContentData(0) );
//remoteSdp=0的情况表示没有给本地,所以
if( remoteSdp == 0 )
{
remoteRingback = false;
}
else
{
int rtpPacketSize = getRtpPacketSize(*remoteSdp);
if(rtpPacketSize > 0)
{
//设置铃声回送的RTP Packet数值
setRtpPacketSize(*localSdp, rtpPacketSize);
call->setRemoteSdp( new SipSdp( *remoteSdp ) );
}
else
{
remoteRingback = false;
}
}

Sptr < UaHardwareEvent > signal =
new UaHardwareEvent( UaDevice::getDeviceQueue() );
//如果需要铃声回送的话,那么在下面的部分就打开RTP/RTCP通道,准备接收远端的振铃
if ( remoteRingback )
{
call->getContact()->setRemoteRingback( true );

signal->type = HardwareAudioType;
struct HardwareAudioRequest* request
= &(signal->signalOrRequest.request);
request->type = AudioStart//打开RTP会话开始回放远端铃声;稍后在OpACK中做详细介绍
strcpy( request->remoteHost, "\0" );
request->remotePort = 0;
LocalScopeAllocator lo;
strcpy( request->localHost, localSdp->getConnAddress().getData(lo) );
request->localPort = localSdp->getRtpPort();
request->rtpPacketSize = rtpPacketSize;
}
else
{//本地响铃
call->getContact()->setRemoteRingback( false );
signal->type = HardwareSignalType;
//这里调用 ResGwDevice::processSessionMsg-->
// ResGwDevice::provideSignal-->
// case DeviceSignalLocalRingbackStart:
// provideLocalRingbackStart()
// -->SoundCardDevice::provideLocalRingbackStart()
// provideTone( RingbackToneEmulation )来实现本地振铃
signal->signalOrRequest.signal = DeviceSignalLocalRingbackStart;
}
UaDevice::getDeviceQueue()->add( signal );
return 0;
}

3.2.7.4 OpReDirect进行重定向服务的操作
  addOperator( new OpReDirect );
  当接收到重定向命令以后

a. 一个接收重定向的基本过程:
  在这里我们只着重阐述一下302在UA端的处理过程,至于其他的3XX系列的处理,和302基本上大同小异。


(点击放大)


b.一个携带302状态的消息:
302状态消息:
sip-res: SIP/2.0 302 Moved Temporarily [192.168.26.180:5060->192.168.26.10:5060]
Header: Via: SIP/2.0/UDP 192.168.26.10:5060
Header: From: [sip:6711@192.168.26.10:5060]
Header: To: [sip:6715@192.168.26.180:5060]
Header: Call-ID: c2943000-ce262-1b5c2-2e323931@192.168.26.10
Header: CSeq: 100 INVITE
Header: Contact: [sip:6716@192.168.26.180:5060]
Header: Content-Length: 0
Header: CC-Redirect: [sip:6716@192.168.26.180:5060];redirreason=
unconditional;redir-counter=0;redir-limit=99
ACK消息:
sip-req: ACK sip:6715@192.168.26.180 SIP/2.0 [192.168.26.10:50373-
>192.168.26.180:5060]
Header: Via: SIP/2.0/UDP 192.168.26.10:5060
Header: From: sip:6711@192.168.26.10
Header: To: [sip:6715@192.168.26.180]
Header: Call-ID: c2943000-ce262-1b5c2-2e323931@192.168.26.10
Header: CSeq: 100 ACK
Header: Content-Length: 0
下一个INVITE消息:
sip-req: INVITE sip:6716@192.168.26.180:5060 SIP/2.0 [192.168.26.10:50373-
>192.168.26.180:5060]
Header: Via: SIP/2.0/UDP 192.168.26.10:5060
Header: From: sip:6711@192.168.26.10
Header: To: sip:6716@192.168.26.180:5060
Header: Call-ID: c2943000-de262-1b626-2e323931@192.168.26.10
Header: CSeq: 101 INVITE
Header: Expires: 180
Header: User-Agent: Cisco IP Phone/ Rev. 1/ SIP enabled
Header: Accept: application/sdp
Header: Contact: sip:6711@192.168.26.10:5060
Header: Content-Type: application/sdp
Header: Content-Length: 221

c.原代码部分:
const Sptr < State >
OpReDirect::process( const Sptr < SipProxyEvent > event )
{
Sptr < SipEvent > sipEvent;
sipEvent.dynamicCast( event );
Sptr < SipMsg > sipMsg = sipEvent->getSipMsg();
assert( sipMsg != 0 );
Sptr < StatusMsg > msg;
msg.dynamicCast( sipMsg );
switch ( msg->getStatusLine().getStatusCode() )
{ //以下是几种3XX系列的状态码:
case 300: // Multiple Choices
case 301: // Moved Premanently
case 302: // Moved Temporary
case 305: // Use Proxy
break;
default:
}

Sptr < UaCallInfo > call;
call.dynamicCast( event->getCallInfo() );
assert( call != 0 );
//根据From/To/Call ID在已经已经接收到的队列中来寻找和302消息相匹配的INVITE消息
Sptr < Contact > origContact = call->findContact( *msg );
assert( origContact != 0 ); Sptr < Contact > origContact = call->findContact( *msg );

//按照上面状态图所指示的,在这里先创建一个回应消息的ACK(仅仅是对于UA端)
//如何创建可以参看AckMsg::setAckDetails(const StatusMsg& statusMsg)
AckMsg ack( *msg );
Sptr< BaseUrl > baseUrl =
origContact->getInviteMsg().getRequestLine().getUrl();
assert( baseUrl != 0 );
if( baseUrl->getType() == TEL_URL )
… …
// Assume we have a SIP_URL
//这里根据从origContact中得到的URL值Request URL设置ACK并且发送;
Sptr< SipUrl > reqUrl;
reqUrl.dynamicCast( baseUrl );
assert( reqUrl != 0 );
SipRequestLine reqLine = ack.getRequestLine();
reqLine.setUrl( reqUrl );
ack.setRequestLine( reqLine );
sipEvent->getSipStack()->sendAsync( ack );
//我们知道在302消息的返回的Contact字段中包含了被叫移动后的新地点,如果发生多个
//移动的话(有可能被叫在多个marshal上进行了注册,需要进行呼叫查询)
for ( int i = 0; i < msg->getNumContact(); i++ )
{
SipContact sipContact = msg->getContact( i );
//以下是根据Contact返回的地址来创建新的INVITE消息
baseUrl = sipContact.getUrl();
assert( baseUrl != 0 );
Sptr< SipUrl > newUrl;
//从Contact取得被叫端的URL
newUrl.dynamicCast( baseUrl );
//从Contact取得被叫端的传输方式(TCP或者UDP)
Data tprt = newUrl->getTransportParam();
assert( newUrl != 0 );
if (newUrl->getUserParam() == "phone")
{ //从Cfg文件中取得代理服务器的名称
string proxyServer = UaConfiguration::instance()->getProxyServer();
string::size_type colonPos = proxyServer.find( ":" );
//设置新的INVITE的代理服务器名称(具体可以参看UA1001.cfg)
newUrl->setHost(proxyServer.substr( 0, colonPos ));
if ( colonPos < string::npos )
{//设置端口号码
newUrl->setPort(
proxyServer.substr( proxyServer.rfind( ":" ) + 1 ));
}
else
{
newUrl->setPort("5060");
}
}

//根据上一个INVITE(也就是得到302消息的前面一个INVITE)消息,创建一个//新的INVITE。
InviteMsg inviteMsg( origContact->getInviteMsg(), newUrl );

//根据新的INVITE命令的Request line设置一个新的VIA
SipVia via = inviteMsg.getVia(0);

if(tprt == "tcp")
{
via.setTransport("TCP");
}
else
{
via.setTransport("UDP");
}
inviteMsg.removeVia(0);
inviteMsg.setVia(via, 0);

//check it is not a loop
bool isLoop = false;
//TODO Fix this
if ( !isLoop )
{
//把Call ID设置成和上一个INVITE相同。
inviteMsg.setCallId( sipMsg->getCallId() );
// CSeq要累加一
SipCSeq newCSeq = inviteMsg.getCSeq();
int cseq = sipMsg->getCSeq().getCSeqData().convertInt();
//设置新的Cseq
newCSeq.setCSeq( ++cseq );
inviteMsg.setCSeq( newCSeq );
… …
sipEvent->getSipStack()->sendAsync( inviteMsg );

// Create the new contact
发送
Sptr < Contact > contact = new Contact( inviteMsg );
//在UaCallInfo中增加Contact列表
// Add this to the contact list
call->addContact( contact );
//在UaCallInfo中的Contact列表设置当前连接,以便为有可能的下一个302消息创造当前的INVITE
call->setContact( contact );

// 更新UaCallInfo中的Ring-Invite消息
call->setRingInvite( new InviteMsg( inviteMsg ) );
}
}
return 0;
}

在Vovida的基础上实现自己的SIP协议栈(四)

卢政 2003/08/06
3.2.7.5 授权检查

a.示意图和信令部分:


SIP Headers
-----------------------------------------------------------------
sip-req: INVITE sip:93831073@192.168.36.180 SIP/2.0 [192.168.6.20:50753-
>192.168.36.180:5060]
Header: Via: SIP/2.0/UDP 192.168.6.20:5060
Header: From: sip:5120@192.168.6.20
Header: To: [sip:93831073@192.168.36.180]
Header: Call-ID: c2943000-23e062-2e278-2e323931@192.168.6.20
Header: CSeq: 100 INVITE
Header: Expires: 180
Header: User-Agent: Cisco IP Phone/ Rev. 1/ SIP enabled
Header: Accept: application/sdp
Header: Contact: sip:5120@192.168.6.20:5060
Header: Content-Type: application/sdp
Header: Content-Length: 218
-----------------------------------------------------------------
SDP Headers
-----------------------------------------------------------------
Header: v=0
Header: o=CiscoSystemsSIP-IPPhone-UserAgent 21012 9466 IN IP4 192.168.6.20
Header: s=SIP Call
Header: c=IN IP4 192.168.6.20
Header: t=0 0
Header: m=audio 25776 RTP/AVP 0 101
Header: a=rtpmap:0 pcmu/8000
Header: a=rtpmap:101 telephone-event/8000
Header: a=fmtp:101 0-11

407消息:
sip-res: SIP/2.0 407 Proxy Authentication Required
->[192.168.26.180:5060->192.168.26.10:5060]
Header: Via: SIP/2.0/UDP 192.168.26.10:5060
Header: From: [sip:6711@192.168.26.180:5060]
Header: To: [sip:6711@192.168.26.180:5060]
Header: Call-ID: c2943000-1e262-513-2e323931@192.168.26.10
Header: CSeq: 100 REGISTER
Header: WWW-Authenticate: Digest
realm=vovida.com,algorithm=MD5,nonce=966645751
Header: Content-Length: 0
下一个INVITE消息:
sip-req: INVITE sip:93831073@192.168.36.180 SIP/2.0 [192.168.6.20:50753-
>192.168.36.180:5060]
Header: Via: SIP/2.0/UDP 192.168.6.20:5060
Header: From: sip:5120@192.168.6.20
Header: To: [sip:93831073@192.168.36.180]
Header: Call-ID: c2943000-23e062-2e278-2e323931@192.168.6.20
Header: CSeq: 101 INVITE
Header: Authorization: Digest
username="6711",realm="vovida.com",uri="sip:192.168.26.180",response="fee2efef60a99b4576c
Header: Expires: 180
Header: User-Agent: Cisco IP Phone/ Rev. 1/ SIP enabled
Header: Accept: application/sdp
Header: Contact: sip:5120@192.168.6.20:5060
Header: Content-Type: application/sdp
Header: Content-Length: 218

b.程序主体部分:
addOperator (new OpReInviteAuthenticated)
这个新的操作是指在接收到407(需要授权验证的Inivte)回应消息以后系统的操作。
const Sptr < State >
OpReInviteAuthenticated::process( const Sptr < SipProxyEvent > event )
{
Sptr < SipEvent > sipEvent;
sipEvent.dynamicCast( event );
… …
Sptr < SipMsg > sipMsg = sipEvent->getSipMsg();
assert( sipMsg != 0 );

Sptr < StatusMsg > msg;
msg.dynamicCast( sipMsg );

switch ( msg->getStatusLine().getStatusCode() )
{
case 407: // Proxy Authentication Required
break;
default:
{
return 0;
}
}

int cseq = sipMsg->getCSeq().getCSeqData().convertInt();
Sptr < UaCallInfo > call;
call.dynamicCast( event->getCallInfo() );
… …
//和上面Redirect消息的处理机制一样,找到源头的INVITE并且向Marshal Server 回送ACK
Sptr < Contact > origContact = call->findContact( *msg );
assert( origContact != 0 );

AckMsg ack( *msg );

Sptr< BaseUrl > baseUrl =
origContact->getInviteMsg().getRequestLine().getUrl();
assert( baseUrl != 0 );
if( baseUrl->getType() == TEL_URL )
{
assert( 0 );
}
// Assume we have a SIP_URL
Sptr< SipUrl > reqUrl;
reqUrl.dynamicCast( baseUrl );
assert( reqUrl != 0 );

SipRequestLine reqLine = ack.getRequestLine();
reqLine.setUrl( reqUrl );
ack.setRequestLine( reqLine );
sipEvent->getSipStack()->sendAsync( ack );

//如果前面已经接收到了一个407的状态回应,那么我们就认为密码错误,进入校验循环直接跳出
if( origContact->getStatus() == 407 )
{
… … return 0;
}

Sptr inviteMsg = new InviteMsg( origContact->getInviteMsg() );

//这个下面的过程和上面的Redirect非常相似,重新创建一个INVITE命令我们可以参考
//Redirect的下面的处理部分,这里不做累述
inviteMsg->setCallId( sipMsg->getCallId() );
newCSeq.setCSeq( ++cseq );
inviteMsg->setCSeq( newCSeq );

//建立授权,把密码和用户名称通过authenticateMessage
if(authenticateMessage( *msg,
*inviteMsg,
UaConfiguration::instance()->getUserName(),
UaConfiguration::instance()->getPassword() ))
{
// went OK
sipEvent->getSipStack()->sendAsync( *inviteMsg );
}
else
{
// xxx there was an authentication problem, so we need to abort out
}

call->setContactMsg(inviteMsg);


// Update the Ring INVITE message
call->setRingInvite( new InviteMsg( *inviteMsg ) );
return 0;
}

c.授权消息检查的各个子项目:
  这里是指的鉴别授权的方式我们可以看一下在RFC 2543中如何定义的请参看RFC 2543的6.42 WWW-Authenticate的介绍):

  在回应的状态消息中包含有401/407(Unauthentication)状态的时候,表示需要进行授权检查,在回应头部包含有一个或者多个的由密钥计算方法和密钥参数组成的域例如:

  Header: WWW-Authenticate: Digest//表示采用密钥检验的算法
  realm=vovida.com,//主叫和被叫方所使用的信任值,也就是授权值,表示主叫/注册服务器共享密码//域(就是所有的用户在这个区域内采用相同的密码,相当于公钥)。

  algorithm=MD5,//算法
  nonce=966645751//407/401消息回应的鉴别符,防止重放攻击。



  主叫接收到401/407消息以后,回应的下一个命令(例如INVITE)中必须包含鉴权因子在里面:

  Header: Authorization: Digest
  username="6711",realm="vovida.com",uri="sip:192.168.26.180",response="fee2efef60a99b4576c

  上面几个因子我就不详细说明意思了,大家应该一看意思就可以明白:

  那么我们根据RFC2543中14.3的Digest鉴别方式可以看出:最后在主叫部分的信任状(Respone)如何实现:

  response=username⊕realm⊕uri⊕realm⊕Password⊕Username
  最后在注册服务器端根据上述的算式重算response,并且判断结果主叫的response是否相等。
具体的程序按照下面的方式层次调用,具体代码不做详细的分析:

A.主叫端(客户端)

1.authenticateMessage( *msg, *inviteMsg,
UaConfiguration::instance()->getUserName(),
UaConfiguration::instance()->getPassword() )
... ... ...

void addWwwAuthorization(const StatusMsg& errorMsg, SipCommand& cmdMsg,
Data username,Data password)
... ... ...
void addAuthorization(const StatusMsg& errorMsg,
SipCommand& cmdMsg,
Data username,
Data password,
bool useProxyAuthenticate)

void SipCommand::setAuthDigest(const Data& nonce, const Data& user,
const Data& pwd, const Data& method,
const Data& realm, const Data& requestURI,
const Data& qop, const Data& cnonce,
const Data& alg, const Data& noncecount,
const Data& opaque)
{
Sptr authorization;
myHeaderList.getParsedHeader(authorization, SIP_AUTHORIZATION_HDR);

SipDigest sipDigest;
//核心的MD5加密算法,主要计算出response
Data response = sipDigest.form_SIPdigest(nonce, user, pwd, method,
requestURI, realm, qop, cnonce, alg, noncecount);

cpLog(LOG_DEBUG_STACK, "setAuthDigest::Response = %s\n",
response.logData());

//set this as response in authorization.
authorization->setAuthScheme(AUTH_DIGEST);
以下部分是设定授权的头部的各个域的值
if(user != "")
{
authorization->setTokenDetails("username", user); //1
}
if(realm != "")
{
authorization->setTokenDetails("realm", realm); //2
}
if(nonce != "")
{
authorization->setTokenDetails("nonce", nonce); //3
}
if(response != "")
{
authorization->setTokenDetails("response", response); //4
}
if(qop != "")
{
authorization->setTokenDetails("qop", qop); //5
}
if(requestURI != "")
{
authorization->setTokenDetails("uri", requestURI); //6
}
if(cnonce != "")
{
authorization->setTokenDetails("cnonce", cnonce); //7
}
if(noncecount != "")
{
authorization->setTokenDetails("nc", noncecount); //8
}
if(opaque != "")
{
authorization->setTokenDetails("opaque", opaque); //9
}
if(alg != "")
{
authorization->setTokenDetails("algorithm", alg); // 10
}
}

Data SipDigest::form_SIPdigest( const Data& nonce,
const Data& user,
const Data& pwd,
const Data& method,
const Data& requestURI,
const Data& realm,
const Data& qop,
const Data& cnonce,
const Data& alg,
const Data& noncecount)

B.注册服务器端(Marshal Server)授权鉴定(不列举原代码只简单介绍过程)



3.2.7.6 OpFarEndAnswered处理接收到的OK回应
addOperator( new OpFarEndAnswered )
一个标准的OK回应的构成:

SIP Headers
-----------------------------------------------------------------
sip-res: SIP/2.0 200 OK [192.168.36.180:5060->192.168.6.21:5060]
Header: Via: SIP/2.0/UDP 192.168.6.21:5060
Header: From: [sip:5121@192.168.6.21:5060]
Header: To: [sip:5120@192.168.36.180:5060];tag=c29430002e0620-0
Header: Call-ID: c2943000-e0563-2a1ce-2e323931@192.168.6.21
Header: CSeq: 100 INVITE
Header: Contact: [sip:5120@192.168.6.20:5060]
Header: Record-Route:
[sip:5120@192.168.36.180:5060;maddr=192.168.36.180],
[sip:5120@192.168.36.180:5060;maddr=1]<92.168.36.180>
Header: Server: Cisco IP Phone/ Rev. 1/ SIP enabled
Header: Content-Type: application/sdp
Header: Content-Length: 218
-----------------------------------------------------------------
SDP Headers
-----------------------------------------------------------------
Header: v=0
Header: o=CiscoSystemsSIP-IPPhone-UserAgent 13045 2886 IN IP4 192.168.6.20
Header: s=SIP Call
Header: c=IN IP4 192.168.6.20
Header: t=0 0
Header: m=audio 30658 RTP/AVP 0 101
Header: a=rtpmap:0 pcmu/8000
Header: a=rtpmap:101 telephone-event/8000
Header: a=fmtp:101 0-11
const Sptr < State >
OpFarEndAnswered::process( const Sptr < SipProxyEvent > event )
{
Sptr < SipEvent > sipEvent;
sipEvent.dynamicCast( event );
Sptr < SipMsg > sipMsg = sipEvent->getSipMsg();
Sptr < StatusMsg > msg;
msg.dynamicCast( sipMsg );
if(msg->getStatusLine().getStatusCode() > 200)
{
return 0;
}
Sptr < UaCallInfo > call;
call.dynamicCast( event->getCallInfo() );
assert( call != 0 );
Sptr findContact = call->findContact( *msg );
//在OpInviteUrl和OpReInviteAuthenticated 需要有Invite命令发送的程序中有用到//call->setContactMsg来设定当前的Contact项目;在这里通过call->getContact的方法来
//进行对比,以检验是否和OK中返回的Contact项目相符合,如果不符合则表示必定不
//是当前的这个INVITE命令返回的内容。
Sptr getContact = call->getContact();
if ( *( call->findContact( *msg ) ) != *( call->getContact() ) )
{
return 0;
}
call->getContact()->update( *msg);
int status = msg->getStatusLine().getStatusCode();
if ( status >= 200 )
{ 发送ACK给被叫端
AckMsg ack( *msg );
sipEvent->getSipStack()->sendAsync( ack );
}
if ( status != 200 )
{
return 0; // Done
}
//如果返回的Contact有两个联络地址,那么建立新的路由,并且将该新路由项目保存
//在UaCallInfo中
call->setCallerRoute1List( msg->getrecordrouteList() );
int numContact = msg->getNumContact();
if ( numContact )
{
SipContact contact = msg->getContact( numContact - 1 );
Sptr < SipRoute > route = new SipRoute;
route->setUrl( contact.getUrl() );
call->addRoute1( route );
}
Sptr sdp;
sdp.dynamicCast ( sipMsg->getContentData(0) );
… …
//在UaCallInfo中保存远端回传的SDP
call->setRemoteSdp( new SipSdp( *sdp ) );

Sptr < SipSdp > localSdp = call->getLocalSdp();
Sptr < SipSdp > remoteSdp = call->getRemoteSdp();
//RSVP消息的发送和接受设置(下面做详细介绍)。
rsvpFarEndAnswered(localSdp, remoteSdp);
Sptr < UaStateMachine > stateMachine;
stateMachine.dynamicCast( event->getCallInfo()->getFeature() );
assert( stateMachine != 0 );
//转入SateInCall状态
return stateMachine->findState( "StateInCall" );
}

3.2.7.7在Vocal中如何实现RSVP资源预留协议:
  RSVP资源预留协议的具体内容我们在这里就不做详细的介绍了,如果对这个还不了解的可以看一下RFC 1633中对RSVP的具体定义,另外在draft-ietf-rsvp-rapi-01.txt中定义关于RSVP相关的基本API函数调用。

RSVP一般工作机理如下图所示:


  发送方发送PATH消息,消息中包含有数据业务特征,该消息沿所选的路径传送,沿途的路由器按照PATH准备路由资源,接收方接收到PATH消息以后,根据业务特征和所需要的QOS计算出所需要的资源,回送RESV消息,消息中包含的参数就包括请求预留的带宽,延PATH的原路途返回,沿途的路由器接收到RESV操作后才执行资源预留操作。发送方接收到RESV消息以后才发送用户数据。

一.在H.323协议中如何实现RSVP功能:
  对于一个H.323或者是SIP的多媒体通讯系统而言,为了保证实时通讯的质量,一般来说采用了很多方面来保证QOS,对于H.323来说方式没有SIP那样灵活,在H.323v3版本采用了一些几种方式来增强QOS保证:

a. 增强的RAS过程,在ARQ中指明了是否具备资源预留能力;
b. 增强的能力交换过程,收发端点都具备RSVP功能,通过能力交换过程可以双方具备RSVP能力(RSVP属于能力集合的一个部分),在OpenLogicalChannel原语中定义了一个参数qOSCapability来表示;
c. 增强的逻辑信道能力在逻辑信道打开过程中包含Path和Resv两个过程
下面我们用图来表示逻辑信道的打开过程和资源预留过程:


1. 发送端点向接受端点发送OpenLogicalChannel消息在qOSCapability中标明该信道的RSVP参数和综合业务类别。
2. 接收端点创建RSVP会话(调用Rapi_session API)向发送端点发送OpenLogicalChannel Ack。
3. 在OpenLogical Ack中包含FlowControl=0,抑制当前的媒体数据流。
4. 4和5表示发送端点和接收端点执行RSVP过程。
5. 接收端点接收到ResvConfirm以后知道预留成功。
6. FlowControl为最大的比特率,当前的媒体数据流为最大。

  要注意的一点是由于通讯是双向的实际上述的过程发送和接收方完全要对掉所以上述的过程要执行两遍。

二.在SIP中实现RSVP功能:

  Vocal的SIP协议栈软件中提供了一个非常简便的实现RSVP的方式,当然按照这个方式实现RSVP是相当的不成熟的,很多参量在应用程序都没有反馈并且处理,仅仅是在路由器之间相互的汇报,不过这个简单的方式实现RSVP的构架,所以仍然有一定的使用价值。

  一般说来在SIP中实现RSVP的步骤如下:


(点击放大)

  在上图中实线的部分是SIP命令,虚线部分是RSVP消息

Vocal中的RSVP实现过程:
1. 首先是主叫部分发送INVITE命令,我们知道命令中包含有主叫的会话描述(这里我们称为Remote SDP);
2. 被叫部分此时处于OpRing的状态中接收到主叫的INVITE消息以后,根据主叫的INVITE消息和主叫的SDP,得到主叫的地址和主叫的RSVP端口(主叫的RTP端口);被叫调用setupRsvp子程序发送包含有数据流标识和数据业务流特征的PATH消息到主叫,具体发送的业务流Tspec特征如下:
//Sender Tspec的定义:
rapi_tspec_t *tspec_ptr = &(snd_tspec);
qos_tspecx_t *qos_tspec = &(tspec_ptr->tspecbody_qosx);
qos_tspec->spec_type = QOS_TSPEC;//发送方业务流特征标示
qos_tspec->xtspec_r = 10000;// Token Rate (B/s)//业务流量
qos_tspec->xtspec_b = 200;// Token Bucket Depth (B)//标记存储桶宽度
qos_tspec->xtspec_p = 10000;// Peak Data Rate (B/s)//突发流量
qos_tspec->xtspec_m = 200;// Minimum policed unit//
qos_tspec->xtspec_M = 200; /* default 65535 */ Maximum SDU size
tspec_ptr->len = sizeof(rapi_hdr_t) + sizeof(qos_tspecx_t);
tspec_ptr->form = RAPI_TSPECTYPE_Simplified;

  这里似乎和RSVP的--呼叫方发送PATH消息的精神有一些违背,是被叫方发送PATH消息,其实二者没有什么不同,首先主叫方,没有收到被叫方的SDP所以不能确定被叫方接收RSVP消息的端口和IP地址,其次,媒体流是双向的,双方都必须在网路上通过PATH--Reserve的方式预流资源。

3. 在完成了一系列SIP命令和状态的交换(RING,OK过程)以后,呼叫方开始准备发送ACK消息了,也就是处于操作OpFarEndAnswered()的时候,调用rsvpFarEndAnswered发送Reserve消息,为什么要在这个时候发送Reserve消息呢?因为主叫在下一个过程(收到ACK消息后,打开RTP通道之前)的时候,已经保证了所有的主叫到被叫之间的路由器都已经收到了PATH预留消息,

4.第5,6两个消息是主叫端点向被叫端点之间的路由器发送PATH消息,并且接收对端的RESV消息的过程。和1,2,3的过程基本上一样,最后在双方的RTP通道打开之前,主叫/被叫之间的路由器实现稳定状态,也就是都收到主叫和被叫的资源预留的信息。

5.在被叫端点主要调用的函数:
a. void setupRsvp(SipSdp& localSdp, SipSdp& remoteSdp)
主要由被叫端调用,用于在收到主叫发送过来的INVITE消息以后根据主叫的SDP回送被叫资源预留PATH消息。
b. void rsvpFarEndAnswered(Sptr localSdp, Sptr remoteSdp)
主要由主叫端调用,用于在向被叫端发送ACK消息前向被叫发送RESV消息和开始主叫资源预留PATH消息。
c. void rsvpAckHandler(Sptr localSdp, Sptr remoteSdp)
主要由被叫端调用,用于在收到主叫发送过来的ACK消息以后根据主叫的SDP回送主叫资源预留RESV消息。

6. 如何改进Vocal的RSVP机制使它在广域网上应用:

  一.对于目前在Vocal中对于RSVP的处理过程是非常简单的,至少在用户端都没有对AdSpec和Tspec做任何具体的运算,仅仅是交给路径上的路由器去预留资源,这样做如果是一个简单的没有太复杂的网络状态的区域网内部,采用这种方法当然是无可厚非的,不过如果是在有复杂网络状态的广域网上这样就可以说是不是很行得通了,一般来说在主干网络上会运行DiffServ的机制(所有的流都分组为多个服务类别的方式),这样在骨干网上的RSVP消息当然就会被忽略,所以我们的PATH和SESV消息都要实现对Diffserv的映射,换一句话来说,就地让骨干网看起来象RSVP的一个节点,一般来说我们把DTOT(子网络到主干网络的传播延迟 在RFC2205中有定义)改变成透过骨干网的传播延迟和平均排队延迟的值(这个是由主干网罗入口/出口路由器做的工作),对于RESV来说上沿着已经建立的途径传递,那么这个问题就不存在了。

  另外有几个情况是在如果主干网络使用DiffSev需要注意的:

a. 如果有一个流不符合Tspec时--而这个时候路由器已经为所有的入口和出口规划了每一条虚链路的时候,一个不符合Tspec的流就足以毁坏同一类别所有其他留所争取的服务质量,例如入口处归纳低质量的视频/音频流时候,出现了高质量的视频流。
b. 分散的/突发的流合并到平缓的流中时候。
  不过一般来说每个路由器都具备检查流的Tspec的能力,特别是作为主干网络入口的路由器(例如一些大的网络(BGP/EBGP)的入/出口地方)。在运行视频会议或者是其他突发流很多的恶劣工作状况的时候:

  我们前面已经反复地阐述:在Vocal中只不过是实现了一个简单RSVP构架,最重要的一点就是它不能够实现软状态,也就是定期刷新消息的Tspec和Rspec,如果在视频信号的时候这样的情况出现得特别频繁,由于视频信号并不总是处于一种稳定的平缓的状态传输,以及当路由改变的时刻,RSVP消息需要能准确的沿着新的路由往复(这种情况是常常出现的,特别在大型网络中)。

  解决上述问题的途径首先就是要在RSVP建立保证服务预定,也就是要根据接收端根据发送端的AdSpec消息计算预留的带宽(而在Vocal中基本上没有处理AdSpec),AdSpec中参加带宽运算的主要是两个参数:Dtot和Ctot,第一个参数是最小路径延迟,第二个是路径带宽,通过这两个参数根据公式D=(b(存储桶深度)+Ctot)/p + Dtot计算出端到端之间的延迟:

例如:
PATH流的初始特征:
Tspec(p=10mbps,L=2kbps,r=1mbps,b=32kbps) AdSpec(Ctot=0,Dtot=0)
经过第一个路由器:
Tspec(p=10mbps,L=2kbps,r=1mbps,b=32kbps) AdSpec(Ctot=11,Dtot=0.05s)
经过第二个路由器:
Tspec(p=10mbps,L=2kbps,r=1mbps,b=32kbps) AdSpec(Ctot=55,Dtot=0.1s)
现在我们来计算Resv中Rspec项目:
最长的延迟为0.1s的延迟,我们在Rspec中所计算的预留带宽必须符合这个要求,那么根据公式:
D=(b+Ctot)/p + Dtot

b:存储桶深度
计算出的D为0.185S我们在根据这个公式来计算预留带宽
R=((p-r)(L+Ctot)+(b-L)p)/((t-Dtot)(p-r)+b-L)
r:业务流量
t:所需要的延迟

  注意:所需要的延迟在0.1-0.185之间变化,这样我们通过上述公式得出一个比较确切的R值R=1.66Mbps。所以Rspec为:R=1.66Mbps;松弛项(S):用于指示Qos的富裕量,如果所有的路由器按照R预留,那么整个路径上的端到端的延迟会比要求的时延要少S毫秒:这里可以选择0.05S,具体的松弛项计算可以参看RFC2205定义。

  上面我们解决了服务预定的问题,但是它只不过让我们的终端程序运行进一步合理化,可以正确的规划需要预留的带宽,但是中间还是有关键问题没有解决,也就是我们上面说的软状态--定期刷新Tspec和Rspec,以及探测/感知主干路由的变化

  从程序上实现这些事实上并不困难,我们在打开媒体通讯的RTP信道以后,可以用一个进程定期的发送PATH和RESV消息,让流的接收者进行定期刷新,特别是在视频通讯阶段(采用H.263算法)出现帧间帧的时候,数据的流量必然会大大增加,这个时候,如果提前刷新预留的状态,那么我们可以在初期就避免网络出现阻塞的问题,当然,如果流量超过了路径所承受的标准,那么必然会依靠增加S(松弛度)来阻塞数据包,这个时候,必然通讯会出现一个比较明显的延迟,不过根据实验结果表明,这些还是可以接受的。

  如果IP路由发生了变化,那么上述的解决办法同样的适用,定期传送RSVP的消息可以重新根据流的路径进行新的定义,所以他们也会沿着新的路径进行传输,所以沿着相反路径的RESV消息将试图沿着新的路由进行预定,旧的预定就会超时然后取消。

  但是我们如果考虑到主干网络使用DiffServ就不会那么乐观了(当然主干网络的路由变化不会那么剧烈),主干网络上PATH项目中变化的部分就是AdSpec中的Dtot/Ctot,我们在上面已经说了如何定义Dtot的内容(子网络到主干网络的传播延迟的计算 在RFC2205中有定义)改变成透过骨干网的传播延迟和平均排队延迟的值,所以目前来说如果主干网络是功能强大的千兆路由器组成,那么PATH中的Dtot和Ctot可以在中继的时候得到更新,如果是ATM或者是MPLS网络的话,出入口也可以得到更新,这样的话你事实上完全不必操心。

  不过不管怎么说上述的数值如果是周期性计算并在RESV消息中更新的话,必然大大的加大路由器的运算开销,特别在跨洋多点视频会议的时候,这样用户接入服务供应商区域网入口路由器可能会发生"饿死"的情况(没有用户数据时候反而被大量无用的RESV所淹没),所以在使用主干网络的时候,我们必须有一个机制探测主干网络的传播延迟并且通报给服务供应商区域网入口路由器,最好是主干入口路由器本身就具备这样的功能,进一步简化主干网络为一个简单的RSVP节点,变成由主干网络的接入路由器通告服务供应商的路由器端对端的延迟 ,这样把计算的过程交给主干路由器*,而服务供应商区域网入口路由器只负责更新AdSpec,这样效率就可以得到大大的提高,不过协议的复杂性就增加了不少。**

3.2.8用户处于通话的StateInCall状态:

StateInCall::StateInCall()
{
addEntryOperator( new OpStartAudioDuplex );
addOperator( new OpAck );
addOperator( new OpConfTargetOk );
addOperator( new OpFwdDigit );
addOperator( new OpTerminateCall );
addOperator( new OpEndCall );
addOperator( new OpReInvite );//多方会议和呼叫等待
addOperator( new OpStartCallWaiting );

if ( UaConfiguration::instance()->getXferMode() != XferOff )
{
addOperator( new OpSecondCall );
addOperator( new OpRecvdXfer );
}

addExitOperator( new OpStopAudio );
}
  无论是主叫还是被叫,最后情况下都会进入到StateInCall状态中去,顾名思义,这个状态是打开媒体流(RTP/RTCP)通道,并且开始语音通讯的状态.

3.2.8.1 OpStartAudioDuplex主叫打开媒体通道端口,向被叫发送媒体信息

  首先我们来看一下在这个状态中加入的第一个操作OpStartAudioDuplex,作为主叫端,它打开了主叫的RTP/RTCP端口,开始向被叫发送媒体消息。

const Sptr < State >
OpStartAudioDuplex::process( const Sptr < SipProxyEvent > event )
{
Sptr < UaCallInfo > call;
call.dynamicCast( event->getCallInfo() );
assert( call != 0 );
Sptr < SipSdp > remoteSdp;
Sptr < SipSdp > localSdp;

  注释:*主干路由器大部分时候和用户接入服务供应商区域网入口路由器采用静态路由的方式接入,所以我们预想对于主干路由器来说事实上只要仅仅发送通告给服务供应商区域网入口路由器就可以了,

  **这个问题我们曾经在IETF的讨论中和很多通讯工程师进行讨论,完全可以增加此类的限制,不过上述的情况有不少人认为发生的可能性不大,我个人一直认为随着服务供应商所提供的多媒体业务的进一步扩展,这种危机可能在近几年就会要爆发出来。


//取得引起振铃的INVITE消息
Sptr < InviteMsg > inviteMsg = call->getRingInvite();

//取出当前的Contact字段中所引发的INVITE消息,在OpInviteUrl::process中会定义这//个当前的SipCOntact字段。
InviteMsg invMsg = call->getContact()->getInviteMsg();
//两者相比较,检验是否引起震铃的INVITE消息和当前Contact字段中的INVITE消息
//是否相同,相同的话就把对端和本地的SDP取出,
if ( *inviteMsg == invMsg )
{
remoteSdp = call->getRemoteSdp();
localSdp = call->getLocalSdp();
}
else
{//不相同的话以远端第二次回送的震铃消息为准,取出SDP
remoteSdp = call->getRemote2Sdp();
localSdp = call->getLocal2Sdp();
}
assert( localSdp != 0 );
//挂上本地设备事件的处理队列
Sptr < UaHardwareEvent > signal
= new UaHardwareEvent( UaDevice::getDeviceQueue() );
//定义所要处理的事件类型
signal->type = HardwareAudioType;
//向当前声音设备(Sounnd Card或者是Phone Card等等)控制台发送请求
struct HardwareAudioRequest* request = &(signal->signalOrRequest.request);
//开始声音发送,ResGwDevice::processSessionMsg中会调用这个状态的检测,并且会打
//开声音设备发送媒体信息,调用ResGwDevice::audioStart,稍后介绍。
request->type = AudioStart;
… …
//设定远端的Rtp接收发送端口
request->remotePort = remoteSdp->getRtpPort();
//设定本地的Rtp接收发送端口
// Set local host and port
request->localPort = localSdp->getRtpPort();
strcpy( request->localHost,
localSdp->getConnAddress().getData(lo) );
……
request->echoCancellation = true;
//设定RTP包的发送速率
request->rtpPacketSize = getRtpPacketSize(*localSdp);
… …
//停止震铃回送。
request->sendRingback = false;
//在本地设备事件的处理队列挂上当前的处理请求
UaDevice::getDeviceQueue()->add( signal );

return 0;
}

在Vovida的基础上实现自己的SIP协议栈(五)

卢政 2003/08/07
3.2.8.2处理RTP/RTCP包:

  前面说了ResGwDevice::processSessionMsg处理挂在设备处理队列里的各个命令,我们具体来看具体的应用程序处理过程:

a.处理用户发出的终端消息,并且打开设备发送媒体包。
ResGwDevice::processSessionMsg( Sptr event ):
void ResGwDevice::processSessionMsg( Sptr event )
{
Sptr msg;
msg.dynamicCast( event );
if( msg != 0 )
{
cpLog( LOG_DEBUG, "Got message type: %d", msg->type );
switch( msg->type )
{
case HardwareSignalType://这个状态是为Voicamail而设定的,在Feauture
//Server这章里面会说道
……
case HardwareAudioType:
switch ((msg->signalOrRequest).request.type)
{
case AudioStart://打开声音设备建立RTP/RTCP会话
audioStart((msg->signalOrRequest).request);
break;
case AudioStop://停止声音设备,并且释设备占用的资源并且停止建立的
//话,将其资源释放;
audioStop();
break;
case AudioSuspend:
audioSuspend();//暂停设备,但是资源不释放,RTP会话也不停止。
break;
case AudioResume:
audioResume((msg->signalOrRequest).request);//重新启动设备
break;
default:
cpLog( LOG_ERR, "Unknown audio request: %d",
(msg->signalOrRequest).request.type );
}
break;
… …
}
}
}

b.根据远端和本地的SDP建立RTP会话(经过简化):
int SoundCardDevice::audioStart( const HardwareAudioRequest& request )
{
deviceMutex.lock();

// create new audioStack for this audio session
// 0 is rtpPayloadPCUM
// last paramter, -1, disables jitter buffer
if( audioStack == 0 )
{
int remoteRtcpPort = (request.remotePort > 0) ? request.remotePort + 1 : 0;
int localRtcpPort = (request.localPort > 0) ? request.localPort + 1 : 0;
cerr << "%%% Remote rtcp port : " << remoteRtcpPort << "\n";
cerr << "%%% Local rtcp port : " << localRtcpPort << "\n\n";
const char* remoteHost = 0;
if ( request.remotePort != 0 )
remoteHost = request.remoteHost;
//创建RTP会话,带入的参数有:被地本地/远端的主机/RTP,RTCP,端口RTP的载荷类型,//网络承载类型,创建接收/发送RTP/RTCP包的控制台,以及接受播放的缓冲区Inbuff
audioStack = new RtpSession( remoteHost, request.remotePort,
request.localPort, remoteRtcpPort,
localRtcpPort, rtpPayloadPCMU,
rtpPayloadPCMU, 0 );
}
else
{
… …
}
//决定是否开启/关断向远方回送的震铃
if( request.sendRingback )
startSendRingback();
else
stopSendRingback();
… …
// apiFormat_clockRate
// apiFormat_payloadSize
//设置RTP包的承载类型,目前设置为PCMU方式,以及包的大小
audioStack->setApiFormat( rtpPayloadPCMU, request.rtpPacketSize*8 );
//传输/接收时的RTP包的大小,这里设置成和RTP包相同大小类型
audioStack->setNetworkFormat( rtpPayloadPCMU, request.rtpPacketSize*8 );
deviceMutex.unlock();
reopenAudioHardware();
return 0;
}

c.如何接收或者发送RTP/RTCP数据包:
我们在前面已经看到了在SoundCardDevice::processRTP ()调用了RTPSession::Receive()以及RTPSession::TransimitterRAW()的方法,来接收/发送RTP,RTCP数据流.
1> RTP数据流的接收,它不会直接删除数据,但是会用替代的方式对inbuff数据做更新:
RtpPacket* RtpReceiver::receive ()
{
RtpPacket* p = NULL;
int len = 0;
int len1 = 0;
int silencePatched = 0;
bool faking = 0;


// empty network que
NtpTime arrival (0, 0);
while (1) // network empty or time to play return packet
{ //从网络设备的缓冲队列中取出数据
p = getPacket();
if (p == NULL) break;

// only play packets for valid sources
if (probation < 0)
{
cpLog(LOG_ERR, "****Packet from invalid source");
delete p;
p = NULL;
continue;
}
//获取包的抵达时间
arrival = getNtpTime();
int packetTransit = 0;
int delay = 0;


rtp_ntohl(p);

// convert codec
if (p->getPayloadType() != apiFormat)
{
#ifndef __sparc
// 当前接收到的包不符合目前的格式(假设为PCMU格式,那么下面做格式转换)
//例如mono转PCMU8K 16Bit具体的调用格式可以参看//convertCodec(RtpPayloadType fromType, RtpPayloadType toType,
// char* in_data, char* out_data, int len)
//它的主要作用在于将数据区内的数据进行格式转换,至于带入的参数我想就不//用过多解释了。有兴趣的同志可以参考
//unsigned char linear2ulaw( int pcm_val );
//int ulaw2linear( unsigned char u_val )这两个函数的原代码
RtpPacket* oldp = p;
p = convertRtpPacketCodec (apiFormat, oldp);
… …
#endif
}
//取得有效载荷的长度
len = p->getPayloadUsage();
if (len <= 0 || len > 1012)
{
delete p;
p = NULL;
continue;
}

// 重新调整接收到的RTP包的长度,使长度在网络承载允许范围内
if (len > networkFormat_payloadSize )
{
int lenold = len;
len = ( len / networkFormat_payloadSize ) * networkFormat_payloadSize;
p->setPayloadUsage( len );
network_pktSampleSize = (lenold / networkFormat_payloadSize) * network_pktSampleSize;
}

… …
根据接收到RTP的分组序号和时间戳标志的数据对Inbuff里的数据包进行重排,
if (RtpSeqGreater(p->getSequence(), prevSeqRecv))
{
在这里是把包增加到数据的队列尾
while (RtpSeqGreater(p->getSequence(), prevSeqRecv))
{
silencePatched = 0;
faking = 0;
//下面程序部分是在收到的分组头部增加白燥声。
while( RtpTimeGreater( p->getRtpTime() - network_pktSampleSize, prevPacketRtpTime ) && ((p->getSequence() - 1) == prevSeqRecv))
{
if( silenceCodec == 0 )//
{
cpLog( LOG_DEBUG_STACK, "Patching silence" );
if ((p->getPayloadType() >= rtpPayloadDynMin) &&
(p->getPayloadType() <= rtpPayloadDynMax) &&
(codecString[0] != '\0'))
{
silenceCodec = findSilenceCodecString(codecString, len);
}
else
{//添加白噪音
silenceCodec = findSilenceCodec( p->getPayloadType(), len );
}
if( silenceCodec == 0 )
{
if( len > rtpCodecInfo[ numRtpCodecInfo - 1 ].length )
{
assert( 0 );
}
silenceCodec = (char*)&rtpCodecInfo[ numRtpCodecInfo - 1 ].silence;
faking = 1;
}
}
assert( silenceCodec );

if ((inPos + len) < IN_BUFFER_SIZE)
{
memcpy (inBuff + inPos, silenceCodec, len);
inPos += len;
silencePatched++;
}
else
{
// circular memory copy
len1 = IN_BUFFER_SIZE - inPos;
memcpy (inBuff + inPos, silenceCodec, len1);
memcpy (inBuff, silenceCodec + len1, len - len1);
inPos = len - len1;
//printf("inPos S=%d\n", inPos);
silencePatched++;
}
prevPacketRtpTime += network_pktSampleSize;
}
if( prevPacketRtpTime != p->getRtpTime() - network_pktSampleSize)
{
prevPacketRtpTime = p->getRtpTime() - network_pktSampleSize;
}
//在inbuff队列中插入已经待播放的分组,


if ((inPos + len) < IN_BUFFER_SIZE)
{
memcpy (inBuff + inPos, p->getPayloadLoc(), len);
inPos += len;
}
else
{
// circular memory copy
len1 = IN_BUFFER_SIZE - inPos;
memcpy (inBuff + inPos, p->getPayloadLoc(), len1);
memcpy (inBuff, p->getPayloadLoc() + len1, len - len1);
inPos = len - len1;
}

//更新受到包的计数器
RtpSeqNumber tSeq = prevSeqRecv;
prevSeqRecv++;
if(prevSeqRecv > RTP_SEQ_MOD)
{
prevSeqRecv = 0;
}
if (prevSeqRecv < tSeq)
{
cpLog(LOG_DEBUG_STACK, "Recv cycle");
assert(prevSeqRecv == 0);
recvCycles += RTP_SEQ_MOD;
}
}
prevPacketRtpTime = p->getRtpTime();
if (silencePatched > 0)
cpLog(LOG_DEBUG_STACK, "silencePatched = %d", silencePatched);
if (faking)
silenceCodec = 0;
if (p->getSequence() != prevSeqRecv)
{
cpLog(LOG_DEBUG_STACK, "Unequal packet:%d stack:%d",
prevSeqRecv, p->getSequence());
}
}
else
{
RtpSeqNumber base_prevSeqRecv = prevSeqRecv;
int inSeqRecv = 1;
while (RtpSeqGreater(base_prevSeqRecv, p->getSequence()))
{
inSeqRecv++;
base_prevSeqRecv--;
}
int inPosTemp = inPos - inSeqRecv * len;
if (inPosTemp < 0) inPosTemp = IN_BUFFER_SIZE + inPosTemp;

if ((inPosTemp + len) < IN_BUFFER_SIZE)
{
memcpy (inBuff + inPosTemp, p->getPayloadLoc(), len);
}
else
{
// circular memory copy
len1 = IN_BUFFER_SIZE - inPosTemp;
memcpy (inBuff + inPosTemp, p->getPayloadLoc(), len1);
memcpy (inBuff, (p->getPayloadLoc()) + len1, len - len1);
}
}

// update packet received
packetReceived++;
payloadReceived += len;

// update jitter calculation
packetTransit = arrival - rtp2ntp(p->getRtpTime());
delay = packetTransit - transit;
transit = packetTransit;
if (delay < 0) delay = -delay;
jitter += delay - ((jitter + 8) >> 4);

// fractional
// s->jitterTime += (1./16.) * ((double)deley - s->jitterTime);
// integer
//jitterTime += delay - ((jitterTime+8) >> 4);


if (p)
{
delete p;
p = NULL;
}
}

int packetSize = apiFormat_payloadSize;
… …

//按照apiformat_playloadsize的长度,分割原有的数据包,重新构造一个RTP数据包以便适
//合设备播放,当然如果双方把apiformat_playloadsize和networkFormat_payloadSize设置相
//同也可以。
assert (!p);
p = new RtpPacket (packetSize);
if ( (playPos + packetSize) < IN_BUFFER_SIZE)
{
memcpy (p->getPayloadLoc(), inBuff + playPos, packetSize);
playPos += packetSize;
}
else
{
len1 = IN_BUFFER_SIZE - playPos;
memcpy (p->getPayloadLoc(), inBuff + playPos, len1);
memcpy (p->getPayloadLoc() + len1, inBuff, packetSize - len1);
playPos = packetSize - len1;
}

//构造RTP数据包的包头部分
p->setSSRC (ssrc);
p->setPayloadType (apiFormat);
p->setPayloadUsage (packetSize);
p->setRtpTime (prevRtpTime + api_pktSampleSize);
p->setSequence (prevSeqPlay + 1);

if (probation > 0) probation --;
receiverError = recv_success;
prevRtpTime = p->getRtpTime();
prevNtpTime = getNtpTime();
gotime = rtp2ntp (p->getRtpTime() + api_pktSampleSize) + jitterTime;
//更新已经播放的数据包的计数器
RtpSeqNumber sSeq = prevSeqPlay;
prevSeqPlay++;
if (prevSeqPlay < sSeq)
{
playCycles += RTP_SEQ_MOD;
}

return p;
}

2> RTP数据流的发送:
  RTPSession::TransimitterRAW方法,RTP数据流的发送方法,发送没有接收这么复杂,不需要针对Buff中数据按照包序列号和NTP排序,最后再根据本地的包长度充足重组,只要写入Outbuff中直接加上RTP头就可以直接发送了。
int RtpTransmitter::transmitRaw (char* data, int len)
{
int len1;
//如果媒体设备所能接受的长度制式和网络传输的制式不能相符匹配,那么调用转换程序进行转//化,把本地播放制式长度转换成网络制式长度。ConvertCodec这个函数我们在前面已经有过//介绍。
if( apiFormat != networkFormat)
{
char* buffer = new char[1012];
len = convertCodec(apiFormat, networkFormat, data, buffer, len);
data = buffer;
}
// 把发送的字节发送到Outbuff中准备发送出去;
if( (outPos + len) < OUT_BUFFER_SIZE)
{
memcpy (outBuff + outPos, data, len);
outPos += len;
}
else
{
// circular memory copy
len1 = OUT_BUFFER_SIZE - outPos;
memcpy (outBuff + outPos, data, len1);
memcpy (outBuff, data + len1, len - len1);
outPos = len - len1;
}


// check if enough data to send out packet
int packetSize = networkFormat_payloadSize;
//发送新的RTP数据包
int result = 0;
//创建新的RTP数据包
RtpPacket* p = new RtpPacket (networkFormat_payloadSize);
assert (p);
//创建RTP包头
p->setSSRC (ssrc);
p->setPayloadType (networkFormat);
p->setPayloadUsage (packetSize);

//使用Outbuff中的数据,填充前面新创建的RTP包的内容,,每填充一个Packet包的指//针recPos向前移动一个Packet位置
while ( ((outPos + OUT_BUFFER_SIZE - recPos) % OUT_BUFFER_SIZE) >= packetSize )
{
if( (recPos + packetSize) < OUT_BUFFER_SIZE)
{ memcpy (p->getPayloadLoc(), outBuff + recPos, packetSize);
recPos += packetSize;
}
else
{ len1 = OUT_BUFFER_SIZE - recPos;
memcpy (p->getPayloadLoc(), outBuff + recPos, len1);
memcpy (p->getPayloadLoc() + len1, outBuff, packetSize - len1);
recPos = packetSize - len1;
}
//发送RTP包
result += transmit(p);
}
if( p) delete p;
p = NULL;
return result;
}

  3>上面说完了RTP包的发送和接收,现在该说说RTCP包的发送和接收了,我们知道RTCP包的目的在于向与参与会话者发送自量质量的反馈消息,实现多媒体同步的功能,不过问题在于RTCP包的数量随着参与者的数量增加而增加,所以一般说来点对点的话,没有必要使用RTCP控制,另外随着RSVP的普遍应用,Qos的控制机制愈加完善,也许没有必要用这么低级的Qos控制方式了。
我们可以看到在SoundCardDevice::ProcessRTP中调用了RTCP的发送和接收方法::

void RtpSession::processRTCP ()
{
if (rtcpTran)
{//这里的checkIntervalRTCP保证在固定间隔的时间内发送RTCP分组
if (checkIntervalRTCP()) transmitRTCP();
}
if (rtcpRecv)
{
receiveRTCP();
}
return ;
}
定期发送SR报告(发送者报告):
int RtpSession::transmitRTCP ()
{
… …
RtcpPacket* p = new RtcpPacket();

// load with report packet
rtcpTran->addSR(p);
//增加源描述项,在这里仅仅是发送方发送描述项,而接收方不发送
if (tran) rtcpTran->addSDES(p);
//调用UdpStack::Trasmitto发送RTCP分组,
int ret = rtcpTran->transmit(p);

if (p) delete p;
return ret;
}
如何构造一个发送者报告和接收者报告:
int RtcpTransmitter::addSR (RtcpPacket* p, int npadSize)
{
// 创建RTCP包的头部
RtcpHeader* header = reinterpret_cast < RtcpHeader* > (p->freeData());
int usage = p->allocData (sizeof(RtcpHeader));
//填充RTCP包头的各个项目/版本/填充位/长度记数/包类型(SR/RR)
header->version = RTP_VERSION;
header->padding = (npadSize > 0) ? 1 : 0;
header->count = 0;
header->type = (tran) ? rtcpTypeSR : rtcpTypeRR;
//获取当前时间戳
NtpTime nowNtp = getNtpTime();
//构造一个SR包的记录
if (tran)
{
RtcpSender* senderInfo = reinterpret_cast < RtcpSender* > (p->freeData());
usage += p->allocData (sizeof(RtcpSender));
int diffNtp = 0;
if (nowNtp > tran->seedNtpTime)
diffNtp = nowNtp - tran->seedNtpTime;
else
if (tran->seedNtpTime > nowNtp)
diffNtp = tran->seedNtpTime - nowNtp;
RtpTime diffRtp = (diffNtp * tran->networkFormat_clockRate) / 1000;
senderInfo->ssrc = htonl(tran->ssrc);//获得发送方的SSRC
senderInfo->ntpTimeSec = htonl(nowNtp.getSeconds());
senderInfo->ntpTimeFrac = htonl(nowNtp.getFractional());//获得NTP时间戳
senderInfo->rtpTime = htonl(tran->seedRtpTime + diffRtp);//获得RTP时间戳
senderInfo->packetCount = htonl(tran->packetSent);//发送的包记数
senderInfo->octetCount = htonl(tran->payloadSent);//发送的字节记数
}
… …

// report blocks
if ((rtcpRecv) && (rtcpRecv->getTranInfoCount() > 0))
{
RtpTranInfo* tranInfo = NULL;
RtpReceiver* recvInfoSpec = NULL;
RtcpReport* reportBlock = NULL;
for (int i = 0; i < rtcpRecv->getTranInfoCount(); i++)
{
tranInfo = rtcpRecv->getTranInfoList(i);
recvInfoSpec = tranInfo->recv;
… …
//cpLog (LOG_DEBUG_STACK, "RTCP: Report block for src %d",
// recvInfoSpec->ssrc);
reportBlock = reinterpret_cast < RtcpReport* > (p->freeData());
usage += p->allocData (sizeof(RtcpReport));

reportBlock->ssrc = htonl(recvInfoSpec->ssrc);
reportBlock->fracLost = calcLostFrac(tranInfo);
// 根据RFC 1889的A.3 计算包丢失率,根据接收到的包和周期内的期望接收值
//相比而得到。然后按照RTCP的头安置要求将丢包率摆好。
u_int32_t lost = (calcLostCount(tranInfo)) & 0xffffff;
reportBlock->cumLost[2] = lost & 0xff;
reportBlock->cumLost[1] = (lost & 0xff00) >> 8;
reportBlock->cumLost[0] = (lost & 0xff0000) >> 16;
//累计丢失分组率
reportBlock->recvCycles = htons(recvInfoSpec->recvCycles);
//扩展已接收的最高序号
reportBlock->lastSeqRecv = htons(recvInfoSpec->prevSeqRecv);
//到达的时延抖动
reportBlock->jitter = htonl(recvInfoSpec->jitter >> 4);
//最末的SR时间戳
reportBlock->lastSRTimeStamp = htonl(tranInfo->lastSRTimestamp);
//最末的SR到达后的时延
if (tranInfo->lastSRTimestamp == 0)
reportBlock->lastSRDelay = 0;
else
{
NtpTime thenNtp = tranInfo->recvLastSRTimestamp;
reportBlock->lastSRDelay = 0;
if (nowNtp > thenNtp)
reportBlock->lastSRDelay = htonl(nowNtp - thenNtp);
else
reportBlock->lastSRDelay = 0;
}
// next known transmitter
header->count++;
}
}

… …
assert (usage % 4 == 0);
//定义整个RTCP包的长度。
header->length = htons((usage / 4) - 1);

return usage;
}
如何构造一个源描述相:addSDES

定期接收RTCP包的程序:
int RtpSession::receiveRTCP ()
{
… …
//通过GetPacket的方法从Udp通道中读出RTCP分组,注:这个方法和RTP的读分组方法基本//一样
RtcpPacket* p = rtcpRecv->getPacket();
… …
if (rtcpRecv->readRTCP(p) == 1)
{
ret = 1;
}

if (p) delete p;
return ret;
}
我们下面来看一下,每一种RTCP包的处理过程:
int RtcpReceiver::readRTCP (RtcpPacket* p)
{
//begin和end均为RTCP队列接收的头尾。
char* begin = reinterpret_cast < char* > (p->getPacketData());
char* end = reinterpret_cast < char* > (begin + p->getTotalUsage());
RtcpHeader* middle = NULL;
int ret = 0;
//扫描整个队列处理RTCP分组
while (begin < end)
{
middle = reinterpret_cast < RtcpHeader* > (begin);
switch (middle->type)
{
case (rtcpTypeSR):
case (rtcpTypeRR):
readSR (middle);//处理SR分组
break;
case (rtcpTypeSDES):
readSDES (middle);//处理SDES分组
break;
case (rtcpTypeBYE):
if ( readBYE (middle) == 0)//处理Bye分组
{
ret = 1;
}
break;
case (rtcpTypeAPP):
readAPP (middle);//处理App分组
break;
default:
break;
}
begin += (ntohs(middle->length) + 1) * sizeof(u_int32_t);
}
return ret;
}
我们以处理SR/RR分组为例子,看一下如何处理RTCP分组消息的:
void RtcpReceiver::readSR (RtcpHeader* head)
{
char* middle = NULL;

NtpTime nowNtp = getNtpTime();
if (head->type == rtcpTypeSR)
{
RtcpSender* senderBlock = reinterpret_cast < RtcpSender* >
((char*)head + sizeof(RtcpHeader));
RtpTranInfo* s = findTranInfo(ntohl(senderBlock->ssrc));
s->lastSRTimestamp = (ntohl(senderBlock->ntpTimeSec) << 16 |
ntohl(senderBlock->ntpTimeFrac) >> 16);
s->recvLastSRTimestamp = nowNtp;
packetReceived++;//包接收记数增加一

NtpTime thenNtp ( ntohl(senderBlock->ntpTimeSec),
ntohl(senderBlock->ntpTimeFrac) );
//下面两个数值都可以被应用层直接调用,是应用层了解目前的RTP流的传输状况
accumOneWayDelay += (nowNtp - thenNtp);//在时间区段内RTP包抵达的总延迟
avgOneWayDelay = accumOneWayDelay / packetReceived;//平均延迟
middle = (char*)head + sizeof(RtcpHeader) + sizeof(RtcpSender);
}
else
{
middle = (char*)head + sizeof(RtcpHeader);

RtpSrc* sender = reinterpret_cast < RtpSrc* > (middle);
RtpSrc ssrc;

ssrc = ntohl(*sender);
middle += sizeof(RtpSrc);

packetReceived++;
}
RtcpReport* block = reinterpret_cast < RtcpReport* > (middle);
for (int i = head->count; i > 0; i--)
{
//下面两个数值都可以被应用层直接调用,是应用层了解目前的RTP流的接收状况
NtpTime thenNtp (ntohl(block->lastSRTimeStamp) >> 16,
ntohl(block->lastSRTimeStamp) << 16 );

NtpTime nowNtp1 (nowNtp.getSeconds() & 0x0000FFFF,
nowNtp.getFractional() & 0xFFFF0000);
accumRoundTripDelay += ((nowNtp1 - thenNtp)
- ntohl(block->lastSRDelay)); 在时间区段内RTP包接收的总延迟
avgRoundTripDelay = accumRoundTripDelay / packetReceived;// 在时间区段内RTP包接收// 的平均延迟
++block;
}
}

3.2.8.3 ACK消息的处理过程OpAck:

1.3 OpAck,这个操作主要是在被叫收到ACK消息后的处理过程,我们在这里先期做介绍。
const Sptr < State >
OpAck::process( const Sptr < SipProxyEvent > event )
{
... ...
if ( sipMsg->getType() != SIP_ACK )
{
return 0;
}

Sptr < UaCallInfo > call;
call.dynamicCast( event->getCallInfo() );
assert( call != 0 );
... ...
//接收到SDP以后从UaCallInfo中提取出对端的SDP,打开声音通道的RTP/RTCP,这个过程的处理机制可以参看//OpStartAudioDuplex::process
Sptr < SipSdp > remoteSdp = call->getRemoteSdp();
startAudio( localSdp, remoteSdp );
... ...
return 0;
}

3.2.8.4 OpConfTargetOk多方会议检测:

  OpConfTargetOk,表示多方会议时候的检测机制,这个机制在目前的设定中没有使用,所以没有必要介绍

  OpFwdDigit,在打开RTP/RTCP媒体通道以后,如果这个时候定义了通话转接呼叫的方式,那么按下0-9的按纽,那么该方法通过以下的流程:

UaDevice::getDeviceQueue()->add( signal )-->ResGwDevice::processSessionMsg-->
CaseHardwareSignalType:...-->provideSignal-->provideDtmf-->OpAddDigit ::process--> UaDevice::instance()->getDigitCollector()->addDigit( digit )

  将所输入的号码存储在DigitCollector中,如果通话继续呼叫方式有效,那么在操作队列中增加addOperator( new OpSecondCall ),在这个新增加的操作符中重新开始向新的一端发送Invite消息(根据输入的Digit形成被叫的Url)从而实现呼叫从一端转接到另外一端的方式。

3.2.9呼叫等待:

  呼叫等待是SIP电话系统中一个比较有用的应用,在 RFC2543对这个应用也做了一些描述,主要的方法是向在通话过程中向等待方发送一个INVITE消息,消息中包括了一个将本地的SDP的C=选项的地址改变成"0.0.0.0"同时为了和上一个INVITE消息区分Cseq项增加1,通过这样实现抑制本地的媒体流。

我们看一下流程:


(点击放大)

3.2.9.1 呼叫等待的详细描述:(以Diagram.17为例)

a. A,B两个端点通过RTP/RTCP进行语音通讯;
b. B接收到了C的一个呼叫(Invite消息),这个时候B处于OpRing的状态中,B向C发送Ring表示已经收到C的呼叫,并且让C处于等待B摘机的状态;
c.这个时候B进入OpStartCallWaitting状态,在这个状态里,捕捉终端接收的DeviceEventFlash信号,也就是Flash信号,这样把当前的A,B RTP会话陷入Hold状态,也就是保持状态,B把当前的会话的ID号放置入CallWaitingID的队列中去进行等待;
d. B在OpStartCallWaiting中向A发送Reinvite消息,这个INVITE消息的SDP的C=选项的地址改变成"0.0.0.0",这个时候A在OpReinvite状态中, B的通话暂时陷入停止,进入StateOnHold状态中
c. B和C开始进行通讯;
d. C挂机发送Bye消息给B这个时候B进入OpEndCall状态;
e. B在这个状态的时候检测到在呼叫等待,B进入到OpConvertCW中,并且把等待队列中的CallID带入myevent队列中准备执行;如果这个时候捕捉到终端接收的DeviceEventFlash信号,OpRrevert操作向A发送Reinvite消息,恢复和A的通讯;
f.A,B之间开始通讯;

3.2.9.2操作之间存在的竞争:

  从上面来看在操作中存在这一定的竞争,A,B之间通讯进入终止以后,是进入的StateOnHold状态,同样在B,C之间的通讯,在在StateInCall状态的时候,用户也有可能发出DeviceEventFlash消息,迫使B重入StateOnHold状态,而不是在对方发出Bye消息以后,这样的结果就是B在StateOnHold状态无法返回,修改的方法其实非常简单,只要这样就可以了:

  addOperator( new OpRevert )改成
  addEntryOperator(new OpRevert);
  为什这么改呢?把OpRevert放在不同的队列中,这样,从StateInCall状态转入StateOnHold的时候,就不是只有一个FlashEvent的条件提供判断了,状态的变化需要通过State:::Process来执行,这样就增加了一个约束的条件,大家不明白的话可以细看一下State::Process(…)的代码。

3.2.9.3 呼叫中所涉及模块介绍:

  以下对呼叫等待所涉及到的一些模块和方法的简单介绍:
a.OpStartCallWaiting的应用:
OpStartCallWaitting主要是检验是否有进入呼叫等待DeviceEventFlash信号,并且把当前的对话切换到等待状态,而当前的等待切换为当前对话,并且向等待的一方发送Re-Invite的hold消息。
OpStartCallWaiting::process( const Sptr < SipProxyEvent > event )
{
//如果这个时刻,出现C端呼叫B端的情况,如果B端要转移呼叫到C那么按下"f"代表呼叫转//移。
if ( deviceEvent->type != DeviceEventFlash )
{
return 0;
}
//注意这个时候C呼叫的CallID已经被装入callwaitinglist中准备调用;
Sptr < SipCallId > call2Id = UaDevice::instance()->getCallWaitingId();
if ( call2Id == 0 )
{
// no call on call waiting
return 0;
}


if ( UaConfiguration::instance()->getCallWaitingOn() )
{
//通知当前的等待队列中的消息准备准备开始和B进行通讯,主要方式是把它的Call ID挂入//myeventQ中,由SipThread处理在队列里的消息。
Sptr < UaCallContainer > calls;
calls.dynamicCast( event->getCallContainer() );
assert( calls != 0 );
Sptr < UaCallInfo > call2 = calls->findCall( *call2Id );

Sptr < Fifo < Sptr < SipProxyEvent > > > eventQ = deviceEvent->getFifo();
Sptr < UaDeviceEvent > event = new UaDeviceEvent( eventQ );
event->type = DeviceEventFlash;
event->callId = call2Id;
eventQ->add( event );

// 把当前的A-B通讯装入等待队列中等待;
Sptr < SipCallId > callId = UaDevice::instance()->getCallId();
UaDevice::instance()->setCallId( 0 );
UaDevice::instance()->addCallWaitingId( callId );
}
//准备回送给A端发送SDP的"C"为0.0.0.0的Invite消息,并且在Cseq为上一个的累加;
Sptr < UaCallInfo > call;
call.dynamicCast( event->getCallInfo() );
assert( call != 0 );

// Put current contact on hold
Sptr < InviteMsg > reInvite;

Sptr < Contact > contact = call->getContact();
assert( contact != 0 );

int status = contact->getStatus();
if ( status == 200 )
{
//在作为呼叫方的时候,这个Invite消息非常好制作,大部分只要复制上一个的一些内//容就可以了。
const StatusMsg& msg = contact->getStatusMsg();
if ( &msg != 0 )
{
reInvite = new InviteMsg( msg );

//add SDP
Sptr < SipSdp > localSdp = call->getLocalSdp();
assert( localSdp != 0 );
SipSdp sipSdp = *localSdp;
reInvite->setContentData( &sipSdp );
}
… …
}
else
{
//在作为被叫方的时候,这个Invite消息就比较麻烦了,基本上要重新创立。

const InviteMsg& msg = contact->getInviteMsg();
if ( &msg != 0 )
{
string sipPort = UaConfiguration::instance()->getLocalSipPort();
reInvite = new InviteMsg( msg.getFrom().getUrl(),
atoi( sipPort.c_str() ) );
SipFrom from( msg.getTo().getUrl() );
reInvite->setFrom( from );

reInvite->setCallId( msg.getCallId() );

// Convert RecordRoute to reverse Route
int numRecordRoute = msg.getNumRecordRoute();
SipRecordRoute recordroute;
SipRoute route;

for ( int i = 0; i < numRecordRoute; i++ )
{
recordroute = msg.getRecordRoute( i );
route.setUrl( recordroute.getUrl() );
reInvite->setRoute( route ); // to beginning
}

int numContact = msg.getNumContact();
if ( numContact )
{
SipContact contact = msg.getContact( numContact - 1 );
route.setUrl( contact.getUrl() );
reInvite->setRoute( route ); // to beginning
}

}

}
assert( reInvite != 0 );

SipVia sipVia;
sipVia.setprotoVersion( "2.0" );
sipVia.setHost( Data( theSystem.gethostAddress() ) );
sipVia.setPort( atoi( UaConfiguration::instance()->getLocalSipPort().c_str() ) );
reInvite->flushViaList();
reInvite->setVia( sipVia, 0 );

// Set Contact: header
Sptr< SipUrl > myUrl = new SipUrl;
myUrl->setUserValue( UaConfiguration::instance()->getUserName(), "phone" );
myUrl->setHost( Data( theSystem.gethostAddress() ) );
myUrl->setPort( atoi( UaConfiguration::instance()->getLocalSipPort().c_str() ) );
SipContact me;
me.setUrl( myUrl );
reInvite->setNumContact( 0 ); // Clear
reInvite->setContact( me );

//TODO Is it going to be a problem if the other side also use the next
//TODO CSeq at the same time?
unsigned int cseq = contact->getCSeqNum();
contact->setCSeqNum( ++cseq );
SipCSeq sipCSeq = reInvite->getCSeq();
sipCSeq.setCSeq( cseq );
reInvite->setCSeq( sipCSeq );

Sptr sipSdp;
sipSdp.dynamicCast ( reInvite->getContentData( 0 ) );
assert ( sipSdp != 0 );
SdpSession sdpDesc = sipSdp->getSdpDescriptor();
//在这里把SDP的C=0.0.0.0(hold项)设定;
sdpDesc.setHold();
sipSdp->setSdpDescriptor( sdpDesc );
//发送Reinvite消息到A端。
deviceEvent->getSipStack()->sendAsync( *reInvite );
Sptr < UaStateMachine > stateMachine;
stateMachine.dynamicCast( event->getCallInfo()->getFeature() );
assert( stateMachine != 0 );
//转程序到StateOnhold
return stateMachine->findState( "StateOnhold" );
}
b.OpReinvite:
OpReinvite在接收到由通讯的对端Invite消息以后,把消息内的 RemoteSDP放在本地的UaCallInfo中然后回送一个OK消息给对端;
c.OpEndCall:
OpEndCall在检测到Bye消息发送以后,让程序回送OK消息并且进入StateCallEnded状态中;
d.OpRevert:
OpRevert检测到再次有DeviceEventFlash消息的时候本地开始发送INVITE消息,把等待方`处于等待的呼叫唤起;
e.StateCallEnded状态:
StateCallEnded同样是检测DeviceEventFlash的消息,检测到以后把调用OpConvertCw的操作,把处于等待队列里的呼叫唤起。
 

在Vovida的基础上实现自己的SIP协议栈(六)

卢政 2003/08/08
3.3 等待对方的呼叫:

  上面花了那么长的时间叙述了如何发起一个呼叫,我们再来介绍一下如何接收一个呼叫:

  当用户进入Idle状态以后,如果系统接收到一个INVITE消息,系统将进入Ring状态,并且进入Opring操作中,这个时候硬件设备将播放振铃声,这个时候如果用户决定摘机通话,那么offhook事件就会产生,同时OpAnswerCall将使状态机进入InCall状态,向主叫发送200响应消息,同样RTP/RTCP通道打开,开始通话,如果通话完毕,双方挂机,那么互相发送SIP Bye消息,OpEndCall将使系统重新回到Idle状态。

  下图表示了从接收到INVITE消息通话完毕的各个状态之间程序各种类之间的迁移过程,和主动呼叫的情况一样,如果协议栈软件应用于Marshal或者是Redirection Server的话,那么采用的协议流程和下面又有一些不一样了,在后续章节会对这些做详细介绍。

  在本图中粗体的部分表示加入MyEntryOperator队列中的操作符
正体的部分表示加入MyOperator队列中的操作符
斜体的部分表示加入MyExitOperator队列中的操作符


(点击放大)


下面我们来详细地介绍每个操作:

3.3.1 OpRing等待对方的振铃消息

OpRing:获取对端向本地发送的INVITE消息
const Sptr < State >
OpRing::process( const Sptr < SipProxyEvent > event )
{
Sptr < SipEvent > sipEvent;
sipEvent.dynamicCast( event );
if ( sipEvent == 0 )
{
return 0;
}
Sptr < SipMsg > sipMsg = sipEvent->getSipMsg();
assert( sipMsg != 0 );
//接收INVITE消息;
Sptr < InviteMsg > msg;
msg.dynamicCast( sipMsg );
… …

Sptr < UaCallInfo > call;
call.dynamicCast( event->getCallInfo() );
assert( call != 0 );
//在UaCallInfo中存储当前的INVITE消息;
call->setRingInvite( new InviteMsg( *msg ) );
call->setContactMsg(*msg);
//保存当前的路由消息;
call->setCalleeRoute1List( msg->getrecordrouteList() );
int numContact = msg->getNumContact();
if ( numContact )
{//保存连接
SipContact contact = msg->getContact( numContact - 1 );
Sptr < SipRoute > route = new SipRoute;
route->setUrl( contact.getUrl() );
call->addRoute1( route );
}
… …
Sptr< BaseUrl > baseUrl = msg->getFrom().getUrl();
assert( baseUrl != 0 );
// Assume we have a SIP_URL
Sptr< SipUrl > sipUrl;
sipUrl.dynamicCast( baseUrl );
assert( sipUrl != 0 );
//获取主叫的Sip URL
Data callingNum = sipUrl->getUserValue();
callingNum += "@";
callingNum += sipUrl->getHost();
signal->dataList.push_back( callingNum.getData(lo) );
//把主叫和被叫的地址(URL)都装入设备的信号队列中,为媒体流和铃声回放的RTP信道做准备
SipRequestLine reqLine = msg->getRequestLine();
baseUrl = reqLine.getUrl();
assert( baseUrl != 0 );

sipUrl.dynamicCast( baseUrl );
assert( sipUrl != 0 );
string calledNum = sipUrl->getUserValue().getData(lo);
signal->dataList.push_back( calledNum );
UaDevice::getDeviceQueue()->add( signal );
//获取主叫的SDP
Sptr remoteSdp;
remoteSdp.dynamicCast (msg->getContentData(0));
bool ringbackTone = false;
//创建本地的SDP
SipSdp localSdp;
if ( remoteSdp != 0 )
{
localSdp = *remoteSdp;
Data host = theSystem.gethostAddress();
if(UaConfiguration::instance()->getNATAddress() != "")
{
host = UaConfiguration::instance()->getNATAddress();
}
//设定本地的SDP
setStandardSdp(localSdp, host,UaDevice::instance()->getRtpPort());
}
//获取本地状态是否当前的硬件状态支持Call Waiting
HardwareStatusType hdwStatus = UaDevice::instance()->getHardwareStatus();
//检验本地是否支持零声回放(回放的话要在零声消息里增加本地的SDP)
StatusMsg statusMsg;
if (UaConfiguration::instance()->getProvideRingback() &&
hdwStatus == HARDWARE_AVAILABLE &&
(remoteSdp != 0) )
{//提供零声回放回送183状态。
ringbackTone = true;
StatusMsg status( *msg, 183 );
status.setContentData( &localSdp );
call->setLocalSdp( new SipSdp( localSdp ) );
statusMsg = status;
}
else
{
// 提供零声回放则回送180状态
StatusMsg status( *msg, 180 );
statusMsg = status;
}
if ( remoteSdp != 0 && UaConfiguration::instance()->getProvideRingback() )
{ call->setRemoteSdp( new SipSdp( *remoteSdp ) );
call->setLocalSdp( new SipSdp( localSdp ) );
Sptr < SipSdp > localSdp = call->getLocalSdp();
Sptr < SipSdp > remoteSdp = call->getRemoteSdp();
//这里要建立RSVP会话,准备开始预留路径。
setupRsvp(*localSdp, *remoteSdp);
}

// TODO Call log Show caller information

Sptr < SipCallId > callId
= new SipCallId( sipEvent->getSipCallLeg()->getCallId() );
if ( hdwStatus == HARDWARE_AVAILABLE )
{
// 处理当前的队列
UaDevice::instance()->setCallId( callId );
}
else if ( hdwStatus == HARDWARE_CALLWAITING_ALLOWED )
{
//把当前的呼叫放在等待队列中
UaDevice::instance()->addCallWaitingId( callId );
}
else
{
return 0;
}

sipEvent->getSipStack()->sendReply( statusMsg );
if ( ringbackTone )
{//回放零声(如果需要的话)。
sendRemoteRingback(*remoteSdp);
}

Sptr < UaStateMachine > stateMachine;
stateMachine.dynamicCast( event->getCallInfo()->getFeature() );
assert( stateMachine != 0 );
//进入StateRinging状态
return stateMachine->findState( "StateRinging" );
}

3.3.2 OpStartRinging开始响铃

  OpStartRinging开始振铃,这个程序和简单,主要是在设备处理队列中加入振铃消息,并且由设备对振铃消息进行处理。

3.3.3 OpRingingInvite处理又一个INVITE消息(呼叫等待)

  OpRingingInvite是一个比较有趣的状态,如果在StateRinging期间有新的INVITE消息过来,那么就回送一个180消息给它,让它处于一个呼叫等待的状态。

3.3.4 OpAnswerCall被叫打开媒体通道开始通讯

OpAnswerCall const Sptr < State >
  该操作的主要目的在于本地接收到主叫发送过来的Invite消息以后,根据主叫的SDP信息,创建本地的SDP并回送200消息,等待主叫发送ACK消息,正式打开媒体通道。

  在SDP中最重要的项目莫过于"m="媒体流指示和"a="会话描述,在其之中定义了载荷类型,RTP/RTCP端口,编码方式这几个重要参数。

MediaList:代表的是媒体信息指示也就是所有"m="项目的描述列表
MediaAttrib:代表的是会话描述的内容也就是所有"a="项目的描述内容
我们以下列的SDP为例子:
SDP Headers
-----------------------------------------------------------------
Header: v=0
Header: o=CiscoSystemsSIP-IPPhone-UserAgent 13045 2886 IN IP4 192.168.6.20
Header: s=SIP Call
Header: c=IN IP4 192.168.6.20
Header: t=0 0
Header: m=audio 30658 RTP/AVP 0 101
Header: a=rtpmap:0 pcmu/8000
Header: a=rtpmap:101 telephone-event/8000
Header: a=fmtp:101 0-11
Header: m=video 30700 RTP/AVP 102
Header: a=rtpmap:31 H261/9000

OpAnswerCall::process( const Sptr < SipProxyEvent > event )
{
Sptr < UaDeviceEvent > deviceEvent;
deviceEvent.dynamicCast( event );
if ( deviceEvent == 0 )
{
return 0;
}
//检测是否摘机
if ( deviceEvent->type != DeviceEventHookUp &&
deviceEvent->type != DeviceEventFlash )
{
return 0;
}

Sptr < UaCallInfo > call;
call.dynamicCast( event->getCallInfo() );
assert( call != 0 );
//取得引发振铃的INVITE消息。
Sptr < InviteMsg > msg = call->getRingInvite();
assert( msg != 0 );
//取出当前的INVITE消息的CallID
SipCallId callId = msg->getCallId();
//如果当前的CllID不是当前正在处理的CallID那么让当前的Call处于等待当中。
if ( UaDevice::instance()->isMyHardware( callId ) == false )
{
Sptr < SipCallId > callWaitingId =
//检验INVITE的CallID是否处于等待队列中
UaDevice::instance()->getCallWaitingId();
if ( callWaitingId == 0 )
{
return 0;
}

if ( *callWaitingId != callId )
{
return 0;
}
//如果两次发送Invite消息是相同的CallID那么第二个肯定是Re-Invite消息,取消当前
//的Call ID,因为有可能在前面介绍的呼叫等待当中,有可能在发送新的INVITE消息的时//候(OpRingingInvite接收到)让程序陷入当前OpRing-->OpAnswerCall的状态,把当前的//这样在当前的这个操作中把处于等待的Call ID消除。
UaDevice::instance()->setCallId( callWaitingId );
UaDevice::instance()->removeCallWaitingId( *callWaitingId );
}

// 取得远端的SDP
Sptr remoteSdp;
remoteSdp.dynamicCast ( msg->getContentData(0) );

call->setRemoteSdp( new SipSdp( *remoteSdp ) );
StatusMsg status( *msg, 200/*OK*/ );

// 根据Cfg文件配置本地的Url到SDP中。
Sptr< SipUrl > myUrl = new SipUrl;
myUrl->setUserValue( UaConfiguration::instance()->getUserName() );
myUrl->setHost( Data( theSystem.gethostAddress() ) );
myUrl->setPort( atoi( UaConfiguration::instance()->getLocalSipPort().c_str() ) );
if(UaConfiguration::instance()->getSipTransport() == "TCP")
{
myUrl->setTransportParam( Data("tcp"));
}

SipContact me;
me.setUrl( myUrl );
status.setNumContact( 0 ); // Clear
status.setContact( me );
//根据远端回传的SDP创建本地的SDP
Sptr localSdp;
localSdp.dynamicCast ( status.getContentData(0) );
//本地的SDP设置不为空的情况
if ( localSdp != 0 )
{
//设定本地的RTP端口号
localSdp->setRtpPort( UaDevice::instance()->getRtpPort() );
// 设定RTP包的传输速率
int rtpPacketSize = UaConfiguration::instance()->getNetworkRtpRate();
//取得SDP描述符(会话符SdpSession)
SdpSession sdpDesc = localSdp->getSdpDescriptor();
list < SdpMedia* > mediaList;
//取得媒体描述列表例如:所有的以"m="打头的描述
mediaList = sdpDesc.getMediaList();
list < SdpMedia* > ::iterator mediaIterator = mediaList.begin();
//取得所有的媒体列表所有的以"m="打头的媒体描述
vector < Data > * formatList = (*mediaIterator)->getStringFormatList();
if ( formatList != 0 )
{
formatList->clear();
}
//采用缺省的方式来建立媒体名和传送地址,这里当主叫和被叫没有公共的媒体格式的时候,//被叫返回媒体流的"m"行,设置端口为0并且不返回载荷类型。
(*mediaIterator)->addFormat( 0 );
// MediaAttributes表示所有"a="和"a=rtpmap:…"的集合
MediaAttributes* mediaAttrib
//取得媒体流的会话属性列表:(所有的"a="的列表)
//a=rtpmap : <载荷类型> <算法名称> / <时钟采样频率> [/<带入参数>]
mediaAttrib = (*mediaIterator)->getMediaAttributes();
if ( mediaAttrib != 0 )
{ //取得一个单纯的a=<属性>:<值>(不包括"a=rtpmap")例如:a=recvonly
vector < ValueAttribute* > * valueAttribList = mediaAttrib->getValueAttributes();
vector < ValueAttribute* > ::iterator attribIterator = valueAttribList->begin();
while ( attribIterator != valueAttribList->end() )
{
char* attribName = (*attribIterator)->getAttribute(); //如果遇见a=ptime的情况,就表示要设置时长,那么也就是要设定RTP的帧长
if ( strcmp( attribName, "ptime" ) == 0 )
{
rtpPacketSize = Data((*attribIterator)->getValue()).convertInt();
break;
}
attribIterator++;
}
mediaAttrib->flushValueAttributes();
mediaAttrib->flushrtpmap();
//以上是根据原段对段远端的SDP来获得相关的会话属性值,下面是如何将这些会话属性值//加入当前的本地的 SDP的会话属性列表"a="当中
//设定本地的简单的"a=<属性>:<值> "
ValueAttribute* attrib = new ValueAttribute();
attrib->setAttribute( "ptime" );
LocalScopeAllocator lo;
attrib->setValue( Data( rtpPacketSize ).getData(lo) );

//增加a=rtpmap : <载荷类型> <算法名称> / <时钟采样频率> [/<带入参数>]
//在本地的a=rtpmap:当中

SdpRtpMapAttribute* rtpMapAttrib = new SdpRtpMapAttribute();
rtpMapAttrib->setPayloadType( 0 );
rtpMapAttrib->setEncodingName( "PCMU" );
rtpMapAttrib->setClockRate( 8000 );

mediaAttrib->addValueAttribute( attrib );
mediaAttrib->addmap( rtpMapAttrib );
}
else//如果mediaAttrib为0的情况,也就是"a="项目为空的情况
{
cpLog(LOG_DEBUG, "no mediaAttrib");
mediaAttrib = new MediaAttributes();
assert(mediaAttrib);
(*mediaIterator)->setMediaAttributes(mediaAttrib);

// create the new value attribute object
ValueAttribute* attrib = new ValueAttribute();
// set the attribute and its value
attrib->setAttribute("ptime");
LocalScopeAllocator lo;
//通过Cfg文件获取RTP传输速率,创建一个a=ptime:<分组时间>的对话属性。 attrib->setValue( Data( UaConfiguration::instance()->getNetworkRtpRate() ).getData(lo) );

//add the rtpmap attribute for the default codec
SdpRtpMapAttribute* rtpMapAttrib = new SdpRtpMapAttribute();
rtpMapAttrib->setPayloadType(0);
rtpMapAttrib->setEncodingName("PCMU");
rtpMapAttrib->setClockRate(8000);

// 增加新创建的会话属性到本地的SDP当中
mediaAttrib->addValueAttribute(attrib);
mediaAttrib->addmap(rtpMapAttrib);
}
localSdp->setSdpDescriptor(sdpDesc);
//回送OK给主叫端,并且通告本地的SDP
call->setLocalSdp( new SipSdp( *localSdp ) );
deviceEvent->getSipStack()->sendReply( status );
}
else // 根据远端创建的本地的SDP为0的情况。
{
cpLog(LOG_DEBUG, "localSdp == 0");
// May not have SDP in original INVITE for 3rd party call control
SipSdp sdp;

Data hostAddr = theSystem.gethostAddress();

if(UaConfiguration::instance()->getNATAddress() != "")
{
hostAddr = UaConfiguration::instance()->getNATAddress();
}

int rtpPort = UaDevice::instance()->getRtpPort();
//重新构造一个本地的SDP发送给主叫端,该媒体属性和主叫方的处于一致。
doAnswerStuff(sdp, remoteSdp, hostAddr, rtpPort);
//在状态中回送本地的SDP
status.setContentData( &sdp, 0 );
call->setLocalSdp( new SipSdp( sdp ) );
deviceEvent->getSipStack()->sendReply( status );
}
//转移工作状态到StateInCall
Sptr < UaStateMachine > stateMachine;
stateMachine.dynamicCast( event->getCallInfo()->getFeature() );
assert( stateMachine != 0 );

return stateMachine->findState( "StateInCall" );
}

3.3.5 回到StateInCall状态

  最后程序被叫和主叫所进入的状态都回到StateInCall状态,在这个状态里,被叫接收主叫发送的ACK消息,如果在其中含有新的SDP的话,就按照新的SDP进行处理,否则,不就按照UaCallInfo中所包含的SDP的进行处理。

4. 如何在改造现有的终端使之能传递视频流。

  目前的Vocal平台是仅仅支持语音传输的,还没有考虑到视频部分,不过从整个协议栈的构造来说,我尝试过把当前的SIP修改成一个语音/视频一体的完整的视频电话系统(如果想修改成会议系统的话,我们稍后在Conference Server这节里面做介绍)只需要把当前的协议栈部分增加大概1500-2000行左右的代码(但不包括Codec部分)就可以完成,我们以增加一个H.261+能力作为我们对这个问题的讨论点:

4.1一个H.261+的Codec的基本构造:

  有参加过视频压缩和解压缩算法开发工作的同志应该知道,一般来说一个视频通讯的基本类包括一个如下的流程:

  以Openh323中实现的方式为例:(只叙述H.261+编码部分)


(点击放大)


4.2 增加视频能力所需要做的工作

1.设备驱动/压缩/解压缩部分:
a. 创建一个H.261的编码实例,构造一个P64的同步压缩器,您可以从加洲大学或者是在OpenH323组织的网站上下载该算法的原代码。
b. 在输入上绑定一个标准的摄相头部分,您可以按照一个标准输入设备来做,也可以使用PWLIB来作为这个设备的输入通道,进行绑定,不过,如果使用PWLIB的时候整体效率会降低很多。
c. 最后把编码类和摄相头联合构造一个标准的输出/输入类(类似于SoundCard的实例一样),如果这个标准类的调用接口方法可以构造成和标准的声音设备一样,也可以构造成在声音设备之内。

2.协议部分的改造:
  对于SIP和H.323相比较而言,两者在视频通讯的概念上有很大的不同,SIP把所有的媒体讯息理解成相同的RTP流,区别只是带宽不相同;而H.323则有一种专门的快速方式把两者区分开,首先打开语音通道传送音频,然后打开视频通道传送视频信息,两者用不同的媒体通道传输。后者的最大好处在于,如果带宽不足的话,至少可以传递语音信息到对方。当然我们可以在SIP的协议前提下做适当的修改模仿H.323的运行方式。

a. 媒体流的描述:
一个媒体信息的具体内容包括:
一个H.261的视频流描述例子:
m=video 513000 RTP/UDP 31
a=rtpmap:31 h261/90000

  媒体类型:Video表示媒体类型,513000表示的是RTP端口号,这个端口号从管理上来说最好和音频不相同。

  传送协议:采用RTP/UDP上传送,因为目前SIP Stack只支持UDP还不支持AVP的格式;
  媒体格式:在RTP中定义的静态载荷类型文档号为31;
  媒体地址和端口:目的地址和RTP的端口号。
  这里最主要改造的地方是OpInviteUrl方法,和用户端回送OK消息的OpAnswerCall,这里是创建初始的视频描述的操作,在他们中间需要增加对视频流的描述,另外在被叫对主叫送来的INVITE消息中必须要检测对方的媒体类型是否能解析,如果不行的话还要在OK消息中做相应的拒绝返回。
例如:

主叫INVITE消息的SDP:
Header: v=0
Header: o=- 1528076688 1528076688 IN IP4 192.168.66.1
Header: s=VOVIDA Session
Header: c=IN IP4 192.168.66.1
Header: t=3177769010 0
Header: m=audio 56104 RTP/UDP 0
Header: a=rtpmap:0 PCMU/8000
Header: a=ptime:20
Header: m=video 56110 RTP/UDP 31
Header: a=rtpmap:0 H261/90000
如果被叫不愿意或者无能力接受视频流那么被叫回送的SDP如下:
Header: v=0
Header: o=- 1528076688 1528076688 IN IP4 192.168.66.2
Header: s=VOVIDA Session
Header: c=IN IP4 192.168.66.2
Header: t=3177769010 0
Header: m=audio 56114 RTP/AVP 0
Header: a=rtpmap:0 PCMU/8000
Header: a=ptime:20
Header: m=video 0 RTP/UDP 31

b. RTP/RTCP部分的改造:
  首先在现有的音频RTP/RTCP会话基础上增加一个视频的RTP/RTCP会话,从前面介绍的我们知道视频和音频一般来说是不在一个RTP端口的,那么我们为了新开的视频RTP端口当然需要捆绑一个RTPSession(会话),在这里我们需要重写视频设备实例的ProcessRTP方法,换一句话来说,就是视频和音频设备有自己的ProcessRTP方法,分别从不同的端口进程读取相应的媒体数据流,另外相对视频信号而言,视频流的RTP帧肯定相对要大一些,不过为了保证声音/视频同步,一般来说两者的时长还是需要相等(一般是以20ms的数据帧,在这个时长内声音/视频的RTP的头和内容的比例还是比较均匀的,不过这样的话,要使用Jitter的方式,但是实际上,视频/音频的时间上并不能做到完全的相等,所以在声/象同步上并不能完全依赖SSRC,需要在设计时采用同步缓冲的方式,大家有兴趣的话可以参加一些RSVP工程组中有关于QoS改善的讨论)。

  这样如果分别有自己的ProcessRTP方法,那么前面的说道的在视频流中如果发生需要保证音频带宽而需要放弃视频带宽的情况,这样的情况我们就很好处理了,在SIP协议中并不保证主叫可以单独打开一个音频通道,那么我们只能在RTP/UDP这一层来完成,可以利用RTCP中的APP分组来完成这个通告,比如主/被一端想终止视频或者音频通讯,在RTCP通道中发送一个自定义的APP分组就可以了。

  但是这样在初始化阶段,如果RSVP在路径上没有预留足够的资源,那么在开始的视频和音频通讯就可能会造成阻塞,可能造成用户的媒体通讯超时,而不能向H.323那样可以先通过快速方式打开一个音频通道(0号通道),而后再开启视频通道,所以上述的方法对于会话初始阶段毫无用处。

c. 声象同步:
  任何一个商业成功的商业运作的视频通讯软件中都需要比较好的去解决声象同步这
个重要的问题,我们一般建议采用的方式是建立FIFO队列缓冲,根据RTP包的SSRC重新排列这些分组,不过如果在操作系统中采用了Direct X或者是直接读写FrameBuffer技术的话,那么FIFO也就不能达到您想直接提高图象处理速度的能力.

3.QoS的提高:
  最后说一下Qos的提高工作,视频通讯的处理不但消耗大量的系统资源,也要消耗大量的网络资源,当然按照前面所说的简单的对QoS进行处理当然是不行的,需要采取的策略有以下两种:

a. RSVP上的改善:视频通讯预留的带宽要比原来音频的要大很多了,而且我们必须考虑到H.261/263协议中的帧间帧(IntraFrame)的情况,会产生突发性的带宽需求,所以预留带宽需要比平均带宽高30%左右,另外,对RESC/PATH消息对必须在通讯中周期性的传送,确保证实带宽(必须考虑消息对所占用的一定带宽)。
b. RTP信道上的改善:我们通过RTCP中的SR/RR分组了解到了丢失率,累计分组数,到达时延抖动等等QoS信息,我们可以通过这些信息对通讯终端的视频信号进行一定的控制,比如在发现QoS降低时,可以以降低量化度或单位时间帧数来降低带宽负载。

参考文献:
RFC3261
RFC2548
RFC2205
RFC2212
RAPI -- An RSVP Application Programming Interface Version 5
Practical VOIP Using Vocal(O'REILLY)
IP Phone Based on IP network and Multimedia communication(WOS)
IP Phone in Internent(Oliver David)
IP网络电话技术(人民邮电出版社)
视频压缩与视频编码技术(中国电力出版社)

(完)

作者联系方法:lu_zheng@21cn.com
  Vovida.org -- Your Source for Open Source Communication
http://www.vovida.org/
 。SIP SOFTPHONE软件电话源代码
http://www.vovida.org/downloads/sipset/sipset-0.8-16.i386.rpm

。SIP SOFTPHONE源代码说明
http://www.vovida.org/downloads/sipset/README-sipset-0.8.txt

。VOVIDA的SIP源代码
http://www.vovida.org/downloads/sip/

。VOVIDA SIP源代码说明
http://www.vovida.org/downloads/sip/README-SIP-1.4.0.txt