当前位置:首页 >> 信息与通信 >>

多线程框架下开发


框架下开发
基于新框架的开发
目录结构
框架 文件(输入,输入,事物控制,状态)

billing40/frame/svr/xdrframe/frame_interface.h billing40/app/framefile/frame_file_interface.h billing40 在 products/openboss/

billing40 用: cvs_co products/openboss/billing40 cd products/openboss/billing40 cvs_up –r ob20_dev

开发
如批价: class CRatingThread : public CThread { Public: Virtual int32 init(); Virtual int32 destroy(); Virtual int32 run(CData *p); virtual int32 exception(CData *p); virtual int32 control(DOMNode & resultDoc, const DOMNode & controlDoc); }; CThread *create_instanceRating() { Return new CRatingThread(); } ? ? 所有在框架下运行的 lib 都有一个 class,这个 class 直接或间接继承 CThread 在这个 class 里,须实现虚拟函数 run。如:对批价来说 run 的参数 p 就是一个 xdr 的指

? ? ?

? ? ?

? ? ? ?

?

针 Lib 里还须提供一个函数,创建这个 class 的对象,如上面 create_instance,框架为每个 线程创建一个 class 的对象。 每个线程一个 CRatingThread 对象,在 CRatingThread 里的数据都是线程安全的 处理完成的数据,调用 send_data(void *p, const int32& iIndex),把数据送到下一个节点 (模块) 。参数 iIndex,是在当前节点的处理结果要分开输出到不同节点时用。如正确 话单 iIndex=0, 错单 iIndex=1。 Init 函数。线程初始化。如读配置文件,连接到 MDB server 等。 时钟,CTime *g_pTime,每 100 微秒更新一次。CTime 里有多格式的时间,可以直接 使用,如批价使用的 process_time。CTime 的声明见下面 配置文件采用 xml,而且原来有一些配置到数据库表里的配置也移到了配置文件,框架 和数据库没有关系了,不再提供 otl_connect,也没有了数据库帐号,读取配置项统一 用 CThread 里的 read_config 。框架已经对配置文件进行了初始化,各模块不需要再对 配置文件做初始化,直接读取即可。框架提供了一些读取配置项的宏,可以方便地读取 到配置项。如业务分析的配置,见下面举例。 多线程框架在名字空间 bs_frame 里 包含头文件 frame_interface.h,编译时使用-lFrameD Exception 函数是异常时的处理,如批价,在异常时,把话单打成错单。 写日志统一采用 CThread::LogAppend,框架可以会把一些日志发到前台。同时,通过 这个函数写日志时, 框架会自动的写日志的线程所在的流水线和节点的信息加上。 这也 带来了一个麻烦,写日志时必需有一个 CThread 对象(业务处理模块都有一个类(A) 从 CThread 派生, 在这个类里可以直接调用 LogAppend, 在其它不是从 CThread 类派生 的类里,如果需要写日志,需要把 A 的指针传给它,以使这些类也可以写日志。) 节点级变量,当节点内的所有线程使用相同的变量时,变量可以放到节点里,变量在节 点内只有一份,节点级变量只能被设置一次。

class CTime { public: struct timeval m_tmVal; // m_tmVal. tv_sec 从 1970 年到当前时间的 秒数, m_tmVal. tv_usec 精确到微秒 struct tm m_tm; int32 m_iYYYYMMDD; //YYYYMMDD int32 m_iYYYYMM; //YYYYMM int32 m_iYYYY; //YYYY int64 m_llYYYYMMDDHH24MISS; //YYYYMMDDHH24MISS char m_szYYYY[8]; //YYYY char m_szYYYYMM[8]; //YYYYMM char m_szYYYYMMDD[9]; //YYYYMMDD char m_szYYYYMMDDHH24MISS[15]; //YYYYMMDDHH24MISS char m_szDateTimeMs // YYYY-MM-DD HH24:MI:SS.MS6 后面是 6 位数的微秒 static char *to_stringDateTime(char *szBuf, const time_t& lTime); static char *to_stringDate(char *szBuf, const time_t& lTime); static int64& to_stringDateTime(int64& llDateTime, const time_t& lTime);

static int32& to_stringDate(int32& iDate, const time_t& lTime); };

事物控制
事物控制总的指导思想是, 对有需要做事物控制的模块, 将其处理完成的结果及时保存 到 tmp 文件,事物提交时,将 tmp 文件改成正式文件。灾难发生时,对已经处理过的数据, 使用已经保存的 tmp 文件里的结果,不再重新处理。

查重
有两种方法,暂时使用第 1 种,对第 2 种的支持以后再加进去: 1、 查重现在本身有事物控制,不需要在查重外另做控制。灾难时,删除 tmp 文件,启 动进程,就可以了。 2、 查重提交一次索引文件作为一个事物, 提交完索引文件时, 发出一个事物完成消息, 由查重之外的模块完成事物控制(删除临时文件) 。一次事物可能是处理多个文件。

批价和出帐
一条话单作为一个事物, 事物完成不需发要事物完成消息。 批价和出帐后面会挂一个输 出模块,把话单及时保存到 tmp 文件。

其它模块 各模块的处理
在 billing40/framefile/frame_file_interface.h 里 , 有 两 个 类 , CProcessFileTherad 和 CProcessCdrThread,这两类都是从 CThread 派生。处理文件的模块都从 CProcessFileThread 派生,处理话单的模块都从 CProcessCdrThread 派生。

解码要做的事情
1、 用输入的 pFrameFile 参数调用 get_inputFileDesc()获得一个输入文件的句柄,使 用这个句柄读取输入文件的内容。 2、 用输入的 pFrameFile 参数调用 const char *get_inputFileName() const 获得输入文件 名。 3、 解码,每解一条话单,new 一个 CXdr,解码出来的 xdr,设置一些属性,并把它放 入文件。参考下面代码。

4、 最后,用 send_data 把话单送到下一模块。 #include <billing/framefile/frame_file_interface.h> class CDecodeThread : public bs_frame_file::CProcessFileThread { public: int32 init(); int32 destroy(); int32 run(bs_frame_file::CFrameFile * pFrameFile) { Int32 iFileDesc = pFrameFile ->get_inputFileDesc(); for() { Bs_frame_file::CXdr *pXdr = pFrameFile -> get_newXdr(); If(decode(xdr, buf) != 0) //解码失败 { pFrameFile ->delete_xdr(pXdr); continue; } //解码成功 if(normal) //正确话单 send_data(pXdr); else //错单 send_data(pXdr, 1); } pFrameFile -> end_decodeFile (); return 0; } private: }; bs_frame::CThread *create_instanceDecode() { Return new CDecodeThread (); }

查重要做的事情
1、 从输入的 pFrameFile, 通过 get_xdrBegin(), get_xdrEnd()和 get_xdrNext ) ( 获得每条话单。 #include <billing/framefile/frame_file_interface.h> class CDupThread : public bs_frame_file:: CProcessFileThread { public:

int32 init(); int32 destroy(); int32 run(bs_frame_file::CFrameFile * pFrameFile) { Bs_frame_file::CXdr *pTmpXdr=NULL; Bs_frame_file::CXdr *pEndXdr= pFrameFile ->get_xdrEnd(); For(Bs_frame_file::CXdr *pXdr= pFrameFile ->get_xdrBegin(); pXdr!= pEndXdr pXdr= pTmpXdr) { Check_dup(); pTmpXdr= pFrameFile ->get_xdrNext(pXdr); If(pTmpXdr == pEndXdr) { Commit index file; //提交查重索引文件 } if(normal) //正确话单 send_data(pXdr); else //错单 send_data(pXdr, 1); }

return 0; } private: }; bs_frame::CThread *create_instanceDup() { Return new CDupThread (); }

批价要做的事情(还有业务分析)
1、run 的输入是一个 xdr,直接开始批价。 2、处理时,如果需要拆话单,先从 xdr 里获得文件的句柄,再调用它的 add_record 函数。 如下面代码。 #include <frame_file_interface.h> class CRatingThread : public bs_frame_file::CProcessCdrThread { public: int32 init();

int32 destroy(); int32 run(Bs_frame_file::CXdr& xdr) { rating...(); if(add) //要拆话单 { bs_frame_file::CFrameFile *pFrameFile = *)xdr.get_attrInfo()->get_info(); Bs_frame_file::CXdr *pXdr = pFrameFile -> get_newXdr(&xdr); //process pXdr pFrameFile->add_record(pXdr); if(normal) //正确话单 send_data(pXdr); else //错单\ send_data(pXdr, 1); } if(normal) //正确话单 send_data(&xdr); else //错单 send_data(&xdr, 1); return 0; } private: }; bs_frame::CThread *create_instanceRating() { Return new CRatingThread (); }

(CFrameFile

入库要做的事情
入库的输入在上一版本的文档里是一个 xdr。但是入库需要按文件提交事物,输入的 xdr 也 不能按照话单在文件里的顺序输入,难以确定最后一条话单。改成以文件为单位输入。 1、 从输入的 pFrameFile, 通过 get_xdrBegin(),get_xdrEnd()和 get_xdrNext()获得每条话 单。 如下面代码。 #include <frame_file_interface.h> class CDataLoaderThread : public bs_frame_file:: CProcessFileThread { public: int32 init() {

M_iIndexLaterLink = CXdrList::get_fieldIndex (“LATER_LINK”); } int32 destroy(); int32 run(bs_frame_file::CFrameFile * pFrameFile) { For(Bs_frame_file::CXdr *pXdr= pFrameFile ->get_xdrBegin(); pXdr!= pFrameFile ->get_xdrEnd(); pXdr= pFrameFile ->get_xdrNext(pXdr)) { Insert_table(pXdr); If(newxdr) { Bs_frame_file::CXdr *pNewXdr = pFrameFile -> get_newXdr(*pXdr); Insert_table(pNewXdr); Send_data(pNewXdr); } If(last record) { Commit; //提交事物 } if(normal) //正确话单 send_data(pXdr); else //错单 send_data(pXdr, 1); } return 0; } private: }; bs_frame::CThread *create_instanceDataLoader() { Return new CDataLoaderThread (); }

数据管理中心要做的事情
在 int32 run(bs_frame::CData *p)里完成日常维护的事情。函数里要有一个 sleep,不然会导致 线程 CPU 占用过高。p 指向 NULL,对数据管理中心没用。 在 int32 control(DOMDocument& resultDoc, const DOMDocument& controlDoc)里处理前台发 过来的控制消息。 #include <billing/framework/frame_interface.h >

class Codmc : public bs_frame::CThread { public: int32 init(const char *cszCfg); int32 destroy(); int32 run(bs_frame::CData *p) { //日常维护 Sleep(1); return 0; } int32 control(DOMDocument& resultDoc, const DOMDocument& controlDoc); private: }; bs_frame::CThread *create_instanceOdmc() { Return new Codmc (); }

每个业务处理模块要做的事情
int32 init(); 程序初始化时调用。如读配置项,连接 MDB,获取 xdr 的 index,装载局数据等 int32 destroy(); 程序退出前在这个函数做一些收尾工作。如,从 MDB 断开。 int32 control(DOMNode & resultDoc, const DOMNode & controlDoc); 前台发过来控制消息。DOMDocument 是 xml 的文档类。controlDoc 是控制消息,里面 有控制命令,控制参数,控制时要使用的一些数据。resultDoc 是控制结果。 Bs_frame_file::CXdr, 从 CXdr 派生,在 CXdr 的基础上增加了框架管理使用的一些属性,业 务处理模块不需要关心。像使用 CXdr 一样使用 bs_frame_file::CXdr 就可以了。数据管理中 心使用 CXdr 不需要使用 bs_frame_file::CXdr。 文件的处理使用了文件模块,头文件为“#include <frame_file_interface.h>” ,动态库名字为 libFrameFile$(DLLTAIL),编译时用参数-lFrameFile$(BUILDTYPE)。数据管理中心不使用这 个 lib。

注意事项
1. 2. 因为一个节点可能会处理多种话单, 但使用 xdr 的时候是字段的名字只对应一个 index。 需要 xdr 支持这种对应关系。 有一些函数不是多线程安全的,不能在多线程里使用,如:

a) 3. 4. 5. 6.

7. 8.

9.

Ctime,localtime,gmtime,asctime,要改成使用 ctime_r,localtime_r,gmtime_r, asctime_r init 函数最好写成可以被多次调用的方式,如改了配置文件,前台通知节点更新配置项 的值,但不需要停止进程。 写日志方式,使用起来也不方便,请大家提提建议。 frame_interface.h 里定义了一个枚举类型 STAT,这个名字和系统下一个宏同名(在 /usr/include/sys/dir.h) ,改成 THREAD_STAT。 编译时可能会碰到类似下面的错误,是因为 ailog.h 里定义了宏 DEBUG_LEVEL 和 frame_interface.h 里冲突之故,新框架下不再使用 ailog,将 ailog.h 的包含去掉后,就可 以编译通过了。 /data03/home/bill401/include/billing40/frame_interface.h", line 53.9: 1540-0063 (S) The text "0" is unexpected. 如果程序运行出现莫明其妙的错误,可能是 bill402 上的程序和 lib 不够新,到 bill401 上把所有的程序和 lib 拷过来,覆盖 bill402 上的。再试。 新的文件格式里有文件头信息, 如果拿老框架跑出来的文件处理的话, 要对这些文件做 一些处理(加上文件头信息) ,新框架下提供了一个程序,change_file,如 change_file –f DASE1070418.TTFILE00.8887.roam –t dr_gsm,把老话单文件转成新话单文件。解码的 输入文件不需要做转化。 LogAppend 函数,框架提供两个 LogAppend 函数,一个是类成员 class CThread{ void LogAppend(const LOG_LEVEL& iLevel, const char *cszFormat, ...) const; },还有一个是 全局函数 void LogAppend(const char *cszModule, const LOG_LEVEL& eLogLevel, const char *cszFormat, ...); ,使用成员函数打日志时,框架会标识是哪个线程打的日志,用 全局函数打日志时,框架只用用函数的参数(cszModule)标识是谁打的日志。建议使 用成员的函数打日志, 由框架增加日志来源的标识, 在一个进程里只有一个实例的模块, 可以用全局函数打日志,用(cszModule)框架日志来源。新的 LogAppend 函数的用法和 ailog 里的 LogAppend 的用法有改变, ailog 的做法是, 先声明一个 BUF, 再格式化数据, 然后调用 LogAppend,新的做法是,格式化数据放在 LogAppend 里做,去掉了 ailog 里 使用的 messageFile.txt,增加了名字空间 NAMESPACE_BILLING40_FRAME,使用全 局的 LogAppend 和日志级别时,都要加有名字空间。 Ailog 的 LogAppend 一般是这样打日志的: Char szBuf[1024]; Sprintf(szBuf, "E302:file:%s,has zero record", taskStat.get_fileName().data()); LogAppend(WARN_LEVEL, “DebugInfo”, szBuf); 新框架下是这样打日志的: 成员方式: This->LogAppend(NAMESPACE_BILLING40_FRAME WARN_LEVEL, "E302:file:%s,has zero record", taskStat.get_fileName().data()); 全局函数方式: NAMESPACE_BILLING40_FRAME LogAppend(“rating”, NAMESPACE_BILLING40_FRAME WARN_LEVEL, "E302:file:%s,has zero record", taskStat.get_fileName().data());

10.

例子 时钟
文件清单 Time.h #ifndef _BILLING_TIME_H_ #define _BILLING_TIME_H_ #include "frame_interface.h" class CTimeMgr : public CThread { public: int32 init(const char *cszCfg); int32 destroy(); int32 run(CData *p); CTimeMgr(); virtual ~CTimeMgr(); private: enum { TIME_STEP = 10, ARR_SIZE = 3600*TIME_STEP }; CTime m_arrTime[ARR_SIZE]; int32 m_iIndex; }; #ifdef __cplusplus extern "C" { #endif CThread *create_clockInstance(); #ifdef __cplusplus } #endif #endif //_BILLING_TIME_H_ Time.cpp #include "time.h" CTimeMgr::CTimeMgr()

: m_iIndex(0) { } CTimeMgr::~CTimeMgr() { } int32 CTimeMgr::init(const char *cszCfg) { m_iIndex = 0; run(NULL); return 0; } int32 CTimeMgr::destroy() { return 0; } int32 CTimeMgr::run(CData *p) { CTime *pTime = &m_arrTime[m_iIndex]; time(&pTime->m_lTime); memset(&pTime->m_tm, 0, sizeof(&pTime->m_tm)); localtime_r(&pTime->m_lTime, &pTime->m_tm); sprintf(pTime->m_szYYYYMMDDH24MISS, "%.4d%.2d%.2d%.2d%.2d%.2d", pTime->m_tm.tm_year + 1900, pTime->m_tm.tm_mon + 1, pTime->m_tm.tm_mday, pTime->m_tm.tm_hour, pTime->m_tm.tm_min, pTime->m_tm.tm_sec); memset(pTime->m_szYYYYMMDD, 0, sizeof(pTime->m_szYYYYMMDD)); strncpy(pTime->m_szYYYYMMDD, pTime->m_szYYYYMMDDH24MISS, 8); g_pTime = pTime; if(++m_iIndex >= ARR_SIZE) m_iIndex = 0; fd_set readSet; FD_ZERO(&readSet); FD_SET(0, &readSet); struct timeval tm; tm.tv_sec = 0;

tm.tv_usec = 1000/TIME_STEP; select(1, &readSet, NULL, NULL, &tm); return 0; } #ifdef __cplusplus extern "C" { #endif CThread *create_clockInstance() { return new CTimeMgr(); } #ifdef __cplusplus } #endif

Xdr 写文件
xdr_write_file.h #ifndef _XDR_WRITE_FILE_H_ #define _XDR_WRITE_FILE_H_ #include "bill_file.h" #include "../record.h" class CXdrWriteFile : public CProcessCdrThread { public: enum{ RECORD_BUF_LEN = 1024 *1024 }; public: virtual int32 init(const char *cszCfg); virtual int32 destroy(); CXdrWriteFile(); virtual ~CXdrWriteFile(); protected: virtual int32 run(CXdr& xdr); virtual int32 exception(CXdr& xdr); private: };

#ifdef __cplusplus extern "C" { #endif CThread *create_writeFileInstance(); #ifdef __cplusplus } #endif #endif //_XDR_WRITE_FILE_H_

文件清单 xdr_write_file.cpp #include "xdr_write_file.h" #include <errno.h> #include <sys/time.h> #include <unistd.h> CXdrWriteFile::CXdrWriteFile() { } CXdrWriteFile::~CXdrWriteFile() { } int32 CXdrWriteFile::init(const char *cszCfg) { int32 iRet = 0; return CProcessCdrThread::init(cszCfg); } int32 CXdrWriteFile::destroy() { int32 iRet = 0; return iRet; } int32 CXdrWriteFile::run(CXdr& xdr) { xdr.m_bFinished = true; // AISTD cout << "---------write get date:" << pFrameFile << AISTD endl << AISTD flush; CFrameFile *pFrameFile = (CFrameFile *)xdr.get_attrInfo()->get_info(); if(pFrameFile==NULL)

{ LogAppend(FATAL_LEVEL, "the bill file in xdr is null"); return 0; } int32 iRet = pFrameFile->write_record(xdr, m_cszNodeName); if(iRet == 1) //file finished { send_data(pFrameFile); } return 0; } int32 CXdrWriteFile::exception(CXdr& xdr) { xdr.m_bFinished = true; return 0; } #ifdef __cplusplus extern "C" { #endif CThread *create_writeFileInstance() { return new CXdrWriteFile(); } #ifdef __cplusplus } #endif

业务分析配置举例
配置文件
采用 xml 格式文件后,是这样的。

<customer_server_host>10.10.10.172</customer_server_host> <customer_server_port>25500</customer_server_port> <expr_file> <LOAD_EXPR_FILE>

<file>rule/analyse_gsm_zj1.expr</file> </LOAD_EXPR_FILE> <LOAD_EXPR_FILE> <file>rule/analyse_gsm_zj2.expr</file> </LOAD_EXPR_FILE> </expr_file> <reg_oper_lib_set> <reg_oper_lib> <LOAD_OPER_LIB>$(OB_REL)/lib/libExprAnalyseD1.so</LOAD_OPER_LIB> <LOAD_OPER_FUNC>RegAnalyseCommExprFun</LOAD_OPER_FUNC> </reg_oper_lib> <reg_oper_lib> <LOAD_OPER_LIB>=$(OB_REL)/lib/libExprAnalyseD2.so</LOAD_OPER_LIB> <LOAD_OPER_FUNC>RegAnalyseCommExprFun2</LOAD_OPER_FUNC> </reg_oper_lib> </reg_oper_lib_set> <reg_global_var> <LOAD_GLOBAL_VAR> <value>sTHIS_PROV=571</value> </LOAD_GLOBAL_VAR> <LOAD_GLOBAL_VAR> <value>iDEFAULT_PRODUCT_ID=50001001</value> </LOAD_GLOBAL_VAR> <LOAD_GLOBAL_VAR> <value>iCALLER_OTHER_CHECK=1</value> </LOAD_GLOBAL_VAR> <LOAD_GLOBAL_VAR> <value>iCALLER_OTHER_ZERO_BEFOR=1</value> </LOAD_GLOBAL_VAR> <LOAD_GLOBAL_VAR> <value>iCALLEE_OTHER_CHECK=0</value> </LOAD_GLOBAL_VAR> <LOAD_GLOBAL_VAR> <value>iSEEK_USER_INFO=1</value> </LOAD_GLOBAL_VAR> <LOAD_GLOBAL_VAR> <value>iCHECK_NO_OWNER=0</value> </LOAD_GLOBAL_VAR> <LOAD_GLOBAL_VAR> <value>iCHECK_USER_INFO_SUB_TIME=1</value> </LOAD_GLOBAL_VAR> <LOAD_GLOBAL_VAR>

<value>iOPP_INFO_FLAG=0</value> </LOAD_GLOBAL_VAR> <LOAD_GLOBAL_VAR> <value>sRATE_PROD_JPAN_KOREA=90134001</value> </LOAD_GLOBAL_VAR> <LOAD_GLOBAL_VAR> <value>iPLAN_ID_JPAN_KOREA=9000</value> </LOAD_GLOBAL_VAR> <LOAD_GLOBAL_VAR> <value>sRATE_PROD_PP_USER=50001018</value> </LOAD_GLOBAL_VAR> <LOAD_GLOBAL_VAR> <value>iPLAN_ID_PP_USER=600</value> </LOAD_GLOBAL_VAR> <LOAD_GLOBAL_VAR> <value>iTOLL_SHORT_DURATION=3</value> </LOAD_GLOBAL_VAR> <LOAD_GLOBAL_VAR> <value>s2i_RIN_SHORTEST_EXCEPT_HEAD=17950;17910;17930;17908;17920;17900;1 7990</value> </LOAD_GLOBAL_VAR> <LOAD_GLOBAL_VAR> <value>iBOSS_VPMN_GROUPTYPE=5</value> </LOAD_GLOBAL_VAR> <LOAD_GLOBAL_VAR> <value>iVPMN_CAST_FLAG=1</value> </LOAD_GLOBAL_VAR> <LOAD_GLOBAL_VAR> <value>iDO_FILTER_ERR=0</value> </LOAD_GLOBAL_VAR> <LOAD_GLOBAL_VAR> <value>iCHECK_USER_INFO_SERVICE_ID=50001</value> </LOAD_GLOBAL_VAR> <LOAD_GLOBAL_VAR> <value>sEXPIRE_TIME_1860=20061218000000</value> </LOAD_GLOBAL_VAR> <LOAD_GLOBAL_VAR> <value>s2s_SIMN_I_FILE_PLMNPROMLIST=HKGPP:50103500;</value> </LOAD_GLOBAL_VAR> </reg_global_var> 程序里写成这样

class CConfig { public: class CExprFileConfig { public: char m_szExprFileName[512]; READ_CONFIG_BEGIN READ_VALUE(m_szExprFileName, "file", NAMESPACE_BILLING40_FRAME FATAL_LEVEL) READ_CONFIG_END }; typedef NAMESPACE_BILLING40_FRAME CConfigList< CExprFileConfig ExprFileList; class CLibFuncConfig { public: char m_szLibName[512]; char m_szFuncName[128]; READ_CONFIG_BEGIN READ_VALUE(m_szLibName, "LOAD_OPER_LIB", NAMESPACE_BILLING40_FRAME FATAL_LEVEL) READ_VALUE(m_szFuncName, "LOAD_OPER_FUNC", NAMESPACE_BILLING40_FRAME FATAL_LEVEL) READ_CONFIG_END }; typedef NAMESPACE_BILLING40_FRAME CConfigList< CLibFuncConfig LibFuncList; class CGlobalVarConfig { public: char m_szValue[1024]; READ_CONFIG_BEGIN READ_VALUE(m_szValue, "value", NAMESPACE_BILLING40_FRAME FATAL_LEVEL) READ_CONFIG_END }; typedef NAMESPACE_BILLING40_FRAME CConfigList< CGlobalVarConfig GlobalVarList; char m_szCustomerServerHost[16]; uint16 m_usCustomerServerPort; ExprFileList m_lstExprFile; LibFuncList m_lstLibFunc;

"",

>

"", "",

>

"",

>

GlobalVarList m_listGlobalVar; READ_CONFIG_BEGIN READ_VALUE(m_szCustomerServerHost, "customer_server_host", "", NAMESPACE_BILLING40_FRAME FATAL_LEVEL) READ_VALUE(m_usCustomerServerPort, "customer_server_port", "", NAMESPACE_BILLING40_FRAME FATAL_LEVEL) READ_OBJ(m_lstExprFile, "expr_file", NAMESPACE_BILLING40_FRAME FATAL_LEVEL) READ_OBJ(m_lstLibFunc, "reg_oper_lib_set", NAMESPACE_BILLING40_FRAME FATAL_LEVEL) READ_OBJ(m_listGlobalVar, "reg_global_var", NAMESPACE_BILLING40_FRAME FATAL_LEVEL) READ_CONFIG_END };

读取配置文项 class CAnalyseThread : public bs_frame_file::CProcessCdrThread { public: int32 init() { Int32 iRet = read_config(m_objConfig); If(iRet == 0) { Return 0; } Else { Return -1; } } int32 destroy(); int32 run(Bs_frame_file::CXdr& xdr) private: CConfig m_objConfig; };

配置文件举例
红色部分需要按照模块情况修改成具体的配置。

解码的配置文件
<?xml version="1.0" encoding="utf-8" ?> <root> <shl_func_define> <function> <func_id>1</func_id> <shl_lib_name>libFrameFileD.so</shl_lib_name> <func_name>create_inputFileInstence</func_name> </function> <function> <func_id>2</func_id> <shl_lib_name>libFrameFileD.so</shl_lib_name> <func_name>create_outputRecordInstence</func_name> </function> <function> <func_id>3</func_id> <shl_lib_name>libFrameFileD.so</shl_lib_name> <func_name>create_outputFileInstence</func_name> </function> <function> <func_id>4</func_id> <shl_lib_name>decode lib name</shl_lib_name> <func_name>decode func name</func_name> </function> </shl_func_define> <node_define> <node> <node_id>1</node_id> <node_name>input</node_name> <functions> <function> <func_id>1</func_id> <batch_id>1</batch_id> </function> </functions> </node> <node>

<node_id>2</node_id> <node_name>output record</node_name> <functions> <function> <func_id>2</func_id> <batch_id>1</batch_id> </function> </functions> </node> <node> <node_id>3</node_id> <node_name>output file</node_name> <functions> <function> <func_id>3</func_id> <batch_id>1</batch_id> </function> </functions> </node> <node> <node_id>4</node_id> <node_name>decode</node_name> <functions> <function> <func_id>4</func_id> <batch_id>1</batch_id> </function> </functions> </node> </node_define> <common_config> <proc_id>1111</proc_id> <log> <path_file>$(HOME)/log/log_prefix</path_file> <level>warn_level</level> </log> <server> <ip>0.0.0.0</ip> <port>55555</port> </server> </common_config> <flows>

<flow> <flow_name>decode</flow_name> <flow_configs> <!-- config of flow --> </flow_configs> <nodes> <node> <node_id>1</node_id> <node_name>open decode file</node_name> <define_id>1</define_id> <next_nodes> <node> <node_id>2</node_id> <node_index>0</node_index> </node> </next_nodes> <node_configs> <sysinfo>config/sysinfo</sysinfo> <path_infos> <path_info> <input_path>data/asc/in</input_path> <!-- 1:input type block, 2: input type record --> <input_type>1</input_type> <!-- the data send to next node, 1: record, 2: file--> <result_type>2</result_type> <bak_path>data/asc/bak</bak_path> <out_paths> <out_path> <out_id>0</out_id> <!--the order of this out_path in all out_path--> <out_order>0</out_order> <!--for the tmp record file node, the index of the next node in all next nodes, -1 if there is no next node--> <node_index>-1</node_index> <!--write way 1:normal; 2:tmp; 3:none--> <write_way>1</write_way> <!-- 1: remove tmp file, 2:process tmp file continue --> <process_tmp_file>1</process_tmp_file> <!--the data type send to next node 1:xdr, 2:file--> <out_data_type>-1</out_data_type>

<normal_path>data/asc/out1</normal_path> <error_path>data/asc/error1</error_path> <stat_path>data/asc/stat1</stat_path> </out_path> </out_paths> </path_info> </path_infos> </node_configs> <func_configs> </func_configs> </node> <node> <node_id>2</node_id> <node_name>decode</node_name> <define_id>4</define_id> <next_nodes> <node> <node_id>3</node_id> <node_index>0</node_index> </node> <node> <node_id>3</node_id> <node_index>1</node_index> </node> </next_nodes> <node_configs> <!-- config of decode --> </node_configs> </node> <node> <node_id>3</node_id> <node_name>write final record</node_name> <define_id>3</define_id> <node_configs> <out_id>0</out_id> </node_configs> </node> </nodes> </flow> </flows> </root>

业务分析,批价,帐务处理的配置文件
<?xml version="1.0" encoding="utf-8" ?> <root> <shl_func_define> <function> <func_id>1</func_id> <shl_lib_name>libFrameFileD.so</shl_lib_name> <func_name>create_inputFileInstence</func_name> </function> <function> <func_id>2</func_id> <shl_lib_name>libFrameFileD.so</shl_lib_name> <func_name>create_outputRecordInstence</func_name> </function> <function> <func_id>3</func_id> <shl_lib_name>libFrameFileD.so</shl_lib_name> <func_name>create_outputFileInstence</func_name> </function> <function> <func_id>4</func_id> <shl_lib_name>lib of rpl</shl_lib_name> <func_name>func of rpl</func_name> </function> </shl_func_define> <node_define> <node> <node_id>1</node_id> <node_name>input</node_name> <functions> <function> <func_id>1</func_id> <batch_id>1</batch_id> </function> </functions> </node> <node> <node_id>2</node_id> <node_name>output record</node_name> <functions> <function> <func_id>2</func_id>

<batch_id>1</batch_id> </function> </functions> </node> <node> <node_id>3</node_id> <node_name>output file</node_name> <functions> <function> <func_id>3</func_id> <batch_id>1</batch_id> </function> </functions> </node> <node> <node_id>4</node_id> <node_name>rpl</node_name> <functions> <function> <func_id>4</func_id> <batch_id>1</batch_id> </function> </functions> </node> </node_define> <common_config> <proc_id>1111</proc_id> <log> <path_file>$(HOME)/log/log_prefix</path_file> <level>warn_level</level> </log> <server> <ip>0.0.0.0</ip> <port>55555</port> </server> </common_config> <flows> <flow> <flow_name>rpl</flow_name> <flow_id>1</flow_id> <flow_configs> <!-- config of flow -->

</flow_configs> <nodes> <node> <node_id>1</node_id> <node_name>create record</node_name> <define_id>1</define_id> <next_nodes> <node> <node_id>2</node_id> <node_index>0</node_index> </node> <node> <node_id>3</node_id> <node_index>1</node_index> </node> </next_nodes> <node_configs> <sysinfo>config/sysinfo</sysinfo> <path_infos> <path_info> <input_path>data/asc/in</input_path> <!-- 1:input type block, 2: input type record --> <input_type>2</input_type> <!-- the data send to next node, 1: record, 2: file--> <result_type>1</result_type> <bak_path>data/asc/bak</bak_path> <out_paths> <out_path> <out_id>0</out_id> <!--the order of this out_path in all out_path--> <out_order>0</out_order> <!--for the tmp record file node, the index of the next node in all next nodes, -1 if there is no next node--> <node_index>-1</node_index> <!--write way 1:normal; 2:tmp; 3:none--> <write_way>1</write_way> <!-- 1: remove tmp file, 2:process tmp file continue --> <process_tmp_file>2</process_tmp_file> <!--the data type send to next node 1:xdr, 2:file--> <out_data_type>-1</out_data_type>

<normal_path>data/asc/out1</normal_path> <error_path>data/asc/error1</error_path> <stat_path>data/asc/stat1</stat_path> </out_path> </out_paths> </path_info> </path_infos> </node_configs> <func_configs> <batch_1> <fatal_level>3</fatal_level> </batch_1> </func_configs> </node> <node> <node_id>2</node_id> <node_name>rpl</node_name> <define_id>4</define_id> <next_nodes> <node> <node_id>3</node_id> <node_index>0</node_index> </node> <node> <node_id>3</node_id> <node_index>1</node_index> </node> </next_nodes> <node_configs> <!--rpl configs --> </node_configs> </node> <node> <node_id>3</node_id> <node_name>write final record</node_name> <define_id>2</define_id> <node_configs> <out_id>0</out_id> </node_configs> </node> </nodes> </flow> </flows> </root>

查重,入库的配置文件
<?xml version="1.0" encoding="utf-8" ?> <root> <shl_func_define> <function> <func_id>1</func_id> <shl_lib_name>libFrameFileD.so</shl_lib_name> <func_name>create_inputFileInstence</func_name> </function> <function> <func_id>2</func_id> <shl_lib_name>libFrameFileD.so</shl_lib_name> <func_name>create_outputRecordInstence</func_name> </function> <function> <func_id>3</func_id> <shl_lib_name>libFrameFileD.so</shl_lib_name> <func_name>create_outputFileInstence</func_name> </function> <function> <func_id>4</func_id> <shl_lib_name>lib of dataload</shl_lib_name> <func_name>func of dataload</func_name> </function> </shl_func_define> <node_define> <node> <node_id>1</node_id> <node_name>input</node_name> <functions> <function> <func_id>1</func_id> <batch_id>1</batch_id> </function> </functions> </node> <node> <node_id>2</node_id> <node_name>output record</node_name> <functions> <function> <func_id>2</func_id>

<batch_id>1</batch_id> </function> </functions> </node> <node> <node_id>3</node_id> <node_name>output file</node_name> <functions> <function> <func_id>3</func_id> <batch_id>1</batch_id> </function> </functions> </node> <node> <node_id>4</node_id> <node_name>dataload</node_name> <functions> <function> <func_id>4</func_id> <batch_id>1</batch_id> </function> </functions> </node> </node_define> <common_config> <proc_id>1111</proc_id> <log> <path_file>$(HOME)/log/log_prefix</path_file> <level>warn_level</level> </log> <server> <ip>0.0.0.0</ip> <port>55555</port> </server> </common_config> <flows> <flow> <flow_name>dataload</flow_name> <flow_id>1</flow_id> <flow_configs> <!-- config of flow -->

</flow_configs> <nodes> <node> <node_id>1</node_id> <node_name>create record</node_name> <define_id>1</define_id> <next_nodes> <node> <node_id>2</node_id> <node_index>0</node_index> </node> </next_nodes> <node_configs> <sysinfo>config/sysinfo</sysinfo> <path_infos> <path_info> <input_path>data/asc/in</input_path> <!-- 1:input type block, 2: input type record --> <input_type>2</input_type> <!-- the data send to next node, 1: record, 2: file--> <result_type>2</result_type> <bak_path>data/asc/bak</bak_path> <out_paths> <out_path> <out_id>0</out_id> <!--the order of this out_path in all out_path--> <out_order>0</out_order> <!--for the tmp record file node, the index of the next node in all next nodes, -1 if there is no next node--> <node_index>-1</node_index> <!--write way 1:normal; 2:tmp; 3:none--> <write_way>1</write_way> <!-- 1: remove tmp file, 2:process tmp file continue --> <process_tmp_file>1</process_tmp_file> <!--the data type send to next node 1:xdr, 2:file--> <out_data_type>-1</out_data_type> <normal_path>data/asc/out1</normal_path> <error_path>data/asc/error1</error_path> <stat_path>data/asc/stat1</stat_path> </out_path>

</out_paths> </path_info> </path_infos> </node_configs> <func_configs> <batch_1> <fatal_level>3</fatal_level> </batch_1> </func_configs> </node> <node> <node_id>2</node_id> <node_name>dataload</node_name> <define_id>4</define_id> <next_nodes> <node> <node_id>3</node_id> <node_index>0</node_index> </node> <node> <node_id>3</node_id> <node_index>1</node_index> </node> </next_nodes> <node_configs> <!--dataload configs --> </node_configs> </node> <node> <node_id>3</node_id> <node_name>write final record</node_name> <define_id>3</define_id> <node_configs> <out_id>0</out_id> </node_configs> </node> </nodes> </flow> </flows> </root>


相关文章:
多线程框架下开发
多线程框架下开发_信息与通信_工程科技_专业资料 暂无评价|0人阅读|0次下载|举报文档 多线程框架下开发_信息与通信_工程科技_专业资料。框架下开发基于新框架的...
多线程开发的捷径:构建Java并发模型框架
多线程开发的捷径:构建Java并发模型框架_IT/计算机_专业资料 暂无评价|0人阅读|0次下载|举报文档 多线程开发的捷径:构建Java并发模型框架_IT/计算机_专业资料。多...
架构设计多线程
架构设计多线程_计算机软件及应用_IT/计算机_专业资料...在实际的软件开发过程中,经常会碰到如下场景:某个...搞出很多 SOCKET 通讯库和框架给大伙儿用(比如 C++...
Java多线程框架Executor详解
暂无评价|0人阅读|0次下载|举报文档Java多线程框架Executor详解_计算机软件及应用_IT/计算机_专业资料。java,多线程,Executor框架,自定义,线程池 Java...
JAVA多线程框架
JAVA多线程框架_计算机软件及应用_IT/计算机_专业资料。多线程java.util.concurrent 多线程框架 (来源于 http://www.zhuaxia.com/item/590227619/) JDK5 中的一...
Foundation框架和多线程小结
Foundation框架多线程小结_互联网_IT/计算机_专业资料。Foundation 框架多线程...foundation框架的学习介... 64页 1下载券 Mac Os Xcode开发人员入... 66页...
Java多线程,集合框架(附简单代码)
暂无评价|0人阅读|0次下载|举报文档 Java多线程,集合框架(附简单代码)_计算机软件及应用_IT/计算机_专业资料。■多进程与多线程 多进程:在操作系统中能(同时)运...
大型WEB网站架构深入分析_线程和多线程
暂无评价|0人阅读|0次下载|举报文档大型WEB网站架构深入分析_线程和多线程_计算机...但是开发者在创建线程时必须小心考虑系统的承受能力,在多对多模式中,以上 缺点...
【开源框架】一个基于回调机制的多线程异步Http连接框架
暂无评价|0人阅读|0次下载|举报文档 【开源框架】一个基于回调机制的多线程异步Http连接框架_计算机软件及应用_IT/计算机_专业资料。www.tsingyuan.cn 【开源框架...
Windows环境下的多线程编程
所以线程可以看作是在进程框架下独立运行的与进程功能相关的 程序。如果将windows...Windows下c++开发环境搭... 16页 免费 Windows环境下的多线程编... 暂无...
更多相关标签:
安卓多线程下载框架 | ios开发多线程下载 | java多线程框架 | 多线程框架 | 力软敏捷开发框架下载 | java多线程框架有哪些 | c 多线程框架 | winform开发框架 下载 |