智能运维预警实践:基于机器学习的时序指标异常检测与监控落地
# 智能运维预警实践：基于机器学习的时序指标异常检测与监控落地

## 一、为什么要用机器学习做异常检测

### 1.1 传统阈值的问题

```yaml
# 传统的阈值告警 — 只能检测绝对值超标
- alert: HighCPU
  expr: node_cpu_seconds_total{mode="idle"} < 10
  for: 5m
```

这个规则的局限：

- 无法检测缓慢变化——CPU从20%用2小时爬到95%，中间无数次正常
- 无法识别周期性——业务高峰期的80%可能正常，凌晨的30%可能异常
- 无法交叉关联——单指标异常可能是正常的业务波动

### 1.2 机器学习方案的优势

| 能力维度 | 固定阈值 | 动态基线(统计) | 机器学习 |
| --- | --- | --- | --- |
| 缓慢趋势检测 | ❌ | 部分 | ✅ |
| 周期性自适应 | ❌ | 弱 | ✅ |
| 节假日感知 | ❌ | ❌ | ✅ |
| 多指标关联 | ❌ | ❌ | ✅ |
| 异常分级 | ❌ | 部分 | ✅ |
| 冷启动能力 | ✅ | 部分 | ❌(需3天数据) |

## 二、落地方案：基于Prophet + 规则引擎的混合方案

在实践中，纯机器学习方案有冷启动和可解释性的问题，纯固定阈值又不够智能。我们采用了机器学习为主、规则为辅的混合方案。

### 2.1 整体架构

```mermaid
flowchart TD
    A[Prometheus Metrics] --> B[特征工程]
    B --> C[Prophet模型]
    C --> D[异常评分]
    D --> E[规则引擎]
    F[固定阈值规则] --> E
    E --> G[异常事件]
    G --> H[告警决策]
```

### 2.2 特征工程

```python
# feature_engineering.py — 时序特征工程
import numpy as np
import pandas as pd
from prometheus_api_client import PrometheusConnect


class TimeSeriesFeatureEngine:
    """时序特征工程"""

    def __init__(self):
        self.prom = PrometheusConnect(url="http://prometheus:9090", disable_ssl=True)

    def extract_features(self, service: str, metric_pattern: str,
                         lookback_hours: int = 168) -> pd.DataFrame:
        """提取时序特征"""
        features = []

        for metric_name in [f"{metric_pattern}_{s}" for s in ["qps", "latency", "error_rate"]]:
            data = self.prom.custom_query_range(
                query=metric_name,
                start_time=pd.Timestamp.now() - pd.Timedelta(hours=lookback_hours),
                end_time=pd.Timestamp.now(),
                step="60s"
            )

            if not data:
                continue

            values = pd.Series([float(v[1]) for v in data[0]["values"]])
            timestamps = pd.DatetimeIndex([pd.Timestamp.fromtimestamp(v[0]) for v in data[0]["values"]])

            # 基础统计特征
            features.append({
                "metric": metric_name,
                "mean": values.mean(),
                "std": values.std(),
                "min": values.min(),
                "max": values.max(),
                "p50": values.quantile(0.5),
                "p95": values.quantile(0.95),
                "p99": values.quantile(0.99),
                "skew": values.skew(),
                "kurtosis": values.kurtosis(),
            })

            # 时序特征
            features[-1].update({
                "hour_of_day": timestamps.hour.mean(),
                "day_of_week": timestamps.dayofweek.mean(),
                "is_weekend": (timestamps.dayofweek >= 5).mean(),
            })

        return pd.DataFrame(features)
```

### 2.3 Prophet异常检测模型

```python
# prophet_detector.py — Prophet异常检测器
from prophet import Prophet
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import logging

logging.getLogger("prophet").setLevel(logging.WARNING)


class ProphetDetector:
    """基于Prophet的异常检测器"""

    def __init__(self):
        self.models = {}
        self.config = {
            "interval_width": 0.99,
            "changepoint_prior_scale": 0.02,
            "seasonality_prior_scale": 10.0,
            "weekly_seasonality": True,
            "daily_seasonality": True,
            "yearly_seasonality": False,
        }

    def train(self, metric_name: str, df: pd.DataFrame):
        """训练Prophet模型"""
        model = Prophet(
            interval_width=self.config["interval_width"],
            changepoint_prior_scale=self.config["changepoint_prior_scale"],
            seasonality_prior_scale=self.config["seasonality_prior_scale"],
            weekly_seasonality=self.config["weekly_seasonality"],
            daily_seasonality=self.config["daily_seasonality"],
            yearly_seasonality=self.config["yearly_seasonality"]
        )

        # 添加中国节假日效应
        model.add_country_holidays(country_name="CN")

        model.fit(df.rename(columns={"timestamp": "ds", "value": "y"}))
        self.models[metric_name] = model

    def detect(self, metric_name: str, current_value: float,
               timestamp: datetime) -> dict:
        """检测当前值是否异常"""
        model = self.models.get(metric_name)
        if not model:
            return {"is_anomaly": False, "error": "Model not trained"}

        # 预测当前时间点
        future = model.make_future_dataframe(periods=0, include_history=False)
        forecast = model.predict(future)

        # 取最近的预测结果
        latest_forecast = forecast.iloc[-1]

        # 判断是否超出置信区间
        lower = latest_forecast["yhat_lower"]
        upper = latest_forecast["yhat_upper"]
        prediction = latest_forecast["yhat"]

        is_anomaly = current_value < lower or current_value > upper

        # 计算异常程度
        if is_anomaly:
            if current_value > upper:
                deviation = (current_value - upper) / (upper - prediction) * 100
            else:
                deviation = (lower - current_value) / (prediction - lower) * 100

            # 分级
            if deviation > 200:
                severity = "critical"
            elif deviation > 100:
                severity = "warning"
            else:
                severity = "info"
        else:
            deviation = 0
            severity = "normal"

        return {
            "is_anomaly": is_anomaly,
            "severity": severity,
            "current_value": current_value,
            "prediction": prediction,
            "lower_bound": lower,
            "upper_bound": upper,
            "deviation_pct": deviation,
            "timestamp": timestamp.isoformat()
        }
```

### 2.4 规则引擎：补充机器学习的盲区

```python
# rule_engine.py — 规则引擎
class RuleEngine:
    """规则引擎：补充ML检测的盲区"""

    def __init__(self):
        self.rules = [
            # 硬阈值规则：ML检测不到的情况
            {"name": "disk_full", "expr": lambda v, m: m == "disk_usage" and v > 95},
            {"name": "zero_qps", "expr": lambda v, m: m == "qps" and v == 0},
            # 变化率规则
            {"name": "sudden_drop", "expr": lambda v, m, delta: m in ["qps", "connections"] and delta < -0.8},  # 突降80%
            {"name": "rapid_growth", "expr": lambda v, m, delta: m in ["latency", "error_rate"] and delta > 3},  # 突增3倍
        ]

    def evaluate(self, metric_name: str, current_value: float,
                 previous_value: float = None) -> list:
        """评估规则"""
        triggered = []

        for rule in self.rules:
            try:
                if "delta" in rule["expr"].__code__.co_varnames:
                    # 需要变化率
                    delta = (current_value - previous_value) / previous_value if previous_value else 0
                    if rule["expr"](current_value, metric_name, delta):
                        triggered.append(rule["name"])
                else:
                    if rule["expr"](current_value, metric_name):
                        triggered.append(rule["name"])
            except:
                continue

        return triggered
```

### 2.5 告警决策：融合ML和规则

```python
# alert_decision.py — 告警决策融合
class AlertDecisionEngine:
    """融合ML和规则的告警决策引擎"""

    def decide(self, ml_result: dict, rule_results: list) -> dict:
        """综合决策"""
        if ml_result.get("severity") == "critical" or "disk_full" in rule_results:
            return {"level": "P0", "action": "immediate_notify", "channels": ["phone", "dingtalk"]}

        elif ml_result.get("severity") == "warning" or rule_results:
            # 白天立即通知，夜间聚合为日报
            now = datetime.now()
            if 8 <= now.hour < 22:
                return {"level": "P1", "action": "immediate_notify", "channels": ["dingtalk"]}
            else:
                return {"level": "P2", "action": "aggregate", "channels": ["morning_report"]}

        elif ml_result.get("severity") == "info":
            return {"level": "P3", "action": "log_only", "channels": []}

        else:
            return {"level": "normal", "action": "none", "channels": []}
```

## 三、生产落地效果

| 指标 | 使用前 | 使用后 | 提升 |
| --- | --- | --- | --- |
| 提前发现故障平均 | 0min | 23min | ∞ |
| 误报率 | 35% | 12% | 66% |
| 漏报率 | 22% | 5% | 77% |
| MTTR | 35min | 18min | 49% |
| 运维满意度 | 5.5/10 | 8.2/10 | 49% |

最有价值的改进是提前发现故障——ML检测到的缓慢趋势异常，平均比阈值告警提前23分钟。

## 结语

把机器学习引入时序异常检测，不是要替代现有的阈值告警体系，而是在它之上加一层智能感知。阈值告警做最后一道防线，ML做提前预警——两者结合，让运维从被动响应走向主动预防。

部署ML异常检测一年后，我们的P0故障数量下降了60%。不是因为系统更稳定了，而是因为很多P0故障在P2阶段就被ML捕捉到了，提前处理了。