목표
이번 글의 목표는 아래글에서 작성한 소스를 좀 더 효율적으로
코드 리팩토링(Code Refactoring) 하는 것이 목표이다.
https://goodbyeanma.tistory.com/175
앞의 글에서 C++ 를 통한 Elasticsearch Cluster의 CPU, JVM, DISK 사용률을 수집하고
특정 임계치가 넘게되면 Telegram Bot을 통해
관리자 그룹에게 메시지를 보내주는 프로그램을 개발하였다.
만약, 모니터링할 대상 클러스터의 개수가 적다면 상관없겠지만
모니터링할 대상이 많다면, 하나하나의 클러스터에 접근하여 지표값을 집계하는 것보다.
한 번에 다수의 클러스터에 접근하여 지표값을 집계하는 것이 효율적이다.
위의 그림과 같이 c++ 로 짜인 프로그램의 로직은
server_list 디렉토리 하위에 존재하는 ES cluster 정보를 읽은 다음
Elastic 객체에 맵핑한 다음 vector <Elastic>에 저장한다.
그런 다음 단일스레드로 구성된 프로그램이 vector 내부의 Elastic 객체를 하나씩 읽어서
해당 ES Cluster 내부에 있는 노드들의 CPU, JVM, DISK 정보를 수집하는 방식이다.
아래의 그림을 보면 이해가 빠를것이다.
물론 모니터링해야 할 Cluster의 개수가 적을 때는 상관없겠지만,
모니터링해야 할 개수가 많아지게 된다면,
위의 모니터링 과정을 단일스레드로 처리하는 것은 비효율이라고 생각한다.
해법 - Multi Thread Programing
결국 멀티스레드로 처리한다면, 싱글스레드로 처리할 때보다 훨씬 빠르게
각 ES 클러스터의 지표를 수집할 수 있다.
(물론 멀티스레드가 실제로 아래의 그림처럼 작동하는 것은 아니다.)
멀티스레드 프로그래밍에 대해서 궁금하다면 아래의 글을 보고 오는 것을 추천한다.
https://goodbyeanma.tistory.com/173
멀티스레딩 프로그램을 사용하면 싱글스레드 프로그램보다 훨씬 빠른 작업속도를 가질 수 있지만,
고려해줘야 할 부분이 생긴다.
1. Logging
첫 번째로 고려해줘야 하는 부분인 로깅 부분이다.
기존의 코드는 아래와 같이 멀티스레드 환경에서 안전한 (thread-safe) 동기적인 로거를 생성하여 사용했다.
string logFilePath = "./metric_log/" + getCurrentTime("%Y.%m.%d") + ".log";
shared_ptr<spdlog::logger> fileLogger = spdlog::basic_logger_mt("file_logger", logFilePath);
멀티스레드 환경에서 "동기적인 로거를 생성하면 안 된다!"라는 것은 아니지만,
성능적으로 문제가 될 수 있다.
동기적 로깅은 로그 메시지를 파일이나
다른 출력 대상에 기록하는 데 필요한 시간만큼 작업 스레드가 대기해야 한다.
따라서, 로깅에 걸리는 시간이 애플리케이션의 성능에 직접적인 영향을 줄 수 있다.
특히, 로깅이 빈번하게 발생하는 경우나
로그를 기록하는 위치(예: 네트워크)에 느린 지연시간이 있을 경우,
성능 저하가 뚜렷하게 나타날 수 있다.
또한, 여러 스레드에서 동시에 같은 로거를 사용하여 로깅을 진행할 때,
스레드 안전성이 보장되어야 한다.
물론 spdlog는 스레드 안전성을 제공하지만,
동시에 여러 스레드에서 로깅이 이루어질 경우
내부적인 잠금 메커니즘이 작동하게 되어 성능 저하의 원인이 될 수 있다.
그러므로 멀티스레드 프로그램을 사용할 때는
비동기적 로깅을 지원하는 로거를 사용하는 것을 추천한다.
// ===== multi-thread =====
// Initialize thread pool
spdlog::init_thread_pool(8192, 1);
logFilePath = "./metric_log/" + getCurrentTime("%Y.%m.%d") + ".log";
// Create an asynchronous logger.
auto async_file_logger = spdlog::basic_logger_mt<spdlog::async_factory>("async_logger", logFilePath);
위의 코드는 비동기적 로깅을 사용하기 위한 코드 템플릿이다.
코드를 분석해 보면, 의문이 가는 부분이 있을 것이다.
첫 번째 구문에 스레드 풀을 생성하고 갑자기 매개변수 두 개를 넣고 있다.
spdlog 라이브러리를 열어보면 아래와 같은 소스코드가 존재한다.
첫 번째 매개변수는 큐의 사이즈이고 두 번째 인자는 스레드의 개수이다.
첫 번째 매개변수의 큐의 사이즈란 큐의 크기를 나타낸다.
이는 로깅 메시지를 저장하는 큐의 최대 크기를 지정하는 것이다.
해당 큐는 로그 메시지가 작성될 때부터
실제로 로그가 출력될 때까지 메시지를 저장하게 된다.
두 번째 인자인 스레드의 개수 인자는 스레드풀의 스레드 수를 나타낸다.
이 설정을 통해, 메인 애플리케이션 스레드가 로깅 함수를 호출할 때
메시지는 큐에 빠르게 추가되고 즉시 반환된다.
별도의 스레드(이 경우에는 1개의 스레드)가 큐에서 메시지를 가져와 실제 로그 출력을 수행한다.
아래의 그림을 참고하면 동기적 로깅 방식과 비동기적 로깅 방식의 차이를 이해할 수 있을 것이다.
2. Monitoring Cluster Division
두 번째로 고려할 사항은 애플리케이션에 몇 개의 작업 스레드를 할당하고
각 스레드는 몇 개의 클러스터에 대한 지표를 탐색하냐는 것이다.
적당한 스레드의 개수는 소프트웨어 / 하드웨어의 환경에 매우 의존적이므로
"몇 개의 스레드로 구동하는 게 좋다!"라는 답은 없다.
일단 예시에서는 4개의 스레드로 ES cluster의 지표를 수집한다고 가정해 보자.
만약에 15개의 클러스터를 모니터링한다고 가정하면, 아무 생각 없이
나누기 연산으로 클러스터를 분배하면 아래와 같은 상황이 나올 수 있다.
위의 그림에서 보듯, Thread4가 다른 스레드에 비해
너무 많은 ES 클러스터들의 모니터링을 담담하고 있다.
즉 아래와 같이 분배작업을 수행해야 한다.
그럼 어느 한쪽의 스레드만 작업이 과부하되는 일을 막을 수 있다.
코드구현
바뀐 코드 부분은 기존 코드에서 main.cpp 부분에 변경이 있다.
기존에 사용했던 esObjList() 함수를 esObjQue() 함수로 바꾸면서,
각 클러스터의 정보를 리스트 형식으로 제공하는 것이 아닌
Queue로 제공하는 알고리즘으로 변경하였다.
또한, 동기적 로거를 비동기적 로거로 수정해 주었고
ES 클러스터 분배 알고리즘도 적용하였다.
#include "common.h"
#include "clusterState.h"
#include "elastic.h"
#include "teleBot.h"
/**
* @brief Function to read json file
*
* @param filepath
* @return json
*/
json readJsonFile(const string& filepath) {
//test
auto logger = spdlog::get("file_logger");
ifstream ifs(filepath);
json jsonData;
// If the file was not opened properly, print an error message and return an empty json object.
if (!ifs.is_open()) {
logger->error("Failed to open the JSON file: {}", filepath);
} else {
try {
jsonData = json::parse(ifs);
} catch (const exception& e) {
stringstream ss;
ss << "filepath : " << filepath << " ERROR : " << e.what();
logger->error(ss.str());
}
}
ifs.close();
return jsonData;
}
/**
* @brief Function that displays only two decimal places - performs rounding at the third decimal place
*
* @param value
* @return double
*/
double roundToTwoDecimalPlaces(double value) {
return round(value * 100) / 100.0;
}
/**
* @brief Get the Current UTC object
*
* @param format
* @return string
*/
string getCurrentUTC(const string &format) {
using namespace chrono;
// Get current time from system clock
auto now = system_clock::now();
// Convert time to time_t format
time_t now_time_t = system_clock::to_time_t(now);
// Convert to GMT/UTC time
tm* now_utc_tm = gmtime(&now_time_t);
// Convert time to string ex) "%Y-%m-%d %H:%M:%S"
stringstream ss;
ss << put_time(now_utc_tm, format.c_str());
return ss.str();
}
/**
* @brief Get the Current Time object - Local time
*
* @param format
* @return string
*/
string getCurrentTime(const string &format)
{
using namespace chrono;
// Get current time from system clock
auto now = system_clock::now();
// Convert time to time_t format
time_t now_time_t = system_clock::to_time_t(now);
tm* now_kor_tm = localtime(&now_time_t);
// Convert time to string ex) "%Y-%m-%d %H:%M:%S"
stringstream ss;
ss << put_time(now_kor_tm, format.c_str());
return ss.str();
}
/**
* @brief
*
* @param file
* @return vector<Elastic>
*/
queue<Elastic> esObjQue(const json &file) {
auto logger = spdlog::get("file_logger");
vector<string> infoList;
queue<Elastic> clusterQ;
try {
for (const auto& elem : file)
{
stringstream clusterPath;
clusterPath << "./elasticsearch_server/" << elem["cluster_path"].get<string>();
infoList.push_back(clusterPath.str());
}
for (const string &path : infoList)
{
json data = readJsonFile(path);
vector<string> ipAddr;
string esId, esPw;
bool sslYn = true;
for (const string &elem : data["cluster_ip_port"]) ipAddr.push_back(elem);
if (data["ssl"].is_null()) sslYn = false;
esId = data["id"].is_null() ? "" : data["id"].get<string>();
esPw = data["pw"].is_null() ? "" : data["pw"].get<string>();
Elastic esObj(ipAddr, esId, esPw, data["ver"].get<string>(), sslYn);
clusterQ.push(esObj);
}
} catch (const exception& e) {
logger->error(e.what());
}
return clusterQ;
}
/**
* @brief Function to monitor metrics of ES cluster and Alarm occurred
*
* @param esObjList
* @param teleBot
* @param logger
*/
void metricMonitoring(vector<Elastic> esObjList, TeleBot teleBot, shared_ptr<spdlog::logger> logger)
{
for (Elastic esObj : esObjList)
{
stringstream msgStream;
stringstream errStr;
bool msgCall = false;
vector<string> esIpPortList = esObj.getClusterIpPort();
ClusterState csObj;
string selectedNode = esObj.randChoicNode(esIpPortList);
int esCheckRes = esObj.esClusterCheck(csObj, selectedNode, logger);
errStr << "[ metricMonitoring() ERROR ] -> ";
// If a problem appears in the cluster status check
if (esCheckRes == -1) return;
try {
if (csObj.clusterName == "" || csObj.clusterState == "") {
errStr << "[" << csObj.clusterName << " // " << selectedNode << "] clusterName : " << csObj.clusterName << " clusterState : " << csObj.clusterState;
teleBot.sendMsgByBot(errStr.str() ,logger);
throw runtime_error(errStr.str());
} else if (csObj.clusterState == "red") {
errStr << "[" << csObj.clusterName << " // " << selectedNode << "] The status of the ES Cluster is RED.";
teleBot.sendMsgByBot(errStr.str() ,logger);
throw runtime_error(errStr.str());
}
errStr << "cluster name : " << csObj.clusterName;
// ==== TEST CODE ====
logger->info("clusterName.first : " + csObj.clusterName);
logger->info("clusterName.second : " + csObj.clusterState);
// ================================
msgStream << "[ ==== Metric Alert ====]\n" << "[ " << csObj.clusterName << "]\n";
string curTime = getCurrentUTC("%Y.%m.%d");
json diskJson = esObj.getDiskState(selectedNode, curTime, logger);
json cpuJvmJson = esObj.getCpuJvmState(selectedNode, curTime, logger);
auto diskJsonArray = diskJson["aggregations"]["nodes"]["buckets"];
auto cpuJvmArray = cpuJvmJson["aggregations"]["terms"]["buckets"];
for (json elem: diskJsonArray)
{
if (elem["key"].is_null() || elem["total_disk"]["value"].is_null() || elem["available_disk"]["value"].is_null())
{
errStr << "elem: diskJsonArray -> ";
if (elem["key"].is_null()) errStr << "elem['key'] is null ";
if (elem["total_disk"]["value"].is_null()) errStr << "elem['total_disk']['value'] is null ";
if (elem["available_disk"]["value"].is_null()) errStr << "elem['available_disk']['value'] is null ";
throw runtime_error(errStr.str());
}
string nodeIpPort = elem["key"].get<string>();
double totalDisk = elem["total_disk"]["value"].get<double>();
double availDisk = elem["available_disk"]["value"].get<double>();
double usePercent = roundToTwoDecimalPlaces(((totalDisk - availDisk) / totalDisk) * 100.0);
if (usePercent >= 80)
{
msgCall = true;
msgStream << " ip-addr: " << nodeIpPort << "\n disk used : " << usePercent << " %\n";
}
}
for (json elem: cpuJvmArray)
{
if (elem["key"].is_null() || elem["cpu_avg"]["value"].is_null() || elem["heap_avg"]["value"].is_null())
{
errStr << "elem: cpuJvmArray -> ";
if (elem["key"].is_null()) errStr << "elem['key'] is null ";
if (elem["cpu_avg"]["value"].is_null()) errStr << "elem['cpu_avg']['value'] is null ";
if (elem["heap_avg"]["value"].is_null()) errStr << "elem['heap_avg']['value'] is null ";
throw runtime_error(errStr.str());
}
string nodeIpPort = elem["key"].get<string>();
double cpuAvg = roundToTwoDecimalPlaces(elem["cpu_avg"]["value"].get<double>());
double jvmAvg = roundToTwoDecimalPlaces(elem["heap_avg"]["value"].get<double>());
if (cpuAvg >= 90)
{
msgCall = true;
msgStream << " ip-addr: " << nodeIpPort << "\n cpu used : " << cpuAvg << " %\n";
}
if (jvmAvg >= 90)
{
msgCall = true;
msgStream << " ip-addr: " << nodeIpPort << "\n jvm used : " << jvmAvg << " %\n";
}
}
if (msgCall)
{
teleBot.sendMsgByBot(msgStream.str(), logger);
}
}
catch(const json::type_error &e) {
errStr << "node ip - " << selectedNode << " ERROR :: " << e.what();
logger->error(errStr.str());
teleBot.sendMsgByBot(errStr.str(), logger);
} catch(const exception& e) {
errStr << e.what();
logger->error(errStr.str());
}
}
}
/**
* @brief Main function
*
* @return int
*/
int main()
{
string logFilePath = "./metric_log/" + getCurrentTime("%Y.%m.%d") + ".log";
shared_ptr<spdlog::logger> fileLogger = spdlog::basic_logger_mt("file_logger", logFilePath);
json clusterListJson = readJsonFile("./elasticsearch_server/monitoring_cluster.json");
queue<Elastic> esQ = esObjQue(clusterListJson);
json teleInfoJson = readJsonFile("./telegram_info_test.json");
TeleBot teleBot(teleInfoJson["token"].get<string>(), teleInfoJson["id"].get<string>());
// ===== multi-thread =====
// Initialize thread pool
spdlog::init_thread_pool(8192, 1);
logFilePath = "./metric_log/" + getCurrentTime("%Y.%m.%d") + ".log";
// Create an asynchronous logger.
auto async_file_logger = spdlog::basic_logger_mt<spdlog::async_factory>("async_logger", logFilePath);
// Determines how many threads to process.
int threadCnt = 4;
int esListQ = esQ.size() / threadCnt;
int esListR = esQ.size() % threadCnt;
vector<thread> threads;
queue<Elastic> esQcopy(esQ);
while(!esQcopy.empty())
{
vector<Elastic> elasticList;
int curCnt = esListQ;
if (esListR != 0) {
curCnt++;
esListR--;
}
for (int i = 0; i < curCnt; i++)
{
Elastic curEs = esQcopy.front();
esQcopy.pop();
elasticList.push_back(curEs);
}
threads.push_back(thread(metricMonitoring, elasticList, teleBot, async_file_logger));
}
for (auto& t : threads) t.join();
spdlog::drop("async_logger");
return 0;
}
성능검증
그럼 실제로 멀티스레드로 구동하는 프로그램이 싱글스레드로 구동한 프로그램보다
얼마나 빠른지 직접 테스트해보자.
현재 테스트 환경에서는 총 22개의 ES 클러스터가 존재한다.
싱글스레드 프로그래밍으로 작성한 코드와
멀티스레드 프로그래밍으로 작성한 코드를 각각 컴파일하여 구동시간을 비교하였다.
위의 그림에서 보듯, 4개의 멀티스레드를 사용하여 구동한 프로그램이
싱글스레드를 사용한 프로그램보다 약 4배 정도 빠른 것을 볼 수 있다.
이렇게 하면 보다 빠르게 각 ES 클러스터의 지표를 수집할 수 있고,
더 나아가 효율적인 서버 모니터링을 수행할 수 있을 것이라고 생각한다.
'개발 & 구현' 카테고리의 다른 글
[c++] Elasticsearch Cluster metric 알람 구현 (2) | 2023.09.18 |
---|---|
[Python] Telegram 응답 받기 (0) | 2023.05.03 |
[Python] Telegram 메시지 보내주기 (0) | 2023.05.02 |
JAVA 메시지 보내기 - SMTP (0) | 2022.12.03 |
QR코드 로그인 구현 (1) | 2022.05.13 |