목표
회사에서 여러 시스템에서 사용되는 Elasticsearch Cluster를 관리하고 있다.
클러스터 모니터링을 위해서
ElasticHQ, Cerebro, Prometheus, Grafana , Beats와 같은 솔루션을 이용할 수 있겠지만,
이 글에서는 C++ 코딩을 통해서 해당 기능을 수행하는 프로그램을 만들어보겠다.
이 글에서는 특정 ES cluster 내부의 각각 노드의 metric 정보를 수집하여
임계치가 넘어가면, 관리자에게 알람을 전송하는 코드를 만들어보겠다.
수집할 metric 정보는 아래와 같다.
1. CPU 사용량
2. JVM 사용량
3. DISK 사용량
설계
시스템 아키텍처
위의 시스템 아키텍처를 보면,
컴파일된 c++ 프로그램이 각 ES Cluster에 curl HTTP GET Request를 통해
해당 클러스터에 속해있는 각 노드의 CPU, JVM, DISK 사용량을 파악한 뒤
임계치가 넘는 노드가 있으면 Telegram Bot을 통해서
관리자가 속해있는 Telegram 대화방에 메시지를 보내주는 방식이다.
소스 코드 프레임
각 ES Cluster의 접속정보는 server_list 내에 존재하는 json 파일에 쓰여있다.
common.h 헤더파일은 각 소스코드에서 사용할 표준 라이브러리 및 외부 라이브러리가 포함되어 있다.
telegram_info.json 은 아래와 같이 해당 정보를 json 파일로 만들어 주면 된다.
어떤 클러스터를 포함시킬 것인지 정해주는 monitoring_cluster.json 은 아래와 같이 지정할 수 있다.
또한 monitoring_cluster.json 에서 모니터링 대상이 된 각 클러스터의 정보는
아래와 같은 방식으로 json 형태로 저장해 주면 된다.
그리고 각 파일이 의미하는 바는 아래의 소스 코드 부분에서 설명하도록 하겠다.
소스코드
1. common.h
#pragma once
#include <iostream>
#include <iomanip>
#include <chrono>
#include <ctime>
#include <sstream>
#include <cctype>
#include <vector>
#include <fstream>
#include <string>
#include <thread>
#include <map>
#include <utility>
#include <cmath>
#include <random>
#include <queue>
// external
#include <curl/curl.h>
#include <nlohmann/json.hpp>
#include <fmt/core.h>
#include <spdlog/spdlog.h>
#include <spdlog/async.h>
#include <spdlog/sinks/basic_file_sink.h>
#include <cpprest/http_client.h>
#include <cpprest/filestream.h>
#include <cpprest/json.h>
#include <boost/archive/iterators/base64_from_binary.hpp>
#include <boost/archive/iterators/transform_width.hpp>
#include <boost/archive/iterators/insert_linebreaks.hpp>
#include <boost/archive/iterators/ostream_iterator.hpp>
#include <curl/curl.h>
using namespace std;
using fmt::format;
using nlohmann::json;
내부 라이브러리와 외부 라이브러리를 나누어 놓았다.
외부 라이브러리는 위의 코드를 참고해서 직접 설치가 필요하다.
2. clusterState.h / clusterState.cpp
#pragma once
#include "common.h"
using namespace std;
struct ClusterState {
string clusterName; // es cluster 의 이름
string clusterState; // es cluster 의 상태
ClusterState();
ClusterState(const string &clusterName, const string &clusterState);
};
clusterState.h 는 특정 es 클러스터의 상태를 저장해 주기 위한 구조체이다.
구현 부분은 아래와 같다.
#include "clusterState.h"
ClusterState::ClusterState() {}
ClusterState::ClusterState(const string &cName, const string &cState)
: clusterName(cName), clusterState(cState) {
}
그저 생성자의 역할을 하는 코드를 구현한 모습을 볼 수 있다.
3. elastic.h / elastic.cpp
#pragma once
#include "clusterState.h"
#include "common.h"
/**
* @brief Elasticsearch Cluster Object
*
*/
class Elastic {
private:
vector<string> clusterIpPort;
string clusterId;
string clusterPw;
string clusterVer;
bool clusterSsl;
public:
Elastic();
Elastic(vector<string> clusterIpPort, string clusterId, string clusterPw, string clusterVer, bool clusterSsl);
//getter
vector<string> getClusterIpPort();
// method
//1. ES 클러스터의 특정 노드를 임의적으로 선택해준다.
string randChoicNode(const vector<string> &esIpPortList);
//2. curl 명령어를 통해 HTTP request 를 보내기 위한 전처리 과정
CURL* setESCurl(const string &uri, curl_slist* headers, const string &selectedNode, shared_ptr<spdlog::logger> logger);
//3. ES cluster 의 상태를 체크해준다.
int esClusterCheck(ClusterState &csObj, const string &selectedNode, shared_ptr<spdlog::logger> logger);
//4. 특정 클러스터 내의 각 노드들의 CPU , JVM 사용량을 json으로 리턴
json getCpuJvmState(const string &selectedNode, const string &utcTime, shared_ptr<spdlog::logger> logger);
//5. 특정 클러스터 내의 각 노드들의 disk 사용량을 json 으로 리턴
json getDiskState(const string &selectedNode, const string &utcTime, shared_ptr<spdlog::logger> logger);
};
elastic.h 는 직접적으로 elasticsearch cluster에 쿼리를 해줄 수 있도록 해주는
객체를 선언해 놓은 부분이라고 생각하면 된다.
구현부는 아래와 같다.
#include "common.h"
#include "elastic.h"
/**
* @brief Callback function for curl command
*
* @param contents
* @param size
* @param nmemb
* @param userp
* @return size_t
*/
size_t WriteCallback(void* contents, size_t size, size_t nmemb, void* userp)
{
((string*)userp)->append((char*)contents, size * nmemb);
return size * nmemb;
}
Elastic::Elastic() {}
Elastic::Elastic(vector<string> clusterIpPort, string clusterId, string clusterPw, string clusterVer, bool clusterSsl)
: clusterIpPort(clusterIpPort), clusterId(clusterId), clusterPw(clusterPw),
clusterVer(clusterVer), clusterSsl(clusterSsl) {}
vector<string> Elastic::getClusterIpPort()
{
return clusterIpPort;
}
/**
* @brief Selects a specific node in the ES cluster.
*
* @param esIpPortList
* @return string
*/
string Elastic::randChoicNode(const vector<string> &esIpPortList)
{
mt19937 engine(random_device{}());
int clusterSize = esIpPortList.size();
uniform_int_distribution<int> distrib(0, clusterSize - 1);
return esIpPortList[distrib(engine)];
}
/**
* @brief Function that creates a CURL object to connect to the ES cluster.
*
* @param uri
* @param headers
* @param selectedNode
* @param logger
* @return CURL*
*/
CURL* Elastic::setESCurl(const string &uri, curl_slist* headers, const string &selectedNode, shared_ptr<spdlog::logger> logger)
{
CURL* curl;
curl = curl_easy_init();
stringstream html;
stringstream errStr;
errStr << "[ setESCurl() ERROR ] -> ";
// Depends on whether "SEARCH GUARD" is applied or not
if (this->clusterSsl) html << "https://" << selectedNode;
else html << "http://" << selectedNode;
html << uri;
try {
// Setup the URL
curl_easy_setopt(curl, CURLOPT_URL, html.str().c_str());
if (this->clusterId != "" && this->clusterPw != "")
{
curl_easy_setopt(curl, CURLOPT_HTTPAUTH, (long)CURLAUTH_BASIC);
curl_easy_setopt(curl, CURLOPT_USERNAME, this->clusterId.c_str());
curl_easy_setopt(curl, CURLOPT_PASSWORD, this->clusterPw.c_str());
}
// Disable SSL certificate verification
curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, 0L); // 0L means no verify
curl_easy_setopt(curl, CURLOPT_SSL_VERIFYHOST, 0L); // 0L means no verify
// Set maximum time allowed for the transfer
curl_easy_setopt(curl, CURLOPT_TIMEOUT, 30L);
// Set maximum time the request is allowed to take to connect
curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, 15L);
// Specify the GET method
curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "GET");
headers = curl_slist_append(headers, "Content-Type: application/json");
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
} catch(const json::type_error &e) {
errStr << "node ip - " << selectedNode << " ERROR :: " << e.what();
logger->error(errStr.str());
} catch(const exception& e) {
errStr << e.what();
logger->error(errStr.str());
}
return curl;
}
/**
* @brief Function that checks the status of the ES cluster and finds the name of the cluster.
*
* @param csObj
* @param selectedNode
* @param logger
* @return int
*/
int Elastic::esClusterCheck(ClusterState &csObj, const string &selectedNode, shared_ptr<spdlog::logger> logger)
{
struct curl_slist* headers = nullptr;
CURL* curl = setESCurl("/_cluster/health", headers, selectedNode, logger);
CURLcode res;
string readBuffer;
stringstream errStr;
int returnVal = 0;
errStr << "[ esClusterCheck() ERROR ] -> ";
if (curl)
{
try {
// Set the callback function
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteCallback);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &readBuffer);
res = curl_easy_perform(curl);
if(res == CURLE_OK)
{
json jsonObj = json::parse(readBuffer);
/*Even if there is no problem with communication, there may be a problem with the response itself,
so the corresponding exception is processed.*/
if (jsonObj["error"]["reason"].is_null())
{
if (!(jsonObj["cluster_name"].is_null() && jsonObj["status"].is_null())) {
csObj.clusterName = jsonObj["cluster_name"].get<string>();
csObj.clusterState = jsonObj["status"].get<string>();
} else {
errStr << "node ip - " << selectedNode << " : csObj.clusterName or csObj.clusterState is null value";
returnVal = -1;
throw runtime_error(errStr.str());
}
}
else
{
errStr << "node ip - " << selectedNode << " : " << jsonObj["error"]["reason"].get<string>();
returnVal = -1;
throw runtime_error(errStr.str());
}
}
else
{
errStr << "[" << selectedNode << "] : " << "curl_easy_perform() failed: " << curl_easy_strerror(res);
returnVal = -1;
throw runtime_error(errStr.str());
}
} catch(const json::type_error &e) {
errStr << "node ip - " << selectedNode << " ERROR :: " << e.what();
logger->error(e.what());
} catch(const exception& e) {
errStr << e.what();
logger->error(errStr.str());
}
// Clean up
if (headers) curl_slist_free_all(headers);
curl_easy_cleanup(curl);
}
return returnVal;
}
/**
* @brief Get the Cpu Jvm State object
*
* @param selectedNode
* @param utcTime
* @param logger
* @return json
*/
json Elastic::getCpuJvmState(const string &selectedNode, const string &utcTime, shared_ptr<spdlog::logger> logger)
{
stringstream uri;
uri << "/.monitoring-es-" << this->clusterVer << "-" << utcTime << "/_search";
struct curl_slist* headers = nullptr;
CURL* curl = setESCurl(uri.str(), headers, selectedNode, logger);
CURLcode res;
string readBuffer;
json jsonObj;
stringstream errStr;
errStr << "[ getCpuJvmState() ERROR ] -> ";
if (curl)
{
try {
// Set the callback function
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteCallback);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &readBuffer);
string jsonPayload = R"(
{
"query": {
"bool": {
"filter": [
{
"range": {
"timestamp": {
"gte": "now-120s",
"lte": "now"
}
}
}
]
}
},
"_source": ["aggregations"],
"aggs": {
"terms": {
"terms": {
"field": "source_node.transport_address",
"size": 50
},
"aggs": {
"heap_avg": {
"avg": {
"field": "node_stats.jvm.mem.heap_used_percent"
}
},
"cpu_avg": {
"avg": {
"field": "node_stats.process.cpu.percent"
}
},
"node_name": {
"terms": {
"field": "source_node.name",
"size": 50
}
}
}
}
}
})";
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, jsonPayload.c_str());
res = curl_easy_perform(curl);
if (res == CURLE_OK)
{
jsonObj = json::parse(readBuffer);
/**
Even if there is no problem with communication, there may be a problem with the response itself,
so the corresponding exception is processed.*/
if (!jsonObj["error"]["reason"].is_null())
{
errStr << "node ip - " << selectedNode << " : " << jsonObj["error"]["reason"].get<string>();
jsonObj = nullptr;
throw runtime_error(errStr.str());
}
}
else
{
errStr << "[" << selectedNode << "] : " << "curl_easy_perform() failed: " << curl_easy_strerror(res);
throw runtime_error(errStr.str());
}
// Syntax for handling exceptions when json ERROR keeps occurring
} catch(const json::type_error &e) {
errStr << "node ip - " << selectedNode << " ERROR :: " << e.what();
logger->error(e.what());
} catch(const exception& e) {
errStr << e.what();
logger->error(errStr.str());
}
// Clean up
if (headers) curl_slist_free_all(headers);
curl_easy_cleanup(curl);
}
return jsonObj;
}
/**
* @brief Get the Disk State object
*
* @param selectedNode
* @param utcTime
* @param logger
* @return json
*/
json Elastic::getDiskState(const string &selectedNode, const string &utcTime, shared_ptr<spdlog::logger> logger)
{
stringstream uri;
uri << "/.monitoring-es-" << this->clusterVer << "-" << utcTime << "/_search";
struct curl_slist* headers = nullptr;
CURL* curl = setESCurl(uri.str(), headers, selectedNode, logger);
CURLcode res;
string readBuffer;
json jsonObj;
stringstream errStr;
errStr << "[ getDiskState() ERROR ] -> ";
if (curl)
{
try {
// Set the callback function
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteCallback);
curl_easy_setopt(curl, CURLOPT_WRITEDATA, &readBuffer);
string jsonPayload = R"(
{
"query": {
"bool": {
"filter": [
{
"range": {
"timestamp": {
"gte": "now-40s",
"lte": "now"
}
}
}
]
}
},
"_source": ["aggregations"],
"aggs": {
"nodes": {
"terms": {
"field": "source_node.transport_address",
"size": 100
},
"aggs": {
"total_disk": {
"max": {
"field": "node_stats.fs.total.total_in_bytes"
}
},
"available_disk": {
"min": {
"field": "node_stats.fs.total.available_in_bytes"
}
}
}
}
}
})";
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, jsonPayload.c_str());
res = curl_easy_perform(curl);
if (res == CURLE_OK)
{
jsonObj = json::parse(readBuffer);
/**
Even if there is no problem with communication, there may be a problem with the response itself,
so the corresponding exception is processed.*/
if (!jsonObj["error"]["reason"].is_null())
{
errStr << "node ip - " << selectedNode << " : " << jsonObj["error"]["reason"].get<string>();
throw runtime_error(errStr.str());
}
}
else
{
errStr << "[" << selectedNode << "] : " << "curl_easy_perform() failed: " << curl_easy_strerror(res);
throw runtime_error(errStr.str());
}
} catch(const json::type_error &e) {
errStr << "node ip - " << selectedNode << " ERROR :: " << e.what();
logger->error(e.what());
} catch(const exception& e) {
errStr << e.what();
logger->error(errStr.str());
}
// Clean up
if (headers) curl_slist_free_all(headers);
curl_easy_cleanup(curl);
}
return jsonObj;
}
ES 쿼리 부분을 유심하게 보면,
구하고자 하는 지표값의 특정 기간 내의 집계값을 가져오는 것을 볼 수 있다.
DISK를 저장매체로 사용하는 모든 디비들은 디스크 용량상태가 가장 중요하다.
그러므로 DISK 지표값은 term을 적게 지정하고,
집계값도 평균값이 아닌,
남은 용량의 최솟값, 사용용량의 최댓값을 집계해주고 있다.
jvm과 cpu 사용률은 disk 지표값보다 중요하지 않으므로
기간은 약 120초 정도로 잡아두었다.
4. teleBot.h / teleBot.cpp
#pragma once
#include "common.h"
class TeleBot
{
private:
string teleToken;
string teleId;
public:
TeleBot(string teleToken, string teleId);
// 메시지를 보내주는 메소드
bool sendMsgByBot(const string &msg, shared_ptr<spdlog::logger> logger);
};
ES cluster curl 명령어를 통해서 각 지표의 상태를 체크한 다음
문제가 있을 경우에 Telegram bot을 통해서 메시지를 보내야 한다.
메시지를 보내기 위한 Telegram Bot 객체라고 보면 된다.
구현 부분은 아래와 같다.
#include "common.h"
#include "teleBot.h"
TeleBot::TeleBot(string teleToken, string teleId)
: teleToken(teleToken), teleId(teleId) {}
/**
* @brief Function that sends a message through a Telegram bot
*
* @param msg
* @param logger
* @return true
* @return false
*/
bool TeleBot::sendMsgByBot(const string &msg, shared_ptr<spdlog::logger> logger)
{
using namespace web::http;
using namespace web::http::client;
using namespace web::json;
try {
http_request req(methods::POST);
req.set_request_uri(U("/bot") + teleToken + U("/sendMessage"));
value payload;
payload[U("chat_id")] = value::string(teleId);
payload[U("text")] = value::string(msg);
payload[U("parse_mode")] = value::string(U("html"));
req.headers().set_content_type(U("application/json"));
req.set_body(payload);
http_client client(U("https://api.telegram.org/"));
auto resp = client.request(req).get();
if (resp.status_code() != status_codes::OK) {
throw runtime_error("Failed to send message via Telegram API -> Message not delivered: " + msg);
}
logger->info("Telegram bot successfully delivered message.");
return true;
} catch (const exception& e) {
logger->error(e.what());
return false;
}
}
5. main.cpp
#include "common.h"
#include "clusterState.h"
#include "elastic.h"
#include "teleBot.h"
/**
* @brief Function to read json file
*
* @param filepath
* @return json
*/
json readJsonFile(const string& filepath) {
//test
auto logger = spdlog::get("file_logger");
ifstream ifs(filepath);
json jsonData;
// If the file was not opened properly, print an error message and return an empty json object.
if (!ifs.is_open()) {
logger->error("Failed to open the JSON file: {}", filepath);
} else {
try {
jsonData = json::parse(ifs);
} catch (const exception& e) {
stringstream ss;
ss << "filepath : " << filepath << " ERROR : " << e.what();
logger->error(ss.str());
}
}
ifs.close();
return jsonData;
}
/**
* @brief Function that displays only two decimal places - performs rounding at the third decimal place
*
* @param value
* @return double
*/
double roundToTwoDecimalPlaces(double value) {
return round(value * 100) / 100.0;
}
/**
* @brief Get the Current UTC object
*
* @param format
* @return string
*/
string getCurrentUTC(const string &format) {
using namespace chrono;
// Get current time from system clock
auto now = system_clock::now();
// Convert time to time_t format
time_t now_time_t = system_clock::to_time_t(now);
// Convert to GMT/UTC time
tm* now_utc_tm = gmtime(&now_time_t);
// Convert time to string ex) "%Y-%m-%d %H:%M:%S"
stringstream ss;
ss << put_time(now_utc_tm, format.c_str());
return ss.str();
}
/**
* @brief Get the Current Time object - Local time
*
* @param format
* @return string
*/
string getCurrentTime(const string &format)
{
using namespace chrono;
// Get current time from system clock
auto now = system_clock::now();
// Convert time to time_t format
time_t now_time_t = system_clock::to_time_t(now);
tm* now_kor_tm = localtime(&now_time_t);
// Convert time to string ex) "%Y-%m-%d %H:%M:%S"
stringstream ss;
ss << put_time(now_kor_tm, format.c_str());
return ss.str();
}
/**
* @brief
*
* @param file
* @return vector<Elastic>
*/
vector<Elastic> esObjList(const json &file) {
auto logger = spdlog::get("file_logger");
vector<string> infoList;
vector<Elastic> clusterList;
try {
for (const auto& elem : file)
{
stringstream clusterPath;
clusterPath << "./elasticsearch_server/" << elem["cluster_path"].get<string>();
infoList.push_back(clusterPath.str());
}
for (const string &path : infoList)
{
json data = readJsonFile(path);
vector<string> ipAddr;
string esId, esPw;
bool sslYn = true;
for (const string &elem : data["cluster_ip_port"]) ipAddr.push_back(elem);
if (data["ssl"].is_null()) sslYn = false;
esId = data["id"].is_null() ? "" : data["id"].get<string>();
esPw = data["pw"].is_null() ? "" : data["pw"].get<string>();
Elastic esObj(ipAddr, esId, esPw, data["ver"].get<string>(), sslYn);
clusterList.push_back(esObj);
}
} catch (const exception& e) {
logger->error(e.what());
}
return clusterList;
}
/**
* @brief Function to monitor metrics of ES cluster and
*
* @param esObj
* @param teleBot
* @param logger
*/
void metricMonitoring(Elastic esObj, TeleBot teleBot, shared_ptr<spdlog::logger> logger)
{
stringstream msgStream;
stringstream errStr;
bool msgCall = false;
vector<string> esIpPortList = esObj.getClusterIpPort();
ClusterState csObj;
string selectedNode = esObj.randChoicNode(esIpPortList);
int esCheckRes = esObj.esClusterCheck(csObj, selectedNode, logger);
errStr << "[ metricMonitoring() ERROR ] -> ";
// If a problem appears in the cluster status check
if (esCheckRes == -1) return;
try {
if (csObj.clusterName == "" || csObj.clusterState == "") {
errStr << "[" << csObj.clusterName << " // " << selectedNode << "] clusterName : " << csObj.clusterName << " clusterState : " << csObj.clusterState;
teleBot.sendMsgByBot(errStr.str() ,logger);
throw runtime_error(errStr.str());
} else if (csObj.clusterState == "red") {
errStr << "[" << csObj.clusterName << " // " << selectedNode << "] The status of the ES Cluster is RED.";
teleBot.sendMsgByBot(errStr.str() ,logger);
throw runtime_error(errStr.str());
}
errStr << "cluster name : " << csObj.clusterName;
// ==== test code ====
logger->info("clusterName.first : " + csObj.clusterName);
logger->info("clusterName.second : " + csObj.clusterState);
// ================================
msgStream << "[ ==== Metric Alert ====]\n" << "[ " << csObj.clusterName << "]\n";
string curTime = getCurrentUTC("%Y.%m.%d");
json diskJson = esObj.getDiskState(selectedNode, curTime, logger);
json cpuJvmJson = esObj.getCpuJvmState(selectedNode, curTime, logger);
auto diskJsonArray = diskJson["aggregations"]["nodes"]["buckets"];
auto cpuJvmArray = cpuJvmJson["aggregations"]["terms"]["buckets"];
bool continueFlag = true;
for (json elem: diskJsonArray)
{
if (elem["key"].is_null() || elem["total_disk"]["value"].is_null() || elem["available_disk"]["value"].is_null())
{
errStr << "elem: diskJsonArray -> ";
if (elem["key"].is_null()) errStr << "elem['key'] is null ";
if (elem["total_disk"]["value"].is_null()) errStr << "elem['total_disk']['value'] is null ";
if (elem["available_disk"]["value"].is_null()) errStr << "elem['available_disk']['value'] is null ";
throw runtime_error(errStr.str());
}
string nodeIpPort = elem["key"].get<string>();
double totalDisk = elem["total_disk"]["value"].get<double>();
double availDisk = elem["available_disk"]["value"].get<double>();
double usePercent = roundToTwoDecimalPlaces(((totalDisk - availDisk) / totalDisk) * 100.0);
if (usePercent >= 10)
{
msgCall = true;
msgStream << " ip-addr: " << nodeIpPort << "\n disk used : " << usePercent << " %\n";
}
}
for (json elem: cpuJvmArray)
{
if (elem["key"].is_null() || elem["cpu_avg"]["value"].is_null() || elem["heap_avg"]["value"].is_null())
{
errStr << "elem: cpuJvmArray -> ";
if (elem["key"].is_null()) errStr << "elem['key'] is null ";
if (elem["cpu_avg"]["value"].is_null()) errStr << "elem['cpu_avg']['value'] is null ";
if (elem["heap_avg"]["value"].is_null()) errStr << "elem['heap_avg']['value'] is null ";
throw runtime_error(errStr.str());
}
string nodeIpPort = elem["key"].get<string>();
double cpuAvg = roundToTwoDecimalPlaces(elem["cpu_avg"]["value"].get<double>());
double jvmAvg = roundToTwoDecimalPlaces(elem["heap_avg"]["value"].get<double>());
if (cpuAvg >= 10)
{
msgCall = true;
msgStream << " ip-addr: " << nodeIpPort << "\n cpu used : " << cpuAvg << " %\n";
}
if (jvmAvg >= 10)
{
msgCall = true;
msgStream << " ip-addr: " << nodeIpPort << "\n jvm used : " << jvmAvg << " %\n";
}
}
if (msgCall)
{
teleBot.sendMsgByBot(msgStream.str(), logger);
}
}
catch(const json::type_error &e) {
errStr << "node ip - " << selectedNode << " ERROR :: " << e.what();
logger->error(errStr.str());
teleBot.sendMsgByBot(errStr.str(), logger);
} catch(const exception& e) {
errStr << e.what();
logger->error(errStr.str());
}
}
/**
* @brief Main function
*
* @return int
*/
int main()
{
// Code for performance testing
auto start = chrono::high_resolution_clock::now();
string logFilePath = "./metric_log/" + getCurrentTime("%Y.%m.%d") + ".log";
shared_ptr<spdlog::logger> fileLogger = spdlog::basic_logger_mt("file_logger", logFilePath);
json clusterListJson = readJsonFile("./elasticsearch_server/monitoring_cluster.json");
vector<Elastic> esList = esObjList(clusterListJson);
json teleInfoJson = readJsonFile("./telegram_info.json");
TeleBot teleBot(teleInfoJson["token"].get<string>(), teleInfoJson["id"].get<string>());
logFilePath = "./metric_log/" + getCurrentTime("%Y.%m.%d") + ".log";
for (int i = 0; i < esList.size(); i++) metricMonitoring(esList[i], teleBot, fileLogger);
return 0;
}
main.cpp 소스 코드를 보면, json 파일로 지정된 ES cluster의 정보와
Telegram Bot의 정보를 읽어와서, 각 ES cluster의 cpu, jvm, disk 정보를 수집하고 있다.
해당 system에 이슈가 하나 존재했는데,
DISK 지표값은 term을 너무 적게 지정하는 경우에는 리턴되는 json 값이 null 이 되어
시스템의 에러를 야기했다.
아마도 해당 코드가 ES의 시스템 인덱스인 ".monitoring-" 인덱스를 참조하여
지표값을 수집하기 때문에, 짧은 기간 내의 지표값을 수집하려고 하면
아직 수집되지 않은 지표값을 수집하면서 해당현상이 발생되는 것으로 사료된다.
여러 시행착오 끝에 DISK 수집 term 은 40초 정도가 무난하다고 판단된다.
그리고 현재는, 모니터링 알람이 오는지 안 오는지 확인해야 하므로
각 지표값이 10%가 넘어가면 알람을 보내주는 방식으로 코드를 구현하였다.
컴파일을 수행해 준 다음 코드를 실행시켜 보자.
아래와 같이 정상적으로 메시지가 오는 것을 볼 수 있고
로깅도 문제없이 되는 걸 확인할 수 있다.
'개발 & 구현' 카테고리의 다른 글
[c++] Elasticsearch Cluster metric 알람 구현 - 최적화 (0) | 2023.09.19 |
---|---|
[Python] Telegram 응답 받기 (0) | 2023.05.03 |
[Python] Telegram 메시지 보내주기 (0) | 2023.05.02 |
JAVA 메시지 보내기 - SMTP (0) | 2022.12.03 |
QR코드 로그인 구현 (1) | 2022.05.13 |