一些代码逻辑
[TOC]
# CMA 数据采集系统
1. 简介
CMA 针对服务监控,在服务代码中通过 sdk 打监控,最后通过可以通过监控页面查询,同时可以配置相应报警。
TKNOSService 中常用的是 DAT 和 CMT 两个集群。
1.1 实现类
监控指标采集API(StatAPI)为TK后台服务提供:易用的静(动)态指标采集接口与标准化的动态指标采集接口,分别由三个类实现:1
(1)bgs::CServiceStat:静态指标采集(无锁)
静态指标采集 API 由类 bgs::CServiceStat 支持,适用于指标名确定、指标数变动小的采集。
#####(2)bgs::CServiceStatDynamic:动态指标采集(有锁)
动态指标采集 API 由类 bgs::CServiceStatDynamic 支持,适用于指标名不定、指标数变动大的采集,例如不同事件 ID 的访问次数、不同奖品的兑换次数等。该类可以动态监控指标累积,指标名可以动态生成,不需要预先定义。
(3)bgs::CServiceStatNormalize:动态指标采集 _Tag 扩展(有锁)
标准化的动态指标采集 API 由类 bgs::CServiceStatNormalize 支持,适用于多维度指标统计、指标名不定、指标数变动大的采集,例如不同事件ID的访问次数、不同奖品的兑换次数等。该类可以动态监控指标累积,指标名可以动态生成,不需要预先定义。
1.2 提供的功能1
- 原子化的监控指标累积
- 监控指标存储空间管理
- 监控指标的格式化输出
- 监控指标向监控体系上报
- 服务状态信息(启动时间、编译时间、启动时长等)向监控体系上报
1.3 使用方式1
- 指标累积并输出到服务日志
- 指标累积并上报给监控体系
- 指标累积、输出到服务日志并上报给监控体系
2. 使用步骤
三种类的三种使用方式略有不同,但是整体思路基本都是一致的。
下面以 NOSService 为例,总结静态采集 bgs::CServiceStat 类的使用步骤:
2.0 全局 INI 配置
在全局 INI(TKService.ini)中配置 TKCMAAgentService 的连接方式:(若仅需 “指标累积并输出” 无需此步骤)
1
2
[Global]
TKCMAAgentService_SPIF=127.0.0.1:30900:2
2.1 定义成员变量
在服务类中定义 CServiceStat 成员。
1
2
3
4
5
6
7
#include <ServiceStat.h>
class CTKNOSService : public CTKWinObjNetService
{
public: //param
bgs::CServiceStat m_serverStat_; // 用于静态统计监控数据
};
2.2 初始化
(1)CServiceStat 初始化函数声明:
1
2
3
4
5
bool CServiceStat::Init(const string& statdefine,
const string& statname,
const string& section = "",
const string& entry = "",
const string& configfile = "");
参数解释:
-
statdefine:定义统计项输出的字符串。
统计项与项之间以逗号分隔,字符串中只允许出现
[A-Za-z,-./0-9:_]这些字符统计项字符串。基本格式:
count1,count2,tick3/count3:avg_tick3:ms,count4,count5其中复合项模板:
Dividend/Divisor:ItemName:Unit,表示:ItemName=Dividend/Divisor(Unit)样例:
基本使用:
count1,count2,count3,count4增加平均统计:
count1,tick2:count2,count3,tick4/count4增加平均延迟/大小统计:
count1,count3,tick4/count4:avg_tick:ms,size4/count4:avg_size:B -
statname : 监控名,16位以下的字符串,标识监控数据的类型,要求在整个监控体系内唯一
-
section:配置段,默认为 Global
-
entry:配置项,与配置段一起,定义 TKCMAAgentService 连接串的配置位置,默认为TKCMAAgentService_SPIF
为了向前兼容,在找不到 TKCMAAgentService_SPIF 的情况下,会继续寻找 TKLogAgentService_SPIF
-
configfile:配置文件路径,默认为全局
INI,.\\Service.ini
需要注意的是:
-
一定要在 TKService::OnInitUpdate 里面调用,以防止多线程对初始化的竞争
-
第二个参数 statname 的长度不能超过15位(含)
-
如果在配置文件中没有发现状态汇报服务器相关配置,状态信息将强制输出到日志文件中
(2)TKNOSService 中执行初始化:
在 CTKHisService::OnInitialUpdate() 中通过监控定义串初始化 CServiceStat 类:
1
2
3
4
5
6
7
8
9
//初始化监控
if (!this->m_serverStat_.Init("SEHExcept,UpdTick/UpdC,UpdZsetTick/UpdZsetC,MUpdTick/MUpdC,"
"QueTick/QueC,QueBitTick/QueBitC,MQueTick/MQueC,UdpHDELTick/UdpHDELC,"
"AgentUpdTick/AgentUpdC,AgentQueTick/AgentQueC,AgentMQueTick/AgentMQueC,"
"AgentUpdByTagTick/AgentUpdByTagC,AgentQueryTagExTick/AgentQueryTagExC,"
"AgentQueryEvtListTick/AgentQueryEvtListC,AgentGetTriggerTaskTick/AgentGetTriggerTaskC,ModifyByTagOutMem,ServiceOutofMem", "NOSServerStat"))
{
TKWriteLog("<ERR>服务静态监控指标NOSServerStat初始化失败");
}
2.3 指标累积
在监控指标需要累积的地方,使用对应的函数:1
1
2
3
4
5
6
// 下面的设置会得到结果ts:1449477351,login:1,reg:3,tick_avg:2ms,undefined:0
this->hisstat_.Increase("login");
this->hisstat_.Increase("reg");
this->hisstat_.Increase("tick",6);
this->hisstat_.Set("count",3);
this->hisstat_.Max("reg",3);
2.4 OnTimeout()
在CTKHisService::OnTimeout()中需要调用对应的处理函数,对于不同的使用场景,此处添加的函数不同:
(1)“指标累积并输出到服务日志”
1
2
3
string strforPrint;
this->m_serverStat_.GetAndResetAllStat(strforPrint);
TKWriteLog("<STAT> %s", strforPrint.c_str());
(2)“指标累积并上报”
1
this->m_serverStat_.ReportStat();
(3)“指标累积、输出并上报”
1
this->m_serverStat_.ReportStat(true);
3. 监控指标数据流 1
- [被监控TK服务] –调用–> [StatAPI] –TK协议–> [TKCMAAgentService] –TK协议–> [TKLogCollectorService] –保存–> [文件]
- [文件] –读取–> [上传程序] –HTTP协议–> [openfalcon(小米监控)] + [KariosDB] –访问–> [MIS展示]
- [文件] –分析–> [分析程序] –推送–> [报警]
# 账单
账单针对数据明细,当积分配置属性且修改时,发送 socket 消息到账单服务,最后流转数据可以在【mis】-【配置平台】-【EVT自助运维平台】-【生态管理】-【账单查询】中可以查到。
基本思路:
代码基本使用流程
1)对象初始化:
- 初始化 CTR 连接池
m_connpoolCTR,该对象会在CTKNOSService::OnAsyncSendBill(...)中被使用,用于大批量异步发送账单信息。
1
2
3
4
5
6
//初始化连接池
if (!this->m_connpoolCTR.OnInitialUpdate("TKNOSService", "TKCTRCollectorService_SPIF", 25, TK_CONNECT_IDLEMODE_ONLYKEEP, ".\\TKService.ini"))
{
TKWriteLog("%s:%d 初始化到TKCTRCollector的连接池失败,请检测全局INI中[TKNOSService]TKCTRCollectorService_SPIF配置", __FILE__, __LINE__);
return FALSE;
}
- 初始化 直连 MySQL 连接池
m_mysqlPool,该对象会在CTKNOSService::OnAsyncSendBill(...)中被使用,用于向该连接池转发写账单的消息。
1
2
3
4
5
6
7
8
9
10
11
12
13
// 更新useMysql项
m_useMysql = TKGetIniInt("Config", "UseMysql", 0);
if (UseMysql()) {
// 初始化直连mysql连接池
m_mysqlPool = mysqlUtil::CMySqlConnPool::GetInstance();
//...
// 从 INI 配置中获取 MySQLConfig 配置项中的字段
//...
// 初始化 MySQL 配置:
m_mysqlPool->init(user, passwd, ip_port, db);
//...
}
2)初始化 CTKNOSService 的异步对象并绑定函数
- 初始化一个
CTKAsyncSendObject<CTKNOSService>的对象m_clsAsyncSendBill,并绑定函数CTKNOSService::OnAsyncSendBill(...)用于异步发消息:
1
2
3
4
5
6
7
8
// 取参数-异步连接池个数:
int ansySendCnt = TKGetIniInt("TKNOSService", "TKCTRCollectorService_STIF", 20, ".\\TKService.ini");
// 初始化异步对象-用于发送账单的对象:
if (!this->m_clsAsyncSendBill.Init(this, &CTKNOSService::OnAsyncSendBill, ansySendCnt))
{
TKWriteLog("<ERR> 初始化账单异步消息发送对象失败!");
return FALSE;
}
3)异步发送账单函数
- 补充好异步发送账单的函数
CTKNOSService::OnAsyncSendBill(...):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
BOOL CTKNOSService::OnAsyncSendBill(PTKHEADER pMsg, DWORD dwParam, BOOL bWaitAck)
{
if (UseMysql()) {
// 直连mysql的方式记录账单
mysqlpp::Connection * conn = nullptr;
// 从 m_mysqlPool 中获取一个MySQL的连接:
conn = m_mysqlPool->GetConnection();
//...
// 构造插入语句
std::ostringstream oss;
//...
oss << "INSERT INTO " << m_mysqlPool->GetDB() << ".nos_bill_data_" << str_now << " (dataid,busdata_id,pid,mpid,ts,value,nv,ov,modifytype,note) VALUES (" << value["dataid"] << "," << value["BillType"] << "," << value["PID"] << "," << value["mpid"] << "," << bill_ts << "," << value["Value"] << "," << value["nv"] << "," << value["ov"] << "," << value["ModifyType"] << ",'" << string(value["note"].asCString()) << "');";
//...
// 执行查询
mysqlpp::Query query = conn->query(oss.str().c_str());
//...
// 监控返回时间,记录CMA
//...
// 释放 conn 连接
//...
} else {
// 从 CTR 连接池获取连接:
CTKConnection * pconn(NULL);
pconn = m_connpoolCTR.GetConnection(__FILE__, __LINE__);
//...
// 向 CTR 转发收到的消息:
if (!pconn->SendMsg(pMsg, TRUE, pMsg->dwType | TK_ACK)) {...}
// 获取返回的消息:
PTKHEADER pRecvMsg = pconn->PeerRecvMsg();
//...
// 释放 pconn 连接
//...
}
if (!IsOK(ec)) {
//消息异步恢复
m_sdNOSSendRecover.RecordMsg(NOS_MSGRECOVER_BILLDATA, (char *)pMsg, pMsg->dwLength + sizeof(TKHEADER));
}
return TRUE;
}
疑问:CTR是干啥的?
单从使用内容来看,CTR应该包含如下内容和功能:
- 解析传入的命令;
- 攒出来一个 MySQL 语句;
- 连接账单的数据库,并将该 SQL 语句执行,写入对应的账单内容。
4)外部直接调用函数
- 包装好发送账单的函数
CTKNOSService::RecordBill(...):
1
2
3
4
5
6
7
8
9
void CTKNOSService::RecordBill(...)
{
// 补齐账单所需的参数,例如 PID、BillType、ModifyType 等信息
//...
// 拼装发送消息所需的消息头 buf
//...
// 异步发送消息:
m_clsAsyncSendBill.AddMsg((PTKHEADER)buf.GetBufPtr(), TK_MSG_OTHER2CTR_COLLECTOR_PUSH_JSON_EX);
}
5)使用
- 使用的时候直接调用
CTKNOSService::RecordBill():
1
2
3
4
5
6
7
8
9
10
11
// 整理需要记录账单的信息:
Json::Value optionalJ;
optionalJ["dataid"] = static_cast<unsigned int>(m_id);
optionalJ["nv"] = static_cast<__int64>(balance);
optionalJ["mpid"] = static_cast<unsigned int>(dwMPID);
optionalJ["ts"] = static_cast<unsigned int>(dwTS);
optionalJ["ov"] = static_cast<__int64>(oldV);
optionalJ["note"] = static_cast<string>(note);
optionalJ["expireat"] = GetExpireatForBill();
// 通过 CTKNOSService 的单例调用函数:
g_tkNOSService->RecordBill(dwPID, m_billType, m_editMethod, time(NULL), val, optionalJ);
# CLS 条件日志
服务中通过代码提前埋好 stake(桩)。待需要 debug 的时候,在 CLS 相关的 mysql 中添加对该stake的监控,即可在对应的 mysql 中按照条件打好日志。
具体介绍和使用文档见参考2。
基本思路:通过bgs::CServiceCLS::GetInstance()获取单例,使用 CLS 条件日志。
使用思路
- 在
CTKNOSService::OnInitialUpdate()中初始化CServiceCLS的单例:
1
2
3
4
5
6
7
8
9
10
// 初始化`CServiceCLS`的单例:
if (!bgs::CServiceCLS::GetInstance()->Init(...))
{
KWriteLog("<ERR> 条件日志订阅初始化失败");
} else {
TKWriteLog("条件日志订阅初始化成功");
}
// stake 桩位初始化:
InitCLSStake();
- stake 桩位初始化函数:
1
2
3
4
5
6
7
8
9
10
11
12
13
// stake 桩位初始化:
void CTKNOSService::InitCLSStake();
{
bgs::CServiceCLS::GetInstance()->TKStakeInit("RecordBill", "记录账单", 8,
"UserID", "用户ID",
"EID", "积分ID",
"MPID", "比赛ID",
"BillType", "账单类型",
"TS", "时间",
"Value", "修改值",
"Balance", "修改后积分值",
"OldValue", "修改前积分值");
}
- 在需要使用的地方进行调用:
1
2
3
4
5
6
7
8
9
bgs::CServiceCLS::GetInstance()->TKWriteLogRemote(dwPID, 0, 0, 0, "RecordBill", 8,
"UserID", bgs::VARTYPE_LONG, dwPID,
"EID", bgs::VARTYPE_LONG, m_id,
"MPID", bgs::VARTYPE_LONG, dwMPID,
"BillType", bgs::VARTYPE_INT, m_billType,
"TS", bgs::VARTYPE_LONG, dwTS,
"Value", bgs::VARTYPE_LONG, val,
"Balance", bgs::VARTYPE_LONG, balance,
"OldValue", bgs::VARTYPE_LONG, oldV);
ServiceCLS 类内处理思路
通过 CServiceCLS::TKWriteLogRemote() 传入消息后:
- 将该消息整理加工,添加 userID,IP 等信息;
- 将该消息
push到一个std::queue中; - 在异步函数中加锁来控制发送消息的时机,从该队列中取消息;
- 在
CServiceCLS::SendCLSLog()中调用 Service 连接池的CTKConnectionPool::SendMsg()来发送消息。
系统时间偏移量
TKNOSService 中还维护了一个 CTKConnectionPool 对象 m_clsGetTimeCoonPool(连接 CLS 的连接池)。
TKNOSService 系统会在CTKNOSService::OnInitialUpdate()中调用CTKNOSService::UpdataSystermTimeStampOffset(),再在其中调用 CTKNOSService::GetTimeFromCLS(...),最后通过 m_clsGetTimeCoonPool.SendMsg(...)使用协议头 TK_MSG_OTHER2CLS_GETTIME 发送消息获取时间并返回。
最后与当前时间做差,赋值给m_systermTimeStampOffset:系统时间偏移量,作为系统的一项配置被不断更新。采用CLS中心化节点方式实现系统时间校准。
# DHL
为了实现数据驱动,要大范围的收集数据。在收集数据的时候,有侵入式的采集方法,比如 DTCSDK,这种方法由于耦合度高,并且稳定性受到接入方的怀疑,所以推广不开;还有 日志采集 的方式,这种方式因为其非侵入性,业务方更容易接受。老版本的 DHL 虽然可以采集数据,但是其可靠性一直受到质疑,数据丢失的错误也难以排查定位,并且随着对 数据实时性 的需求增加,老版本的 DHL 也亟需升级。
在此基础上,新的 DHL 应运而生。3
DHL 服务订阅了两个 CVS 的配置表:define_dhl 和 define_dtccollect_node。DHL 从 define_dhl 中拉取配置规则;从 define_dtccollect_node 中拉取 dtcCollector 节点的信息。
-
define_dhl :
哪些机器拉取这条配置;采集到的数据发往哪一个集群(集群配置在define_dtccollect_node表中);过期时间。
# DTC
目的
dtcCollector主要是将不同数据分发到不同的kafka topic。为了提升整个数据链路上的数据质量,dtcCollector新增了诸多可靠性保障机制 。4
dtcCollector 服务在整个数据流转中的位置
在数据流转过程中,原始数据会被存放到kafka。Kafka由producer、broker、consumer三部分组成。其中,DTCCollector是producer,我们申请的kafka集群是broker,DTCIOsream等其他消费端是consumer。4
DTCCollector负责将数据发送到kafka对应的topic中。数据是由采集系统传到DTCCollector的,采集系统包括DHL、DTCSDK以及通过http采集过来的数据(之后会改为http将数据落文件,然后由DHL采集)。
DTC 与 DHL 的关系
采集者在生产者机器上部署日志采集服务(DHL),将日志采集、上传到数据流转中心(DTC);DTC将数据存储到存储介质(kafka)。5
# Redis
Hash 数据类型的常用命令
CRedisAutoPool 基本使用
基本思路:
0)基本信息
程序中是通过CRedisAutoPool 的单例来使用Redis连接池的。
使用时通过CRedisAutoPool::GetInstance()获取单例,进行初始化、定时更新、读写Redis数据。
CRedisAutoPool中维护着一个vector,用于记录所有的Redis集群信息:
1
std::vector<CDBConnManager *> m_RedisPoolList;
其中,每个CDBConnManager对应一个集群,该类的具体介绍见本文下一章。
m_RedisPoolList的信息读取自配置文件ClusterDefine.json。
1)初始化 OnInit()
在 CTKNOSService::OnInitialUpdate() 中对 Redis 连接池进行初始化:
1
2
3
4
5
if (!CRedisAutoPool::GetInstance()->OnInit()) {
return FALSE;
} else {
TKWriteLog("连接池初始化完毕!");
}
CRedisAutoPool::OnInit() 中代码的基本逻辑:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
bool CRedisAutoPool::OnInit()
{
//...
// 初始化两个CMA监控服务
//...
// 获取 NOS\ClusterDefine.json 的路径,并读取该json文件
//...
// 对于多个Cluster,
for (Json::ArrayIndex i(0); i < root.size(); ++i)
{
//...
// 创建一个 CDBConnManager 的对象用于维护该 cluster 的连接
CDBConnManager * dbItem = new CDBConnManager;
ret = dbItem->InitObj(root[i], m_RedisPoolListLen, m_strConsulServer);
if (!ret) {
delete dbItem;
break;
}
m_RedisPoolList.at(m_RedisPoolListLen) = dbItem;
//...
}
return ret;
}
函数
CDBConnManager::InitObj(...)的使用方法见 初始化 InitObj(…)
2)更新 OnUpdate()
在 CTKNOSService::OnTimeout() 中对 Redis 连接池信息进行更新:
1
CRedisAutoPool::GetInstance()->OnUpdate();
CRedisAutoPool::OnUpdate() 中代码的基本逻辑:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
bool CRedisAutoPool::OnUpdate()
{
// CMA 在本地日志打印状态并向TKCMAAgentService发送状态
//..
// 读取最新的 ClusterDefine.json 文件
// 遍历json中所有clu+ster:
for (Json::ArrayIndex i(0); i < root.size(); ++i)
{
// 读取当前cluster的各字段
//...
// 判断当前cluster是否为Redis池当前已经记录在档的某一个
if (isOldCluster) {
// 通过调用 CDBConnManager::updataInfo() 更新此cluster的各字段
} else {
// new 一个新的 CDBConnManager 塞到 m_RedisPoolList 中
}
}
// 如果上面没出问题的话,调用函数 Dump()
// Dump() 就是再把上面更新后的所有cluster信息写回到json中
if (ret) {
Dump();
}
return ret;
}
这里产生了一些疑问:
- 若 ClusterDefine.json 中删掉某一个 cluster 信息的话,m_RedisPoolList 中就会存在一个失效的 cluster 信息。这里不需要对失效的 cluster 信息进行处理(删除)吗?
Dump()有什么意义?上面刚读了 json 文件,转头就再写回去?无非就是多了一些老的 cluster 信息。
3)数据转存至 SSDB
-
定义了一个
CTKAsyncSendObject<CTKNOSService>类型的异步对象m_clsAsyncSend2SSDB,用于将数据降级至 SSDB 队列。 -
在
CTKNOSService::OnInitialUpdate()中初始化,将函数
CTKNOSService::OnAsyncDataDemition()托管给该对象:
1
2
3
4
5
6
7
8
9
10
11
BOOL CTKNOSService::OnInitialUpdate()
{
//...
if (!this->m_clsAsyncSend2SSDB.Init(this, &CTKNOSService::OnAsyncDataDemition, 20)) {
TKWriteLog("< ERR > 数据降级至SSDB数据库异步发送对象初始化失败!");
if (m_DataDemitionSwitch > 0) {
return FALSE;
}
}
//...
}
- 在函数
CTKNOSService::OnAsyncDataDemition()中,将数据转存到 SSDB:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
BOOL CTKNOSService::OnAsyncDataDemition(PTKHEADER pMsg, DWORD dwParam, BOOL bWaitAck)
{
//...
// 按照既定格式解析pMsg,从中获取PID、EID、Key、fields等必要信息
//...
// 从集群配置信息中读取 "ssdb_evt_Delay" 集群的信息
//...
// 获取当前积分所在集群的 baseclusterID :
unsigned baseclusterID = CRedisAutoPool::GetInstance()->GetClusterIDWithPid(pBase->GetClusterName(), dwPid);
// 获取当前积分的 val :
vector<DWORD> val;
CRedisAutoPool::GetMultiHashFieldValue(baseclusterID, key, vecFields, val, 0);
// 将积分写入 ssdb 中,并原来的 redis 集群中删除:
if (IsOK(CRedisAutoPool::SetMultiHashFieldValues(ssdbCluterID, key, vecFields, val))) {
CRedisAutoPool::HDelHashField(clusterID, key, vecFields);
}
//...
}
4)Redis 数据的查询、修改、删除
-
CRedisAutoPool 中提供了很多静态函数用于 Redis 数据的查询、修改、删除等操作。
-
基本思路:获取当前积分的数据结构,攒出来一个 Redis 命令字符串,执行该 Redis 命令。
-
查询函数。相关的几个函数的基本逻辑都是一样的,以
GetMultiHashFieldValue()为例介绍代码逻辑:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
unsigned int GetMultiHashFieldValue(const unsigned int id, const string & key, const vector<string> & fields, vector<unsigned long> & val, int nullVal)
{
//...
// 校验入参key、fields是否有值
//..
// 攒出来一个redis执行语句 oss :
ostringstream oss;
oss << "HMGET " << key;
for (auto citer = fields.begin(); fields.end() != citer; ++citer) {
oss << " " << *citer;
}
// 通过函数 CDBConnManager::ExecRedisCommand() 执行该redis语句:
redisReply * reply = NULL;
reply = static_cast<redisReply *>(
pConnPool->m_RedisPoolList[id]->ExecRedisCommand(oss.str().c_str()));
// 解析返回的 reply ,提取出 int 并记录在出参 val 中。
//...
// 记录CLS条件日志
//...
// 记录CMA信息
//...
return ec;
}
- 修改、删除函数与上面的大体一致,后续详细看了以后再在这里补充吧。
CDBConnManager 基本使用
0)基本信息
上面的CRedisAutoPool就是在维护一个 std::vector<CDBConnManager *>的对象 m_RedisPoolList,以此来维护所有的 Redis 集群的连接。
每个 CDBConnManager 对象,就是对应一个 Redis 集群,每个集群均对应 ClusterDefine.json中的一个字段,形如:
1
2
3
4
5
6
7
8
9
{
"consulserverargv" : "passing=true",
"consulserverpath" : "/v1/health/service/evt-004-redis",
"ipport" : "10.22.120.134:9390,10.22.120.135:9390,10.22.120.136:9390,10.22.120.137:9390,10.22.120.140:9390,10.22.120.141:9390",
"isNewCluster" : true,
"name" : "redis_evt_A",
"pwd" : "redis_EVT@JJMatch",
"type" : "redis"
}
CDBConnManager类内有一个成员变量对象,该对象记录着此集群下所有的ip:
1
std::vector<REDISCONNQUEUEITEM> m_vcIPConn;// ip队列
1)初始化 InitObj(…)
入参const Json::Value & root 是从 ClusterDefine.json 文件中读取的,root 的内容示例:
1
2
3
4
5
6
7
8
9
{
"consulserverargv" : "passing=true",
"consulserverpath" : "/v1/health/service/evtssdb-001-ssdb",
"ipport" : "10.30.142.10:8387,10.30.142.11:8387,10.30.142.12:8387",
"isNewCluster" : false,
"name" : "ssdb_evt_B",
"pwd" : "rediscluster_statevt_8888@JJMatch",
"type" : "redis"
}
CDBConnManager::InitObj(...) 代码思路:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 关键: 避免线程冲突、避免死锁
bool CDBConnManager::InitObj(const Json::Value & root, const long & id, const string & consulserver)
{
// 从 TKService.ini 中获取一些参数
//...
// 从 root 中获取 cluster 的信息,例如 name、pwd、ipport 等
//...
// 通过参数初始化连接
if (!initRedisPool(/*...*/)) {
TKWriteLog("<ERR> clusterdefine 初始化失败");
return false;
}
//...
return true;
}
CDBConnManager::initRedisPool(...) 代码思路:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
bool CDBConnManager::initRedisPool(
const string & name,
const string & ipport,
const string & pwd,
const string & consulserver,
const string & consulserverpath,
const string & consulserverargv)
{
//...
// 尝试切换到 consulserver(容灾中心)提供的 ip 上:
if (!consulserver.empty()) {
//...
for (...) {
//...
if (ret = GetSwitchIP()) {
break;
}
}
}
// 如果上面 consulserver 没有正确建立连接,就尝试从入参 ipport 建立连接
if (!ret) {
//...
}
return ret;
}
2)ip切换 GetSwitchIP( )
CDBConnManager::GetSwitchIP() 用于切换当前的 ip 到 consulserver(容灾中心)配置的 ip 上:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
bool CDBConnManager::GetSwitchIP()
{
// HTTP 连接 consulserver 获取consul配置信息:
CHTTPConnector httpSender;
int httpret = httpSender.HTTPGetMsg(...);
//...
// 统计当前所有已连接ip中,存活IP数
//...
// 从consul配置信息中获取可用的 ip + port :
vector<string> allowedIps; // 可用ip
vector<int> allowedPorts; // 可用port
//...
// 遍历所有consul的ip连接:
// (非全量更新,创建或重连新连接)
for (int i = 0; i < allowedIps.size(); i++) {
// 检查此"可用ip"是否在consul的ip中
//...
// 若是新增的ip(相对目前已连接的ip):
// 新建一个 REDISCONNQUEUEITEM 连接塞到 m_vcIPConn 里;
REDISCONNQUEUEITEM * connItem = &m_vcIPConn.at(m_ipCount);
// ...
// 若是旧的ip:
// 若该旧的连接不"存活"(0 == m_vcIPConn[index].alive):
// 尝试重新连接:
TryReConnect(m_vcIPConn[index])
}
// 若是全量更新,就把新配置中没有的老ip灭掉、该ip下的redis连接释放:
if (CRedisAutoPool::GetInstance()->IsFullUpdate()) {
// 遍历目前所有已连接ip:
for (int j = 0; j < m_ipCount; j++) {
// 判断该已连接ip是否在consul的ip中,若不在的话:
// 判断ip是否存活。若该ip存活的话,灭掉:
if (m_vcIPConn[j].alive != 0) {
::InterlockedExchange(&m_vcIPConn[j].alive, 0);
// 将该ip下的redis连接都删掉:
size_t sizeQueue = m_vcIPConn[j].connitem.size();
if (sizeQueue > 0) {
while (!m_vcIPConn[j].connitem.empty()) {
auto iter = m_vcIPConn[j].connitem.front();
m_vcIPConn[j].connitem.pop();
ReleaseConnection(iter, true);
}
}
}
}
}
// 统计现在存活的ip个数
// 输出到log日志中
return ret;
}
3)更新 updataInfo(…)
此函数会在 CRedisAutoPool 中的 OnUpdate() 中被调用,用于更新集群信息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
bool CDBConnManager::updataInfo(
const string & ipport,
const string & consulserver,
const string & consulserverpath,
const string & consulserverargv,
const string & pwd)
{
// 从 INI 文件中获取 "错误链接阈值" 等配置信息
//...
if (!consulserver.empty()) {
// 根据入参 consulserver(容灾中心)拆分出容灾中心的ip、port等信息
return TryGetSwitch();
} else {
// 容灾中心没有消息,使用本地配置加载集群
// 从入参ipport中拆分出配置中的ip和port信息,存为两个vector:ipvec和portvec
// 循环所有的目标ip
for (int iter = 0; ipvec.size() != iter; ++iter) {
if (!IsOldIp(ipvec[iter])) { // 当前连接池中没有该目标ip
// 创建一个新的Redis连接,并填充所有信息:
REDISCONNQUEUEITEM * connItem = &m_vcIPConn.at(m_ipCount);
connItem->ver = 0;
connItem->alive = 1;
connItem->errNum = 0;
//...
// 循环创建5个相同的RedisConn填到上面的connItem中专门用来记录redis连接的队列中:
for (int i = 0; i < REDIS_CONN_NUM_ON_BEGIN; i++) {
//...
}
// 检查创建的redis连接数量是否小于3(小于3就报错)
// ip数量+1:
::InterlockedIncrement(&m_ipCount);
}
else { // 当前连接池中已存在该ip
// 先找到当前redis连接在连接池中的索引,保存为index
// 若该连接不是存活状态,重新连接:
if (/*...*/&& 0 == m_vcIPConn[index].alive) {
TryReConnect(m_vcIPConn[index]);
}
}
}
}
return ret;
}
4)尝试ip切换 TryGetSwitch( )
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
bool CDBConnManager::TryGetSwitch()
{
bool ret = true;
time_t tsNow = time(NULL);
// 判断当前时间是否已经大于上次更新时间
if (tsNow - m_lastReConnTime > 1) {
if (::TryEnterCriticalSection(&m_csBingSwitchSection)) {
//TKWriteLogFile("DBConnLogFile.log", "当前线程抢占到更新锁!!!");
ret = GetSwitchIP();
::LeaveCriticalSection(&m_csBingSwitchSection);
//TKWriteLogFile("DBConnLogFile.log", "当前线程离开更新锁!!!");
} else {
//TKWriteLog("<Info> 当前占用BingSwitch锁的线程为: %d", m_csBingSwitchSection.OwningThread);
}
}
if (!ret) {
CRedisAutoPool::IncState("SwitchErr");
}
return ret;
}
# CVS 配置管理类
为保证服务
1. CTKTwoBuf
双缓冲对象。
该类维护了两套来自 CVS 的配置,用于读取新的 CVS 配置,以及瞬时切换当前正在使用的配置。
该类用于 TKNOSService 服务中。
1)类定义
该类的定义位于ShareCode 中的 TKTwoBuf.h 中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// TYPE:CVS中的配置格式,NOS中为 CConfigMap
// TYPE_SVR:当前是为哪个服务配置的
template <class TYPE, class TYPE_SVR>
class CTKTwoBuf
{
// 定义一个函数指针,该函数用于更新配置
typedef BOOL(TYPE_SVR::*TKUPDATEDICFUNC)(const std::shared_ptr<TYPE> & _pOld, std::shared_ptr<TYPE> & _pNew);
public:
CTKTwoBuf(void);
virtual ~CTKTwoBuf(void);
// 给 m_pOwner 和 m_UpdateDicFunc 赋值,并调用 UpdateDic(1):
BOOL Init(TYPE_SVR * _pOwner, TKUPDATEDICFUNC _pUpdateDicFun);
void GetData(std::shared_ptr<TYPE> & ptr); //取实际对象
BOOL UpdateDic(int _nVer); // 更新字典表
// 这里会调用回调函数来更新配置,没问题的话修改 m_nCurOEElement
int GetCurVer() { return m_nDicVer; }; //取当前版本号
private:
int m_nDicVer; // 版本号,Init()的时候赋值为1,后续为时间戳
TYPE_SVR * m_pOwner; // 当前是为哪个服务配置的
int m_nCurOEElement; // 只有0和1两个取值,对应m_Data中的两套
TKUPDATEDICFUNC m_UpdateDicFunc; //回调函数指针:用于更新配置
// NOS中的函数为CTKNOSService::OnEvtConfigUpdate()
time_t m_tUpdateTime; //上一次更新时间
std::shared_ptr<TYPE> m_Data[2]; // 两套 CVS 配置
};
2)配置模板:CConfigMap
这个类用于存储 NOS 服务从 CVS 中拉取的配置信息。在 NOSService 中作为 CTKTwoBuf 中的 TYPE 传入。
2. CVSAgent
为 CTKTwoBuf 类的 pro 版本,同样也是一个双缓冲对象。
该类维护了两套来自 CVS 的配置,用于读取新的 CVS 配置,以及瞬时切换当前正在使用的配置。
该类用于 ESP、OSS 中。
1)CVSAgent 类定义
- 该类的定义位于 CVSClient.h 中:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// 用户配置类代理,CVSSDK对应的实体。融合twobuf
// 一个代理对应一种配置对象,对应一个twobuf,对应多个更新方法
template <class TYPE>
class CVSAgent
{
public:
typedef bool(*TKUPDATEDICFUNC)(const std::shared_ptr<TYPE> & _pOld, const std::string & configs, std::shared_ptr<TYPE> & _pNew, std::string & feedback, bool bMakeShare);
public:
CVSAgent() : m_nDicVer(0), m_nCurOEElement(0) {...};
~CVSAgent() { };
// 如果不可以在构造函数中向ZK注册回调,那么在此注册
// @param:identifier 节点标识,请使用IP:PORT的字符串进行标识
// @param:business 业务名,英文字母描述业务缩写如EVT,CLS
bool Init(std::string identifier, std::string business) {...};
// 添加初始化方式。依照添加的顺序筛选配置、调用加载
bool AddUpdateFunc(std::string cfgtype, TKUPDATEDICFUNC callback) {...}
// 2018-12-4 lixm避免通过返回值返回临时变量,实测会被VS优化,并不增加引用计数
bool Update();
// 取正在生效的配置对象
void Get(std::shared_ptr<TYPE> & ptr) { ptr = m_Data[m_nCurOEElement ? 1 : 0]; };
// 辅助APIs
public:
int GetLastUpdateTime() { return m_nDicVer; };
private:
// 从ZK获取对端CVS的地址信息。如果未成功连接ZK,从本地配置获取
bool InitCVSConnPool() {...}
// 根据节点信息从远程CVS获取指定类型的全量配置
bool GetTypeOnNodeIdent(const std::string & cfgtype, std::string & allCfgs);
// 从本地备份文件获取指定类型的全量配置
bool GetTypeFromLocalBackup(const std::string & cfgtype, std::string & allCfgs) {...}
// 从远程CVS获取并加载成功后,写入本地备份文件
bool WriteTypetoLocalBackup(const std::string & cfgtype, const std::string & allCfgs);
// 反馈对获取的配置项的加载结果
bool FeedbackLoadStatus(const std::string & cfgtype, const std::string & feedback);
// 成员
private:
std::string m_strWorkPath; // 当前服务的运行路径
std::vector<TypeUpdater<TYPE> > descriptors; // 对多种类型的对应更新方式
static std::string entident; // 该实体的标识,用于决定该获取哪种配置
static std::string m_port; // 宿主服务端口号
int m_nDicVer; // 上次切换buf的时间
int m_nCurOEElement; // 当前使用哪一个buf
std::shared_ptr<TYPE> m_Data[2]; // 实际配置对象。不能显式初始化,因此该模板只接受有无参构造函数的对象
// 资源
CTKConnectionPool m_connpoolCVS; // 到CVS服务的连接池
static CVSAgent<TYPE> * m_pthis; // 强制单例模式。即对一种配置对象,只有一个CVSClient
// 这个指针会在初始化的时候赋值
std::string m_cvsloadsrvfail;
std::string m_cvsloadlocalfail;
std::string m_cvssrvfail;
std::string m_cvslocalfail;
bgs::CServiceStat m_stat; // CMA监控项,上面4个string是初始化的监控字段
};
-
初始化函数
使用的时候,构造对象之后,需要立刻调用
Init()来初始化对象。
1
Init()
- 更新配置:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
template <typename TYPE>
bool CVSAgent<TYPE>::Update()
{
//...
// 从ZK获取对端CVS的地址信息。如果未成功连接ZK,从本地配置获取
InitCVSConnPool(); // 这里会初始化成员变量 m_connpoolCVS
//...
// 遍历所有类型的更新方法:
typename std::vector<TypeUpdater<TYPE> >::iterator iter = descriptors.begin();
for (; iter != descriptors.end() && !ifanyerr; ++iter) {
// 先尝试远程加载:向CVS发消息,
// 根据 descriptors 中保存的string字段从CVS 中获取对应的所有配置,输出为 filtrated:
GetTypeOnNodeIdent(iter->m_cfgtype, filtrated);
// 调用descriptors中记录的函数,解析 filtrated 为json,记录到m_Data[]的另一个里
//...
// 落盘到文件
WriteTypetoLocalBackup(iter->m_cfgtype, filtrated);
// 如果上述远程加载没有成功的话,从备份文件加载配置,输出为 filtrated:
GetTypeFromLocalBackup(iter->m_cfgtype, filtrated);
// 调用descriptors中记录的函数,解析 filtrated 为json,记录到m_Data[]的另一个里
//...
// 反馈对获取的配置项的加载结果:
FeedbackLoadStatus(iter->m_cfgtype, feedback);
}
//...
}
2)TypeUpdater 类定义
该类负责记录一对儿数据,包含一个 string 和一个 TKUPDATEDICFUNC 类型,以方便 CVSAgent 中通过 std::vector 记录所有的键值对。
这个类型做的事情完全可以通过一个 map 来解决,猜测可能是由于当时的 STL 中还没提供成熟的 std::map 而采用的权衡之法。
类定义如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
template < class TYPE>
class TypeUpdater
{
public:
typedef bool(*TKUPDATEDICFUNC)(const std::shared_ptr<TYPE> & _pOld,
const std::string & configs,
std::shared_ptr<TYPE> & _pNew,
std::string & feedback,
bool bMakeShare);
TypeUpdater(const std::string & cfgtype,
TKUPDATEDICFUNC updater): m_cfgtype(cfgtype), m_updater(updater) {};
bool ifAdded(const std::string & cfgtype,
TKUPDATEDICFUNC updater){
return m_cfgtype == cfgtype && m_updater == updater;
};
std::string m_cfgtype;
TKUPDATEDICFUNC m_updater;
};
3)TKESPService 中的使用
- 类声明中定义一个
CVSAgent<CTKESPConfigMap>的对象:
1
2
3
4
5
6
7
class CTKESPService : public CTKWinObjNetService
{
//...
private:
CVSAgent<CTKESPConfigMap> m_config;
//...
};
- 在
CTKESPService::OnInitialUpdate()中初始化该对象:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
BOOL CTKESPService::OnInitialUpdate()
{
//...
// 取得内网ip的字符串
string strIPPort = cipinter;
if (!m_config.Init(strIPPort, "ESP")) {
TKWriteLog("<ERR> 初始化CVS错误!");
return FALSE;
}
if (!m_config.AddUpdateFunc("define_esprule", &InitCLSConfig)) {
TKWriteLog("<ERR> CVS AddUpdateFunc 错误!");
return FALSE;
}
if (!m_config.Update()) {
TKWriteLog("<ERR> CVS 初始化,首次Updata失败!");
return FALSE;
}
//...
}
- 定义
InitCLSConfig(...)函数,需要注意的是,该函数形式需要与CVSAgent中定义的函数指针TKUPDATEDICFUNC的形式保持一致。
1
2
3
4
5
typedef bool(*TKUPDATEDICFUNC)(const std::shared_ptr<TYPE> & _pOld,
const std::string & configs,
std::shared_ptr<TYPE> & _pNew,
std::string & feedback,
bool bMakeShare);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
bool InitCLSConfig(const std::shared_ptr<CTKESPConfigMap> & _CurConfig,
const string & newconfig,
std::shared_ptr<CTKESPConfigMap> & _NewConfig,
string & feedback,
bool bMakeShare)
{
//...
// 从入参newconfig中解析配置项的json字段为 definesJ
//...
// 是否转为shared类型的智能指针:
if (bMakeShare) {
_NewConfig = std::make_shared<CTKESPConfigMap>();
}
// 加载新配置:
_NewConfig->LoadDefines(definesJ, feedback);
//...
return TRUE;
}
- 定时更新配置:
1
2
3
4
5
6
7
8
9
10
11
BOOL CTKESPService::OnTimeout(struct tm * pTime)
{
//...
int nDicUpdateTime = TKGetIniInt("Config", "DicUpdateTime", 1);
if (nDicUpdateTime == 1) {
m_config.Update();
} else {
TKWriteLog("<Warning> 配置更新开关DicUpdateTime已关闭!");
}
//...
}
-
配置的使用:
在需要读取配置的地方,调用
Get()函数,该函数会返回一个std::shared_ptr<CTKESPConfigMap>类型的对象,然后进行使用即可。例如在
CTKESPService::ProcessSourceData(...)中需要使用配置时:1 2 3 4 5 6 7 8
DWORD CTKESPService::ProcessSourceData(...) { //... std::shared_ptr<CTKESPConfigMap> ptr; m_config.Get(ptr); SID2PROCESSESPRO::iterator sid2processpro = ptr->m_sid2ProcessPro.find(dwSID); //... }
4)配置模板:CTKESPConfigMap
CTKESPConfigMap 会作为CVSAgent<TYPE> 中的类型传入。
这个类用于存储 ESP 服务从 CVS 中拉取的配置信息。
- 类声明位于 TKESPConfigMap.h 文件中:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
typedef map<unsigned int, map<RULELEVELDIC, vector<shared_ptr<CTKESPProcess>>>> SID2PROCESSESPRO;
typedef map<string, set<unsigned int>> USERLISTNAME2LIST;
class CTKESPConfigMap
{
public:
CTKESPConfigMap();
~CTKESPConfigMap();
bool Clear();
bool LoadDefines(const Json::Value & definesJ, string & feedback);
bool IsInList(const string & strListName, const DWORD & dwPID);
// map:左值为白名单配置名,右值为userid的列表
USERLISTNAME2LIST m_mpUserNameList;
// map:左值为sid,右值为map(左值为ESPrule等级,右值为一个vector,为该等级下所有的ESP规则)
SID2PROCESSESPRO m_sid2ProcessPro;
};
- 解析 json 配置:
LoadDefines(...)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
bool CTKESPConfigMap::LoadDefines(const Json::Value & definesJ, string & feedback)
{
//...
// 遍历入参的所有json配置:
for (Json::ArrayIndex i(0); i < definesJ.size(); ++i) {
// 创建一个 CTKESPProcess 的对象:
shared_ptr<CTKESPProcess> pProcess = make_shared<CTKESPProcess>();
if (pProcess == NULL) {
//...
return false;
}
// 获取当前json配置中各字段的内容
JsonGetUInt(definesJ[i], endtime, "active_endtime");
//...
JsonGetUInt(definesJ[i], espid, "id");
//...
JsonGetUInt(definesJ[i], sid, "sid");
//...
JsonGetString(definesJ[i], content, "content");
//...
Json::Value contentJ(Json::arrayValue); // 接收获得的配置项contentJ
jreader.parse(content, contentJ, false);
// 从 contentJ 中提取用户白名单:
//...
m_mpUserNameList[strUserListname].insert(pidtemp);
//...
// 根据 contentJ 记录的信息对 pProcess 初始化:
ESPRESULT initret = pProcess->init(espid, sid, contentJ);
// 若成功的话,将 pProcess 记录为成员变量
if (initret == SD_EVT_PROCRESULT_OK) {
resultJ["configtype"] = definesJ[i]["configtype"];
resultJ["id"] = definesJ[i]["id"];
resultJ["last_modify_time"] = definesJ[i]["last_modify_time"];
resultJ["result"] = true;
resultJ["feedback"] = "ok";
m_sid2ProcessPro[sid][pProcess->rulelevel()].push_back(pProcess);
}
}
//...
return ret;
}
5)配置模板:CCLSConfigMap
CCLSConfigMap 会作为CVSAgent<TYPE> 中的类型传入。
这个类用于存储 CLS 服务从 CVS 中拉取的配置信息。
CCLSConfigMap的类声明与 CLS 相关的代码放在一起,都位于 ServiceCLS.h 文件中。
5)配置模板:CTKDTCCollectConfig
CTKDTCCollectConfig 会作为CVSAgent<TYPE> 中的类型传入。
这个类用于存储 DTC 服务从 CVS 中拉取的配置信息。
CTKDTCCollectConfig的类声明与 DTC 相关的代码放在一起,都位于 TKDtcCollectClient.h 文件中。
# 关于 ESP
0. 基本流程
OnInitialUpdate()的时候,依次调用m_config.Init(...)、m_config.AddUpdateFunc(...)、m_config.Update()加载 CVS 配置,具体加载内容和顺序见上面 CVSAgent 中的介绍。
1. CTKESPProcess
这个类用来维护一条 ESP 规则,也就是在 define_esprule_ 表中,通过 “content” 字段中记录的 json 解析出来的一条规则。
里面记录了当前规则中所有的控件。
- 类定义如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class CTKESPProcess
{
public:
CTKESPProcess();
~CTKESPProcess();
// 初始化,这里会传入必须的参数,然后把所有的控件按照json记录的内容new出来。
ESPRESULT init(unsigned int espid,
unsigned int sid,
const Json::Value & config);
// 执行:根据当前类的对象记录的流程,再补充一些参数,然后开始执行
ESPRESULT handle(long long llKeyWord[],
Json::Value & sourceDataJ,
unsigned int & needClean,
unsigned int & subErrCode) const;
inline unsigned int espid() const { return m_espid; };
inline RULELEVELDIC rulelevel() const { return m_emRuleLevel; }
private:
unsigned int m_espid;
unsigned int m_sid;
unsigned int m_version;
RULELEVELDIC m_emRuleLevel;
// 正常为0,最低位为1代表需要清空Json,第2低位为1代表需要清空KewWord
unsigned int m_needClean;
// 所有的控件:
vector<std::shared_ptr<CTKESPModuleBase>> m_moduleList;
};
# 消息恢复机制
消息恢复机制涉及到两个类CTKMsgRecover<T>和CTKAsyncSendObject<T>。
CTKMsgRecover<T>:异步消息恢复CTKAsyncSendObject<T>:异步消息恢复对象
当向下游发送消息失败时(可能是由于下游宕机或其它原因),调用类 CTKMsgRecover 创建的对象,记录该消息,CTKMsgRecover 类的对象会自动在每分钟调用的 OnTimesout() 函数中尝试将该消息发送出去。
1)使用方法
以 OSS 中的使用为例:
- 定义:
1
2
3
4
// 异步消息恢复<recover>
CTKMsgRecover<CTKObjectStorageService> m_msgRecover;
//异步消息恢复对象
CTKAsyncSendObject<CTKObjectStorageService> m_clsAsyncSaveObj;
- 初始化:
1
2
3
4
5
6
7
8
9
10
//5. 消息恢复初始化
{
//定义可恢复异步消息<recover>
m_msgRecover.AddRecMsgType(1, "OSSSrv", "object", &m_clsAsyncSaveObj);
//定义消息恢复异步对象
if (!m_clsAsyncSaveObj.Init(this, &CTKObjectStorageService::OnAsyncObjRecover, 2)){
TKWriteLog("%s:%d,m_clsAsyncSaveHistory异步对象初始化失败!");
return FALSE;
}
}
- 消息发送失败,记录需要恢复的消息:
1
2
3
4
5
6
7
8
9
// 先攒出来一个用于消息恢复的结构体:
RECOVEROBJECTREQ recStruct = {0};
memset(&recStruct, 0, sizeof(RECOVEROBJECTREQ));
recStruct.header.dwLength = size - sizeof(TKHEADER);
recStruct.oid = oid;
//...
// 记录该消息
this->m_msgRecover.RecordMsg(1, (char *)buff.GetBufPtr(), size);
- 定时恢复:
1
2
3
4
5
6
// CMA 监控消息恢复情况:
this->opsstatDynamic_.Set("objrecover", this->m_msgRecover.GetRecoverMsgCnt(1));
this->opsstatDynamic_.Set("objrecord", this->m_msgRecover.GetRecordMsgCnt(1));
this->opsstatDynamic_.Set("objrecfail", this->m_msgRecover.GetRecoverFailCnt(1));
// 定时消息恢复:
this->m_msgRecover.OnTimesout(pTime);
- <占位符> 占位符>
2)CTKMsgRecover<T>
3)CTKAsyncSendObject<T>
# CVS
CVS 获取配置
以下梳理当 CVS 服务收到 TK_MSG_OTHER2CVS_GET_CERTAIN_TYPE 协议,获取配置的逻辑:
-
CVS 从协议头中提取所需信息:
- 发送消息的节点 ip 信息(nodeident)。例1:
192.168.17.147:37010; - cfgtype :例如
his_oss。
- 发送消息的节点 ip 信息(nodeident)。例1:
-
注册节点并获取控制信息:(函数
RegistorUpdateNode(...))-
根据给定节点套接字,获取区间套接字字段(sectionident)。例1:
192.168.17.*:37010; -
利用下面的 SQL 语句从表
map_node2cluster中获取所需的配置信息;1
select node_addr,cluster,riskcontrol from CVS.map_node2cluster where node_addr = '[nodeident]' OR node_addr = '[sectionident]';
-
将上述语句查找到的配置信息写为一个
Json::Value并返回; -
更新查找到的数据中的
LastReportDate,写入当前时间。
-
-
从上面返回的
Json::Value中提取字段'cluster'的内容。例1:hisoss_neiwang; -
获取所有指定的配置项:(函数
GetAllMatchingConfigs(...))-
利用以下 SQL 语句从表
map_define2cluster中获取所需的配置信息(要查询的目标表);1
select DISTINCT(configs_tablename) from CVS.map_define2cluster where applyto_cluster = '[cluster]' or applyto_cluster = '' and live_tag = 1;
-
将上面查询到的内容保存为变量
matchtabsJ(Json::Value类型)。例1:
'configtablename': 'define_hisoss', 'define_hisoss_cluster' -
根据
matchtabsJ中记录的目标表,利用以下 SQL 语句从目标表中查询内容:1 2 3
select a.* from '[configtablename]' as a inner join (select id, max(last_modify_time) as max_last_modify_time from '[configtablename]' group by id) as b on a.id = b.id and a.last_modify_time = b.max_last_modify_time where a.status >= 0;
-
将上一步提取到的内容保存到入参
configsinmatchJ(Json::Value类型)中以返回。 -
例1:
{"configtype": ""}1 2 3 4 5 6 7 8 9 10 11 12
{ { "[从MySQL中获得的具体配置项]":"[从MySQL中获得的具体配置项]", "configtype": "define_hisoss", "last_modify_time": "2018-06-01 14:44:04" }, { "[从MySQL中获得的具体配置项]":"[从MySQL中获得的具体配置项]", "configtype": "define_hisoss_cluster", "last_modify_time": "2024-10-16 17:47:19" } }
-
-
【占位符】
