文章

一些代码逻辑

一些代码逻辑

[TOC]

# CMA 数据采集系统

1. 简介

CMA 针对服务监控,在服务代码中通过 sdk 打监控,最后通过可以通过监控页面查询,同时可以配置相应报警。

TKNOSService 中常用的是 DAT 和 CMT 两个集群。

1.1 实现类

监控指标采集API(StatAPI)为TK后台服务提供:易用的静(动)态指标采集接口与标准化的动态指标采集接口,分别由三个类实现:1

(1)bgs::CServiceStat:静态指标采集(无锁)

静态指标采集 API 由类 bgs::CServiceStat 支持,适用于指标名确定、指标数变动小的采集。

#####(2)bgs::CServiceStatDynamic:动态指标采集(有锁)

动态指标采集 API 由类 bgs::CServiceStatDynamic 支持,适用于指标名不定、指标数变动大的采集,例如不同事件 ID 的访问次数、不同奖品的兑换次数等。该类可以动态监控指标累积,指标名可以动态生成,不需要预先定义。

(3)bgs::CServiceStatNormalize:动态指标采集 _Tag 扩展(有锁)

标准化的动态指标采集 API 由类 bgs::CServiceStatNormalize 支持,适用于多维度指标统计、指标名不定、指标数变动大的采集,例如不同事件ID的访问次数、不同奖品的兑换次数等。该类可以动态监控指标累积,指标名可以动态生成,不需要预先定义。

1.2 提供的功能1

  1. 原子化的监控指标累积
  2. 监控指标存储空间管理
  3. 监控指标的格式化输出
  4. 监控指标向监控体系上报
  5. 服务状态信息(启动时间、编译时间、启动时长等)向监控体系上报

1.3 使用方式1

  1. 指标累积并输出到服务日志
  2. 指标累积并上报给监控体系
  3. 指标累积、输出到服务日志并上报给监控体系

2. 使用步骤

三种类的三种使用方式略有不同,但是整体思路基本都是一致的。

下面以 NOSService 为例,总结静态采集 bgs::CServiceStat 类的使用步骤:

2.0 全局 INI 配置

在全局 INI(TKService.ini)中配置 TKCMAAgentService 的连接方式:(若仅需 “指标累积并输出” 无需此步骤)

1
2
[Global]
TKCMAAgentService_SPIF=127.0.0.1:30900:2

2.1 定义成员变量

在服务类中定义 CServiceStat 成员。

1
2
3
4
5
6
7
#include <ServiceStat.h>
class CTKNOSService : public CTKWinObjNetService
{
public: //param
    bgs::CServiceStat m_serverStat_;            //  用于静态统计监控数据
};
    

2.2 初始化

(1)CServiceStat 初始化函数声明:
1
2
3
4
5
bool CServiceStat::Init(const string& statdefine,
          const string& statname,
          const string& section = "",
          const string& entry = "",
          const string& configfile = "");

参数解释:

  • statdefine:定义统计项输出的字符串。

    统计项与项之间以逗号分隔,字符串中只允许出现[A-Za-z,-./0-9:_]这些字符统计项字符串。

    基本格式: count1,count2,tick3/count3:avg_tick3:ms,count4,count5

    其中复合项模板: Dividend/Divisor:ItemName:Unit,表示:ItemName=Dividend/Divisor(Unit)

    样例:

    基本使用: count1,count2,count3,count4

    增加平均统计:count1,tick2:count2,count3,tick4/count4

    增加平均延迟/大小统计:count1,count3,tick4/count4:avg_tick:ms,size4/count4:avg_size:B

  • statname : 监控名,16位以下的字符串,标识监控数据的类型,要求在整个监控体系内唯一

  • section:配置段,默认为 Global

  • entry:配置项,与配置段一起,定义 TKCMAAgentService 连接串的配置位置,默认为TKCMAAgentService_SPIF

    为了向前兼容,在找不到 TKCMAAgentService_SPIF 的情况下,会继续寻找 TKLogAgentService_SPIF

  • configfile:配置文件路径,默认为全局INI,.\\Service.ini

需要注意的是:

  • 一定要在 TKService::OnInitUpdate 里面调用,以防止多线程对初始化的竞争

  • 第二个参数 statname 的长度不能超过15位(含)

  • 如果在配置文件中没有发现状态汇报服务器相关配置,状态信息将强制输出到日志文件中

(2)TKNOSService 中执行初始化:

CTKHisService::OnInitialUpdate() 中通过监控定义串初始化 CServiceStat 类:

1
2
3
4
5
6
7
8
9
//初始化监控
if (!this->m_serverStat_.Init("SEHExcept,UpdTick/UpdC,UpdZsetTick/UpdZsetC,MUpdTick/MUpdC,"
                     "QueTick/QueC,QueBitTick/QueBitC,MQueTick/MQueC,UdpHDELTick/UdpHDELC,"
                  "AgentUpdTick/AgentUpdC,AgentQueTick/AgentQueC,AgentMQueTick/AgentMQueC,"
                   "AgentUpdByTagTick/AgentUpdByTagC,AgentQueryTagExTick/AgentQueryTagExC,"
"AgentQueryEvtListTick/AgentQueryEvtListC,AgentGetTriggerTaskTick/AgentGetTriggerTaskC,ModifyByTagOutMem,ServiceOutofMem", "NOSServerStat")) 
{
    TKWriteLog("<ERR>服务静态监控指标NOSServerStat初始化失败");
}

2.3 指标累积

在监控指标需要累积的地方,使用对应的函数:1

1
2
3
4
5
6
 // 下面的设置会得到结果ts:1449477351,login:1,reg:3,tick_avg:2ms,undefined:0
 this->hisstat_.Increase("login");
 this->hisstat_.Increase("reg");
 this->hisstat_.Increase("tick",6);
 this->hisstat_.Set("count",3);
 this->hisstat_.Max("reg",3);

2.4 OnTimeout()

在CTKHisService::OnTimeout()中需要调用对应的处理函数,对于不同的使用场景,此处添加的函数不同:

(1)“指标累积并输出到服务日志”
1
2
3
string strforPrint;
this->m_serverStat_.GetAndResetAllStat(strforPrint);
TKWriteLog("<STAT> %s", strforPrint.c_str());
(2)“指标累积并上报”
1
this->m_serverStat_.ReportStat();
(3)“指标累积、输出并上报”
1
this->m_serverStat_.ReportStat(true);

3. 监控指标数据流 1

  1. [被监控TK服务] –调用–> [StatAPI] –TK协议–> [TKCMAAgentService] –TK协议–> [TKLogCollectorService] –保存–> [文件]
  2. [文件] –读取–> [上传程序] –HTTP协议–> [openfalcon(小米监控)] + [KariosDB] –访问–> [MIS展示]
  3. [文件] –分析–> [分析程序] –推送–> [报警]

# 账单

账单针对数据明细,当积分配置属性且修改时,发送 socket 消息到账单服务,最后流转数据可以在【mis】-【配置平台】-【EVT自助运维平台】-【生态管理】-【账单查询】中可以查到。

基本思路:

代码基本使用流程

1)对象初始化:

  • 初始化 CTR 连接池m_connpoolCTR,该对象会在CTKNOSService::OnAsyncSendBill(...)中被使用,用于大批量异步发送账单信息。
1
2
3
4
5
6
//初始化连接池
if (!this->m_connpoolCTR.OnInitialUpdate("TKNOSService", "TKCTRCollectorService_SPIF", 25, TK_CONNECT_IDLEMODE_ONLYKEEP, ".\\TKService.ini")) 
{
  TKWriteLog("%s:%d 初始化到TKCTRCollector的连接池失败,请检测全局INI中[TKNOSService]TKCTRCollectorService_SPIF配置", __FILE__, __LINE__);
  return FALSE;
}
  • 初始化 直连 MySQL 连接池m_mysqlPool,该对象会在CTKNOSService::OnAsyncSendBill(...)中被使用,用于向该连接池转发写账单的消息。
1
2
3
4
5
6
7
8
9
10
11
12
13
// 更新useMysql项
m_useMysql = TKGetIniInt("Config", "UseMysql", 0);
if (UseMysql()) {
    // 初始化直连mysql连接池
    m_mysqlPool = mysqlUtil::CMySqlConnPool::GetInstance();
    //...
    // 从 INI 配置中获取 MySQLConfig 配置项中的字段
    //...
    // 初始化 MySQL 配置:
    m_mysqlPool->init(user, passwd, ip_port, db);
    //...
}
    

2)初始化 CTKNOSService 的异步对象并绑定函数

  • 初始化一个 CTKAsyncSendObject<CTKNOSService> 的对象m_clsAsyncSendBill,并绑定函数CTKNOSService::OnAsyncSendBill(...)用于异步发消息:
1
2
3
4
5
6
7
8
// 取参数-异步连接池个数:
int ansySendCnt = TKGetIniInt("TKNOSService", "TKCTRCollectorService_STIF", 20, ".\\TKService.ini");
// 初始化异步对象-用于发送账单的对象:
if (!this->m_clsAsyncSendBill.Init(this, &CTKNOSService::OnAsyncSendBill, ansySendCnt)) 
{
    TKWriteLog("<ERR> 初始化账单异步消息发送对象失败!");
    return FALSE;
}

3)异步发送账单函数

  • 补充好异步发送账单的函数 CTKNOSService::OnAsyncSendBill(...)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
BOOL CTKNOSService::OnAsyncSendBill(PTKHEADER pMsg, DWORD dwParam, BOOL bWaitAck)
{
    if (UseMysql()) {
        // 直连mysql的方式记录账单
        mysqlpp::Connection * conn = nullptr;
        // 从 m_mysqlPool 中获取一个MySQL的连接:
        conn = m_mysqlPool->GetConnection();
        //...
        // 构造插入语句
        std::ostringstream oss;
        //...
        oss << "INSERT INTO " << m_mysqlPool->GetDB() << ".nos_bill_data_" << str_now << " (dataid,busdata_id,pid,mpid,ts,value,nv,ov,modifytype,note) VALUES (" << value["dataid"] << "," << value["BillType"] << "," << value["PID"] << "," << value["mpid"] << "," << bill_ts << "," << value["Value"]  << "," << value["nv"] << "," << value["ov"] << "," << value["ModifyType"] << ",'" << string(value["note"].asCString()) << "');";
		//...
        // 执行查询
        mysqlpp::Query query = conn->query(oss.str().c_str());
        //...
        // 监控返回时间,记录CMA
        //...
        // 释放 conn 连接
        //...
    } else {
        // 从 CTR 连接池获取连接:
        CTKConnection * pconn(NULL);
        pconn = m_connpoolCTR.GetConnection(__FILE__, __LINE__);
        //...
        // 向 CTR 转发收到的消息:
        if (!pconn->SendMsg(pMsg, TRUE, pMsg->dwType | TK_ACK)) {...}
        // 获取返回的消息:
        PTKHEADER pRecvMsg = pconn->PeerRecvMsg();
        //...
        // 释放 pconn 连接
        //...
    } 
    
    if (!IsOK(ec)) {
        //消息异步恢复
        m_sdNOSSendRecover.RecordMsg(NOS_MSGRECOVER_BILLDATA, (char *)pMsg, pMsg->dwLength + sizeof(TKHEADER));
    }
    return TRUE;
}

疑问:CTR是干啥的?

单从使用内容来看,CTR应该包含如下内容和功能:

  • 解析传入的命令;
  • 攒出来一个 MySQL 语句;
  • 连接账单的数据库,并将该 SQL 语句执行,写入对应的账单内容。

4)外部直接调用函数

  • 包装好发送账单的函数CTKNOSService::RecordBill(...)
1
2
3
4
5
6
7
8
9
void CTKNOSService::RecordBill(...)
{
    // 补齐账单所需的参数,例如 PID、BillType、ModifyType 等信息
    //...
    // 拼装发送消息所需的消息头 buf
    //...
    // 异步发送消息:
	m_clsAsyncSendBill.AddMsg((PTKHEADER)buf.GetBufPtr(), TK_MSG_OTHER2CTR_COLLECTOR_PUSH_JSON_EX);
}

5)使用

  • 使用的时候直接调用CTKNOSService::RecordBill()
1
2
3
4
5
6
7
8
9
10
11
// 整理需要记录账单的信息:
Json::Value optionalJ;
optionalJ["dataid"] = static_cast<unsigned int>(m_id);
optionalJ["nv"] = static_cast<__int64>(balance);
optionalJ["mpid"] = static_cast<unsigned int>(dwMPID);
optionalJ["ts"] = static_cast<unsigned int>(dwTS);
optionalJ["ov"] = static_cast<__int64>(oldV);
optionalJ["note"] = static_cast<string>(note);
optionalJ["expireat"] = GetExpireatForBill();
// 通过 CTKNOSService 的单例调用函数:
g_tkNOSService->RecordBill(dwPID, m_billType, m_editMethod, time(NULL), val, optionalJ);

# CLS 条件日志

服务中通过代码提前埋好 stake(桩)。待需要 debug 的时候,在 CLS 相关的 mysql 中添加对该stake的监控,即可在对应的 mysql 中按照条件打好日志。

具体介绍和使用文档见参考2

基本思路:通过bgs::CServiceCLS::GetInstance()获取单例,使用 CLS 条件日志。

使用思路

  • CTKNOSService::OnInitialUpdate()中初始化CServiceCLS的单例:
1
2
3
4
5
6
7
8
9
10
// 初始化`CServiceCLS`的单例:
if (!bgs::CServiceCLS::GetInstance()->Init(...)) 
{
  KWriteLog("<ERR> 条件日志订阅初始化失败");
} else {
  TKWriteLog("条件日志订阅初始化成功");
}

// stake 桩位初始化:
InitCLSStake();
  • stake 桩位初始化函数:
1
2
3
4
5
6
7
8
9
10
11
12
13
// stake 桩位初始化:
void CTKNOSService::InitCLSStake();
{
    bgs::CServiceCLS::GetInstance()->TKStakeInit("RecordBill", "记录账单", 8,
        "UserID", "用户ID",
        "EID", "积分ID",
        "MPID", "比赛ID",
        "BillType", "账单类型",
        "TS", "时间",
        "Value", "修改值",
        "Balance", "修改后积分值",
        "OldValue", "修改前积分值");
}
  • 在需要使用的地方进行调用:
1
2
3
4
5
6
7
8
9
bgs::CServiceCLS::GetInstance()->TKWriteLogRemote(dwPID, 0, 0, 0, "RecordBill", 8,
  "UserID", bgs::VARTYPE_LONG, dwPID,
  "EID", bgs::VARTYPE_LONG, m_id,
  "MPID", bgs::VARTYPE_LONG, dwMPID,
  "BillType", bgs::VARTYPE_INT, m_billType,
  "TS", bgs::VARTYPE_LONG, dwTS,
  "Value", bgs::VARTYPE_LONG, val,
  "Balance", bgs::VARTYPE_LONG, balance,
  "OldValue", bgs::VARTYPE_LONG, oldV);

ServiceCLS 类内处理思路

通过 CServiceCLS::TKWriteLogRemote() 传入消息后:

  • 将该消息整理加工,添加 userID,IP 等信息;
  • 将该消息push到一个 std::queue中;
  • 在异步函数中加锁来控制发送消息的时机,从该队列中取消息;
  • CServiceCLS::SendCLSLog()中调用 Service 连接池的CTKConnectionPool::SendMsg()来发送消息。

系统时间偏移量

TKNOSService 中还维护了一个 CTKConnectionPool 对象 m_clsGetTimeCoonPool(连接 CLS 的连接池)。

TKNOSService 系统会在CTKNOSService::OnInitialUpdate()中调用CTKNOSService::UpdataSystermTimeStampOffset(),再在其中调用 CTKNOSService::GetTimeFromCLS(...),最后通过 m_clsGetTimeCoonPool.SendMsg(...)使用协议头 TK_MSG_OTHER2CLS_GETTIME 发送消息获取时间并返回。

最后与当前时间做差,赋值给m_systermTimeStampOffset:系统时间偏移量,作为系统的一项配置被不断更新。采用CLS中心化节点方式实现系统时间校准。

# DHL

为了实现数据驱动,要大范围的收集数据。在收集数据的时候,有侵入式的采集方法,比如 DTCSDK,这种方法由于耦合度高,并且稳定性受到接入方的怀疑,所以推广不开;还有 日志采集 的方式,这种方式因为其非侵入性,业务方更容易接受。老版本的 DHL 虽然可以采集数据,但是其可靠性一直受到质疑,数据丢失的错误也难以排查定位,并且随着对 数据实时性 的需求增加,老版本的 DHL 也亟需升级。

在此基础上,新的 DHL 应运而生。3

DHL 服务订阅了两个 CVS 的配置表:define_dhl 和 define_dtccollect_node。DHL 从 define_dhl 中拉取配置规则;从 define_dtccollect_node 中拉取 dtcCollector 节点的信息。

  • define_dhl :

    哪些机器拉取这条配置;采集到的数据发往哪一个集群(集群配置在define_dtccollect_node表中);过期时间。

# DTC

目的

dtcCollector主要是将不同数据分发到不同的kafka topic。为了提升整个数据链路上的数据质量,dtcCollector新增了诸多可靠性保障机制 。4

dtcCollector 服务在整个数据流转中的位置

在数据流转过程中,原始数据会被存放到kafka。Kafka由producer、broker、consumer三部分组成。其中,DTCCollector是producer,我们申请的kafka集群是broker,DTCIOsream等其他消费端是consumer。4

DTCCollector负责将数据发送到kafka对应的topic中。数据是由采集系统传到DTCCollector的,采集系统包括DHL、DTCSDK以及通过http采集过来的数据(之后会改为http将数据落文件,然后由DHL采集)。

DTC 与 DHL 的关系

采集者在生产者机器上部署日志采集服务(DHL),将日志采集、上传到数据流转中心(DTC);DTC将数据存储到存储介质(kafka)。5

# Redis

Hash 数据类型的常用命令

image-20240626112438524

CRedisAutoPool 基本使用

基本思路

0)基本信息

程序中是通过CRedisAutoPool 的单例来使用Redis连接池的。

使用时通过CRedisAutoPool::GetInstance()获取单例,进行初始化、定时更新、读写Redis数据。

CRedisAutoPool中维护着一个vector,用于记录所有的Redis集群信息:

1
std::vector<CDBConnManager *>   m_RedisPoolList;

其中,每个CDBConnManager对应一个集群,该类的具体介绍见本文下一章。

m_RedisPoolList的信息读取自配置文件ClusterDefine.json

1)初始化 OnInit()

CTKNOSService::OnInitialUpdate() 中对 Redis 连接池进行初始化:

1
2
3
4
5
if (!CRedisAutoPool::GetInstance()->OnInit()) {    
    return FALSE; 
} else {    
    TKWriteLog("连接池初始化完毕!"); 
} 

CRedisAutoPool::OnInit() 中代码的基本逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
bool CRedisAutoPool::OnInit()
{
    //...
    // 初始化两个CMA监控服务
    //...
    // 获取 NOS\ClusterDefine.json 的路径,并读取该json文件
    //...
    // 对于多个Cluster,
    for (Json::ArrayIndex i(0); i < root.size(); ++i) 
    {
        //...
        // 创建一个 CDBConnManager 的对象用于维护该 cluster 的连接
        CDBConnManager * dbItem = new CDBConnManager;
        ret = dbItem->InitObj(root[i], m_RedisPoolListLen, m_strConsulServer);
        if (!ret) {
            delete dbItem;
            break;
        }
        m_RedisPoolList.at(m_RedisPoolListLen) = dbItem;
        //...
    }
    return ret;
}
    

函数 CDBConnManager::InitObj(...) 的使用方法见 初始化 InitObj(…)

2)更新 OnUpdate()

CTKNOSService::OnTimeout() 中对 Redis 连接池信息进行更新:

1
CRedisAutoPool::GetInstance()->OnUpdate();

CRedisAutoPool::OnUpdate() 中代码的基本逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
bool CRedisAutoPool::OnUpdate()
{
    // CMA 在本地日志打印状态并向TKCMAAgentService发送状态
    //..
    // 读取最新的 ClusterDefine.json 文件
    // 遍历json中所有clu+ster:
    for (Json::ArrayIndex i(0); i < root.size(); ++i) 
    {
        // 读取当前cluster的各字段
        //...
        // 判断当前cluster是否为Redis池当前已经记录在档的某一个
        if (isOldCluster) {
            // 通过调用 CDBConnManager::updataInfo() 更新此cluster的各字段
        } else {
            // new 一个新的 CDBConnManager 塞到 m_RedisPoolList 中
        }
    }
    // 如果上面没出问题的话,调用函数 Dump()
    // Dump() 就是再把上面更新后的所有cluster信息写回到json中
    if (ret) {
    	Dump();
	}
    return ret;
}

这里产生了一些疑问:

  • 若 ClusterDefine.json 中删掉某一个 cluster 信息的话,m_RedisPoolList 中就会存在一个失效的 cluster 信息。这里不需要对失效的 cluster 信息进行处理(删除)吗?
  • Dump()有什么意义?上面刚读了 json 文件,转头就再写回去?无非就是多了一些老的 cluster 信息。

3)数据转存至 SSDB

  • 定义了一个CTKAsyncSendObject<CTKNOSService>类型的异步对象m_clsAsyncSend2SSDB,用于将数据降级至 SSDB 队列。

  • CTKNOSService::OnInitialUpdate()中初始化,

    将函数CTKNOSService::OnAsyncDataDemition()托管给该对象:

1
2
3
4
5
6
7
8
9
10
11
BOOL CTKNOSService::OnInitialUpdate()
{
    //...
    if (!this->m_clsAsyncSend2SSDB.Init(this, &CTKNOSService::OnAsyncDataDemition, 20)) {
        TKWriteLog("< ERR > 数据降级至SSDB数据库异步发送对象初始化失败!");
        if (m_DataDemitionSwitch > 0) {
            return FALSE;
        }
    }
    //...
}
  • 在函数 CTKNOSService::OnAsyncDataDemition()中,将数据转存到 SSDB:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
BOOL CTKNOSService::OnAsyncDataDemition(PTKHEADER pMsg, DWORD dwParam, BOOL bWaitAck)
{
    //...
    // 按照既定格式解析pMsg,从中获取PID、EID、Key、fields等必要信息
    //...
    // 从集群配置信息中读取 "ssdb_evt_Delay" 集群的信息
    //...
    
    // 获取当前积分所在集群的 baseclusterID :
    unsigned baseclusterID = CRedisAutoPool::GetInstance()->GetClusterIDWithPid(pBase->GetClusterName(), dwPid);
    
    // 获取当前积分的 val :
    vector<DWORD> val;
    CRedisAutoPool::GetMultiHashFieldValue(baseclusterID, key, vecFields, val, 0);
    
    // 将积分写入 ssdb 中,并原来的 redis 集群中删除:
    if (IsOK(CRedisAutoPool::SetMultiHashFieldValues(ssdbCluterID, key, vecFields, val))) {
        CRedisAutoPool::HDelHashField(clusterID, key, vecFields);
    }
    
    //...
}

4)Redis 数据的查询、修改、删除

  • CRedisAutoPool 中提供了很多静态函数用于 Redis 数据的查询、修改、删除等操作。

  • 基本思路:获取当前积分的数据结构,攒出来一个 Redis 命令字符串,执行该 Redis 命令。

  • 查询函数。相关的几个函数的基本逻辑都是一样的,以GetMultiHashFieldValue() 为例介绍代码逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
unsigned int GetMultiHashFieldValue(const unsigned int id, const string & key, const vector<string> & fields, vector<unsigned long> & val, int nullVal)
{
    //...
    // 校验入参key、fields是否有值
    //..
    // 攒出来一个redis执行语句 oss :
    ostringstream oss;
    oss << "HMGET " << key;
    for (auto citer = fields.begin(); fields.end() != citer; ++citer) {
        oss << " " << *citer;
    }
    // 通过函数 CDBConnManager::ExecRedisCommand() 执行该redis语句:
    redisReply * reply = NULL;
    reply = static_cast<redisReply *>(
        pConnPool->m_RedisPoolList[id]->ExecRedisCommand(oss.str().c_str()));
    // 解析返回的 reply ,提取出 int 并记录在出参 val 中。
    //...
    // 记录CLS条件日志
	//...
    // 记录CMA信息
	//...
    return ec;
}
  • 修改、删除函数与上面的大体一致,后续详细看了以后再在这里补充吧。

CDBConnManager 基本使用

0)基本信息

上面的CRedisAutoPool就是在维护一个 std::vector<CDBConnManager *>的对象 m_RedisPoolList,以此来维护所有的 Redis 集群的连接。

每个 CDBConnManager 对象,就是对应一个 Redis 集群,每个集群均对应 ClusterDefine.json中的一个字段,形如:

1
2
3
4
5
6
7
8
9
   {
      "consulserverargv" : "passing=true",
      "consulserverpath" : "/v1/health/service/evt-004-redis",
      "ipport" : "10.22.120.134:9390,10.22.120.135:9390,10.22.120.136:9390,10.22.120.137:9390,10.22.120.140:9390,10.22.120.141:9390",
      "isNewCluster" : true,
      "name" : "redis_evt_A",
      "pwd" : "redis_EVT@JJMatch",
      "type" : "redis"
   }

CDBConnManager类内有一个成员变量对象,该对象记录着此集群下所有的ip:

1
std::vector<REDISCONNQUEUEITEM>             m_vcIPConn;// ip队列

1)初始化 InitObj(…)

入参const Json::Value & root 是从 ClusterDefine.json 文件中读取的,root 的内容示例:

1
2
3
4
5
6
7
8
9
{
    "consulserverargv" : "passing=true",
    "consulserverpath" : "/v1/health/service/evtssdb-001-ssdb",
    "ipport" : "10.30.142.10:8387,10.30.142.11:8387,10.30.142.12:8387",
    "isNewCluster" : false,
    "name" : "ssdb_evt_B",
    "pwd" : "rediscluster_statevt_8888@JJMatch",
    "type" : "redis"
}

CDBConnManager::InitObj(...) 代码思路:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 关键: 避免线程冲突、避免死锁
bool CDBConnManager::InitObj(const Json::Value & root, const long & id, const string & consulserver)
{
 	// 从 TKService.ini 中获取一些参数
    //...
    // 从 root 中获取 cluster 的信息,例如 name、pwd、ipport 等
    //...
    // 通过参数初始化连接
    if (!initRedisPool(/*...*/)) {
        TKWriteLog("<ERR> clusterdefine 初始化失败");
        return false;
    }
    //...
    return true;
}

CDBConnManager::initRedisPool(...) 代码思路:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
bool CDBConnManager::initRedisPool(
    const string & name,
    const string & ipport,
    const string & pwd,
    const string & consulserver,
    const string & consulserverpath,
    const string & consulserverargv)
{
    //...
    // 尝试切换到 consulserver(容灾中心)提供的 ip 上:
    if (!consulserver.empty()) {
        //...
        for (...) {
            //...
            if (ret = GetSwitchIP()) {
                break;
            }
        }
    }
    // 如果上面 consulserver 没有正确建立连接,就尝试从入参 ipport 建立连接
    if (!ret) {
        //...
    }
	return ret;   
}

2)ip切换 GetSwitchIP( )

CDBConnManager::GetSwitchIP() 用于切换当前的 ip 到 consulserver(容灾中心)配置的 ip 上:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
bool CDBConnManager::GetSwitchIP()
{
    // HTTP 连接 consulserver 获取consul配置信息:
    CHTTPConnector httpSender;
    int httpret = httpSender.HTTPGetMsg(...);
    //...
    // 统计当前所有已连接ip中,存活IP数
    //...
    // 从consul配置信息中获取可用的 ip + port :
    vector<string> allowedIps; // 可用ip
    vector<int> allowedPorts; // 可用port
    //...
    // 遍历所有consul的ip连接:
    // (非全量更新,创建或重连新连接)
    for (int i = 0; i < allowedIps.size(); i++) {
        // 检查此"可用ip"是否在consul的ip中
        //...
        // 若是新增的ip(相对目前已连接的ip):
        	// 新建一个 REDISCONNQUEUEITEM 连接塞到 m_vcIPConn 里;
        		REDISCONNQUEUEITEM * connItem = &m_vcIPConn.at(m_ipCount); 
        	// ...
        // 若是旧的ip:
        	// 若该旧的连接不"存活"(0 == m_vcIPConn[index].alive):
        		// 尝试重新连接:
        			TryReConnect(m_vcIPConn[index])
    }
    // 若是全量更新,就把新配置中没有的老ip灭掉、该ip下的redis连接释放:
    if (CRedisAutoPool::GetInstance()->IsFullUpdate()) {
        // 遍历目前所有已连接ip:
        for (int j = 0; j < m_ipCount; j++) {
            // 判断该已连接ip是否在consul的ip中,若不在的话:
                // 判断ip是否存活。若该ip存活的话,灭掉:
                if (m_vcIPConn[j].alive != 0) {
                    ::InterlockedExchange(&m_vcIPConn[j].alive, 0);
                    // 将该ip下的redis连接都删掉:
                    size_t sizeQueue = m_vcIPConn[j].connitem.size();
                    if (sizeQueue > 0) {
                        while (!m_vcIPConn[j].connitem.empty()) {
                            auto iter = m_vcIPConn[j].connitem.front();
                            m_vcIPConn[j].connitem.pop();
                            ReleaseConnection(iter, true);
                        }
                    }
                }
        }
    }
    // 统计现在存活的ip个数
    // 输出到log日志中
    return ret;
}

3)更新 updataInfo(…)

此函数会在 CRedisAutoPool 中的 OnUpdate() 中被调用,用于更新集群信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
bool CDBConnManager::updataInfo(
    const string & ipport,
    const string & consulserver,
    const string & consulserverpath,
    const string & consulserverargv,
    const string & pwd)
{
    // 从 INI 文件中获取 "错误链接阈值" 等配置信息
    //...
    if (!consulserver.empty()) {   
        // 根据入参 consulserver(容灾中心)拆分出容灾中心的ip、port等信息
        
        return TryGetSwitch();
    } else {    
        // 容灾中心没有消息,使用本地配置加载集群
        // 从入参ipport中拆分出配置中的ip和port信息,存为两个vector:ipvec和portvec
        // 循环所有的目标ip
        for (int iter = 0; ipvec.size() != iter; ++iter) {
            if (!IsOldIp(ipvec[iter])) {  // 当前连接池中没有该目标ip
                // 创建一个新的Redis连接,并填充所有信息:
                REDISCONNQUEUEITEM * connItem = &m_vcIPConn.at(m_ipCount);
                connItem->ver = 0;
                connItem->alive = 1;
                connItem->errNum = 0;
                //...
                // 循环创建5个相同的RedisConn填到上面的connItem中专门用来记录redis连接的队列中:
                for (int i = 0; i < REDIS_CONN_NUM_ON_BEGIN; i++) {
                    //...
                }
                // 检查创建的redis连接数量是否小于3(小于3就报错)
                // ip数量+1:
                ::InterlockedIncrement(&m_ipCount);
            } 
            else {  // 当前连接池中已存在该ip
                // 先找到当前redis连接在连接池中的索引,保存为index
                // 若该连接不是存活状态,重新连接:
                if (/*...*/&& 0 == m_vcIPConn[index].alive) {
                    TryReConnect(m_vcIPConn[index]);
                }
            }
        }
    }
    return ret;
}

4)尝试ip切换 TryGetSwitch( )

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
bool CDBConnManager::TryGetSwitch()
{
    bool ret = true;
    time_t tsNow = time(NULL);
	// 判断当前时间是否已经大于上次更新时间
    if (tsNow - m_lastReConnTime > 1) {
        if (::TryEnterCriticalSection(&m_csBingSwitchSection)) {
            //TKWriteLogFile("DBConnLogFile.log", "当前线程抢占到更新锁!!!");
            ret = GetSwitchIP();
            ::LeaveCriticalSection(&m_csBingSwitchSection);
            //TKWriteLogFile("DBConnLogFile.log", "当前线程离开更新锁!!!");
        } else {
            //TKWriteLog("<Info> 当前占用BingSwitch锁的线程为: %d", m_csBingSwitchSection.OwningThread);
        }
    }
    if (!ret) {
        CRedisAutoPool::IncState("SwitchErr");
    }
    return ret;
}

# CVS 配置管理类

为保证服务

1. CTKTwoBuf

双缓冲对象。

该类维护了两套来自 CVS 的配置,用于读取新的 CVS 配置,以及瞬时切换当前正在使用的配置。

该类用于 TKNOSService 服务中。

1)类定义

该类的定义位于ShareCode 中的 TKTwoBuf.h 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// TYPE:CVS中的配置格式,NOS中为 CConfigMap
// TYPE_SVR:当前是为哪个服务配置的
template <class TYPE, class TYPE_SVR>
class CTKTwoBuf
{
    // 定义一个函数指针,该函数用于更新配置
	typedef BOOL(TYPE_SVR::*TKUPDATEDICFUNC)(const std::shared_ptr<TYPE> & _pOld, std::shared_ptr<TYPE> & _pNew);
public:
	CTKTwoBuf(void);
	virtual ~CTKTwoBuf(void);
    
    // 给 m_pOwner 和 m_UpdateDicFunc 赋值,并调用 UpdateDic(1):
	BOOL Init(TYPE_SVR * _pOwner, TKUPDATEDICFUNC _pUpdateDicFun);
    
	void GetData(std::shared_ptr<TYPE> & ptr);   //取实际对象
	BOOL UpdateDic(int _nVer);  // 更新字典表
                                // 这里会调用回调函数来更新配置,没问题的话修改 m_nCurOEElement
	int GetCurVer() { return m_nDicVer; };    //取当前版本号
private: 
	int m_nDicVer;                     // 版本号,Init()的时候赋值为1,后续为时间戳
	TYPE_SVR * m_pOwner;               // 当前是为哪个服务配置的
	int m_nCurOEElement;               // 只有0和1两个取值,对应m_Data中的两套
	TKUPDATEDICFUNC m_UpdateDicFunc;   //回调函数指针:用于更新配置
                                       // NOS中的函数为CTKNOSService::OnEvtConfigUpdate()
	time_t m_tUpdateTime;              //上一次更新时间
	std::shared_ptr<TYPE> m_Data[2];   // 两套 CVS 配置
};   

2)配置模板:CConfigMap

这个类用于存储 NOS 服务从 CVS 中拉取的配置信息。在 NOSService 中作为 CTKTwoBuf 中的 TYPE 传入。

2. CVSAgent

为 CTKTwoBuf 类的 pro 版本,同样也是一个双缓冲对象。

该类维护了两套来自 CVS 的配置,用于读取新的 CVS 配置,以及瞬时切换当前正在使用的配置。

该类用于 ESP、OSS 中。

1)CVSAgent 类定义

  • 该类的定义位于 CVSClient.h 中:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// 用户配置类代理,CVSSDK对应的实体。融合twobuf
// 一个代理对应一种配置对象,对应一个twobuf,对应多个更新方法
template <class TYPE>
class CVSAgent
{
public:
    typedef bool(*TKUPDATEDICFUNC)(const std::shared_ptr<TYPE> & _pOld, const std::string & configs, std::shared_ptr<TYPE> & _pNew, std::string & feedback, bool bMakeShare);
    
public:
    CVSAgent() : m_nDicVer(0), m_nCurOEElement(0) {...};
    ~CVSAgent() { };
    
	// 如果不可以在构造函数中向ZK注册回调,那么在此注册
    // @param:identifier 节点标识,请使用IP:PORT的字符串进行标识
    // @param:business 业务名,英文字母描述业务缩写如EVT,CLS
    bool Init(std::string identifier, std::string business) {...};
    // 添加初始化方式。依照添加的顺序筛选配置、调用加载
    bool AddUpdateFunc(std::string cfgtype, TKUPDATEDICFUNC callback) {...}
	// 2018-12-4 lixm避免通过返回值返回临时变量,实测会被VS优化,并不增加引用计数
    bool Update();  
    // 取正在生效的配置对象
    void Get(std::shared_ptr<TYPE> & ptr) { ptr = m_Data[m_nCurOEElement ? 1 : 0]; };  

// 辅助APIs
public:
    int GetLastUpdateTime() { return m_nDicVer; };
private:
	// 从ZK获取对端CVS的地址信息。如果未成功连接ZK,从本地配置获取
    bool InitCVSConnPool() {...}
	// 根据节点信息从远程CVS获取指定类型的全量配置
    bool GetTypeOnNodeIdent(const std::string & cfgtype, std::string & allCfgs);  
    // 从本地备份文件获取指定类型的全量配置
    bool GetTypeFromLocalBackup(const std::string & cfgtype, std::string & allCfgs) {...}
    // 从远程CVS获取并加载成功后,写入本地备份文件
    bool WriteTypetoLocalBackup(const std::string & cfgtype, const std::string & allCfgs);
	// 反馈对获取的配置项的加载结果
    bool FeedbackLoadStatus(const std::string & cfgtype, const std::string & feedback); 

// 成员
private:
    std::string m_strWorkPath;        // 当前服务的运行路径
    std::vector<TypeUpdater<TYPE> > descriptors;  // 对多种类型的对应更新方式
    static std::string entident;      // 该实体的标识,用于决定该获取哪种配置
    static std::string m_port;        // 宿主服务端口号
    int m_nDicVer;                    // 上次切换buf的时间
    int m_nCurOEElement;              // 当前使用哪一个buf
    std::shared_ptr<TYPE> m_Data[2];  // 实际配置对象。不能显式初始化,因此该模板只接受有无参构造函数的对象

// 资源
    CTKConnectionPool m_connpoolCVS;   // 到CVS服务的连接池
    static CVSAgent<TYPE> * m_pthis;   // 强制单例模式。即对一种配置对象,只有一个CVSClient
                                       // 这个指针会在初始化的时候赋值
    std::string m_cvsloadsrvfail;
    std::string m_cvsloadlocalfail;
    std::string m_cvssrvfail;
    std::string m_cvslocalfail;
    bgs::CServiceStat m_stat;        // CMA监控项,上面4个string是初始化的监控字段
};
  • 初始化函数

    使用的时候,构造对象之后,需要立刻调用 Init() 来初始化对象。

1
Init()
  • 更新配置:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
template <typename TYPE>
bool CVSAgent<TYPE>::Update()
{
    //...
	// 从ZK获取对端CVS的地址信息。如果未成功连接ZK,从本地配置获取
    InitCVSConnPool();    // 这里会初始化成员变量 m_connpoolCVS
    //...
    // 遍历所有类型的更新方法:
    typename std::vector<TypeUpdater<TYPE> >::iterator iter = descriptors.begin();
	for (; iter != descriptors.end() && !ifanyerr; ++iter) {
        // 先尝试远程加载:向CVS发消息,
        // 根据 descriptors 中保存的string字段从CVS 中获取对应的所有配置,输出为 filtrated:
        GetTypeOnNodeIdent(iter->m_cfgtype, filtrated);
        // 调用descriptors中记录的函数,解析 filtrated 为json,记录到m_Data[]的另一个里
    	//...
        // 落盘到文件
        WriteTypetoLocalBackup(iter->m_cfgtype, filtrated);
        
        // 如果上述远程加载没有成功的话,从备份文件加载配置,输出为 filtrated:
        GetTypeFromLocalBackup(iter->m_cfgtype, filtrated);
        // 调用descriptors中记录的函数,解析 filtrated 为json,记录到m_Data[]的另一个里
        //...
        // 反馈对获取的配置项的加载结果:
        FeedbackLoadStatus(iter->m_cfgtype, feedback);
    }
    //...
}

2)TypeUpdater 类定义

该类负责记录一对儿数据,包含一个 string 和一个 TKUPDATEDICFUNC 类型,以方便 CVSAgent 中通过 std::vector 记录所有的键值对。

这个类型做的事情完全可以通过一个 map 来解决,猜测可能是由于当时的 STL 中还没提供成熟的 std::map 而采用的权衡之法。

类定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
template < class TYPE>
class TypeUpdater
{
public:
    typedef bool(*TKUPDATEDICFUNC)(const std::shared_ptr<TYPE> & _pOld, 
                                   const std::string & configs, 
                                   std::shared_ptr<TYPE> & _pNew, 
                                   std::string & feedback, 
                                   bool bMakeShare);

    TypeUpdater(const std::string & cfgtype, 
                TKUPDATEDICFUNC updater): m_cfgtype(cfgtype), m_updater(updater) {};
    bool ifAdded(const std::string & cfgtype, 
                 TKUPDATEDICFUNC updater){
        return m_cfgtype == cfgtype && m_updater == updater;
    };

    std::string m_cfgtype;
    TKUPDATEDICFUNC m_updater;
};

3)TKESPService 中的使用

  • 类声明中定义一个 CVSAgent<CTKESPConfigMap> 的对象:
1
2
3
4
5
6
7
class CTKESPService : public CTKWinObjNetService
{
    //...
private:
    CVSAgent<CTKESPConfigMap>  m_config;
    //...
};
  • CTKESPService::OnInitialUpdate() 中初始化该对象:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
BOOL CTKESPService::OnInitialUpdate()
{
    //...
    // 取得内网ip的字符串
    string strIPPort = cipinter;
    if (!m_config.Init(strIPPort, "ESP")) {
        TKWriteLog("<ERR> 初始化CVS错误!");
        return FALSE;
    }
    if (!m_config.AddUpdateFunc("define_esprule", &InitCLSConfig)) {
        TKWriteLog("<ERR> CVS AddUpdateFunc 错误!");
        return FALSE;
    }
    if (!m_config.Update()) {
        TKWriteLog("<ERR> CVS 初始化,首次Updata失败!");
        return FALSE;
    }
    //...
}
  • 定义 InitCLSConfig(...) 函数,需要注意的是,该函数形式需要与 CVSAgent 中定义的函数指针 TKUPDATEDICFUNC 的形式保持一致。
1
2
3
4
5
typedef bool(*TKUPDATEDICFUNC)(const std::shared_ptr<TYPE> & _pOld, 
                               const std::string & configs, 
                               std::shared_ptr<TYPE> & _pNew, 
                               std::string & feedback, 
                               bool bMakeShare);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
bool InitCLSConfig(const std::shared_ptr<CTKESPConfigMap> & _CurConfig, 
                   const string & newconfig, 
                   std::shared_ptr<CTKESPConfigMap> & _NewConfig, 
                   string & feedback, 
                   bool bMakeShare)
{
    //...
    // 从入参newconfig中解析配置项的json字段为 definesJ
    //...
	// 是否转为shared类型的智能指针:
    if (bMakeShare) {
        _NewConfig = std::make_shared<CTKESPConfigMap>();
    }
	// 加载新配置:
    _NewConfig->LoadDefines(definesJ, feedback);
    //...
    return TRUE;
}
  • 定时更新配置:
1
2
3
4
5
6
7
8
9
10
11
BOOL CTKESPService::OnTimeout(struct tm * pTime)
{
    //...
	int nDicUpdateTime = TKGetIniInt("Config", "DicUpdateTime", 1);
    if (nDicUpdateTime == 1) {
        m_config.Update();
    } else {
        TKWriteLog("<Warning> 配置更新开关DicUpdateTime已关闭!");
    }
    //...
}
  • 配置的使用:

    在需要读取配置的地方,调用 Get() 函数,该函数会返回一个std::shared_ptr<CTKESPConfigMap> 类型的对象,然后进行使用即可。

    例如在CTKESPService::ProcessSourceData(...) 中需要使用配置时:

    1
    2
    3
    4
    5
    6
    7
    8
    
    DWORD CTKESPService::ProcessSourceData(...)
    {
        //...
        std::shared_ptr<CTKESPConfigMap> ptr;
        m_config.Get(ptr);
        SID2PROCESSESPRO::iterator sid2processpro = ptr->m_sid2ProcessPro.find(dwSID);
        //...
    }
    

4)配置模板:CTKESPConfigMap

CTKESPConfigMap 会作为CVSAgent<TYPE> 中的类型传入。

这个类用于存储 ESP 服务从 CVS 中拉取的配置信息。

  • 类声明位于 TKESPConfigMap.h 文件中:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
typedef map<unsigned int, map<RULELEVELDIC, vector<shared_ptr<CTKESPProcess>>>> SID2PROCESSESPRO;
typedef map<string, set<unsigned int>> USERLISTNAME2LIST;

class CTKESPConfigMap
{
public:
    CTKESPConfigMap();
    ~CTKESPConfigMap();

    bool Clear();
    bool LoadDefines(const Json::Value & definesJ, string & feedback);
    bool IsInList(const string & strListName, const DWORD & dwPID);

    // map:左值为白名单配置名,右值为userid的列表
    USERLISTNAME2LIST m_mpUserNameList;   
    
    // map:左值为sid,右值为map(左值为ESPrule等级,右值为一个vector,为该等级下所有的ESP规则)
    SID2PROCESSESPRO m_sid2ProcessPro;
};
  • 解析 json 配置:LoadDefines(...)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
bool CTKESPConfigMap::LoadDefines(const Json::Value & definesJ, string & feedback)
{
    //...
    // 遍历入参的所有json配置:
    for (Json::ArrayIndex i(0); i < definesJ.size(); ++i) {
        // 创建一个 CTKESPProcess 的对象:
        shared_ptr<CTKESPProcess> pProcess = make_shared<CTKESPProcess>();
        if (pProcess == NULL) {
            //...
            return false;
        }
        // 获取当前json配置中各字段的内容
        JsonGetUInt(definesJ[i], endtime, "active_endtime");
        //...
        JsonGetUInt(definesJ[i], espid, "id");
        //...
        JsonGetUInt(definesJ[i], sid, "sid");
        //...
        JsonGetString(definesJ[i], content, "content");
        //...
        
        Json::Value contentJ(Json::arrayValue);  // 接收获得的配置项contentJ
        jreader.parse(content, contentJ, false);
        // 从 contentJ 中提取用户白名单:
        //...
        m_mpUserNameList[strUserListname].insert(pidtemp);
        //...
        // 根据 contentJ 记录的信息对 pProcess 初始化:
        ESPRESULT initret = pProcess->init(espid, sid, contentJ);
        // 若成功的话,将 pProcess 记录为成员变量
        if (initret == SD_EVT_PROCRESULT_OK) {
            resultJ["configtype"] = definesJ[i]["configtype"];
            resultJ["id"] = definesJ[i]["id"];
            resultJ["last_modify_time"] = definesJ[i]["last_modify_time"];
            resultJ["result"] = true;
            resultJ["feedback"] = "ok";
            m_sid2ProcessPro[sid][pProcess->rulelevel()].push_back(pProcess);
        }
        
    }
    //...
    return ret;
}

5)配置模板:CCLSConfigMap

CCLSConfigMap 会作为CVSAgent<TYPE> 中的类型传入。

这个类用于存储 CLS 服务从 CVS 中拉取的配置信息。

  • CCLSConfigMap 的类声明与 CLS 相关的代码放在一起,都位于 ServiceCLS.h 文件中。

5)配置模板:CTKDTCCollectConfig

CTKDTCCollectConfig 会作为CVSAgent<TYPE> 中的类型传入。

这个类用于存储 DTC 服务从 CVS 中拉取的配置信息。

  • CTKDTCCollectConfig 的类声明与 DTC 相关的代码放在一起,都位于 TKDtcCollectClient.h 文件中。

# 关于 ESP

0. 基本流程

  • OnInitialUpdate() 的时候,依次调用 m_config.Init(...)m_config.AddUpdateFunc(...)m_config.Update()加载 CVS 配置,具体加载内容和顺序见上面 CVSAgent 中的介绍。

1. CTKESPProcess

这个类用来维护一条 ESP 规则,也就是在 define_esprule_ 表中,通过 “content” 字段中记录的 json 解析出来的一条规则。

里面记录了当前规则中所有的控件。

  • 类定义如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class CTKESPProcess
{
public:
    CTKESPProcess();
    ~CTKESPProcess();

    // 初始化,这里会传入必须的参数,然后把所有的控件按照json记录的内容new出来。
    ESPRESULT init(unsigned int espid, 
                   unsigned int sid, 
                   const Json::Value & config);
    // 执行:根据当前类的对象记录的流程,再补充一些参数,然后开始执行
    ESPRESULT handle(long long llKeyWord[], 
                     Json::Value & sourceDataJ, 
                     unsigned int & needClean, 
                     unsigned int & subErrCode) const;
    inline unsigned int espid() const { return m_espid; };


    inline RULELEVELDIC rulelevel() const { return m_emRuleLevel; }

private:
    unsigned int m_espid;
    unsigned int m_sid;
    unsigned int m_version;
    RULELEVELDIC m_emRuleLevel;
    
    // 正常为0,最低位为1代表需要清空Json,第2低位为1代表需要清空KewWord
    unsigned int m_needClean;

    // 所有的控件:
    vector<std::shared_ptr<CTKESPModuleBase>> m_moduleList;
};

# 消息恢复机制

消息恢复机制涉及到两个类CTKMsgRecover<T>CTKAsyncSendObject<T>

  • CTKMsgRecover<T>:异步消息恢复
  • CTKAsyncSendObject<T>:异步消息恢复对象

当向下游发送消息失败时(可能是由于下游宕机或其它原因),调用类 CTKMsgRecover 创建的对象,记录该消息,CTKMsgRecover 类的对象会自动在每分钟调用的 OnTimesout() 函数中尝试将该消息发送出去。

1)使用方法

以 OSS 中的使用为例:

  • 定义:
1
2
3
4
// 异步消息恢复<recover>
CTKMsgRecover<CTKObjectStorageService> m_msgRecover;
//异步消息恢复对象
CTKAsyncSendObject<CTKObjectStorageService> m_clsAsyncSaveObj;
  • 初始化:
1
2
3
4
5
6
7
8
9
10
//5. 消息恢复初始化
{
  //定义可恢复异步消息<recover>
  m_msgRecover.AddRecMsgType(1, "OSSSrv", "object", &m_clsAsyncSaveObj);
  //定义消息恢复异步对象
  if (!m_clsAsyncSaveObj.Init(this, &CTKObjectStorageService::OnAsyncObjRecover, 2)){
      TKWriteLog("%s:%d,m_clsAsyncSaveHistory异步对象初始化失败!");
      return FALSE;
  }
}
  • 消息发送失败,记录需要恢复的消息:
1
2
3
4
5
6
7
8
9
// 先攒出来一个用于消息恢复的结构体:
RECOVEROBJECTREQ recStruct = {0};
memset(&recStruct, 0, sizeof(RECOVEROBJECTREQ));
recStruct.header.dwLength = size - sizeof(TKHEADER);
recStruct.oid = oid;
//...

// 记录该消息
this->m_msgRecover.RecordMsg(1, (char *)buff.GetBufPtr(), size);
  • 定时恢复:
1
2
3
4
5
6
// CMA 监控消息恢复情况:
this->opsstatDynamic_.Set("objrecover", this->m_msgRecover.GetRecoverMsgCnt(1));
this->opsstatDynamic_.Set("objrecord", this->m_msgRecover.GetRecordMsgCnt(1));
this->opsstatDynamic_.Set("objrecfail", this->m_msgRecover.GetRecoverFailCnt(1));
// 定时消息恢复:
this->m_msgRecover.OnTimesout(pTime);
  • <占位符>

2)CTKMsgRecover<T>

3)CTKAsyncSendObject<T>

# CVS

CVS 获取配置

以下梳理当 CVS 服务收到 TK_MSG_OTHER2CVS_GET_CERTAIN_TYPE 协议,获取配置的逻辑:

  • CVS 从协议头中提取所需信息:

    • 发送消息的节点 ip 信息(nodeident)。例1: 192.168.17.147:37010
    • cfgtype :例如 his_oss
  • 注册节点并获取控制信息:(函数 RegistorUpdateNode(...)

    • 根据给定节点套接字,获取区间套接字字段(sectionident)。例1: 192.168.17.*:37010

    • 利用下面的 SQL 语句从表 map_node2cluster 中获取所需的配置信息;

      1
      
      select node_addr,cluster,riskcontrol from CVS.map_node2cluster where node_addr = '[nodeident]' OR node_addr = '[sectionident]';
      
    • 将上述语句查找到的配置信息写为一个 Json::Value 并返回;

    • 更新查找到的数据中的 LastReportDate,写入当前时间。

  • 从上面返回的 Json::Value 中提取字段 'cluster' 的内容。例1:hisoss_neiwang

  • 获取所有指定的配置项:(函数 GetAllMatchingConfigs(...)

    • 利用以下 SQL 语句从表 map_define2cluster 中获取所需的配置信息(要查询的目标表);

      1
      
      select DISTINCT(configs_tablename) from CVS.map_define2cluster where applyto_cluster = '[cluster]' or applyto_cluster = '' and live_tag = 1;
      
    • 将上面查询到的内容保存为变量 matchtabsJJson::Value 类型)。

      例1:'configtablename': 'define_hisoss', 'define_hisoss_cluster'

    • 根据 matchtabsJ中记录的目标表,利用以下 SQL 语句从目标表中查询内容:

      1
      2
      3
      
      select a.* from '[configtablename]' as a inner join 
      (select id, max(last_modify_time) as max_last_modify_time from '[configtablename]' group by id) as b 
      on a.id = b.id and a.last_modify_time = b.max_last_modify_time where a.status >= 0;
      
    • 将上一步提取到的内容保存到入参 configsinmatchJJson::Value 类型)中以返回。

    • 例1:{"configtype": ""}

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      
      {
          {
              "[从MySQL中获得的具体配置项]":"[从MySQL中获得的具体配置项]",
              "configtype": "define_hisoss",
              "last_modify_time": "2018-06-01 14:44:04"
          },
          {
              "[从MySQL中获得的具体配置项]":"[从MySQL中获得的具体配置项]",
              "configtype": "define_hisoss_cluster",
              "last_modify_time": "2024-10-16 17:47:19"
          }
      }
      
  • 【占位符】

参考23617854

本文由作者按照 CC BY 4.0 进行授权