作者来自 Elastic Valeriy Khakhutskyy了解如何将基于 scroll 的 datafeeds 切换为基于 aggregation 的 datafeeds以优化大规模部署中的机器学习任务。无缝连接领先的 AI 和机器学习平台。开始免费云试用探索 Elastic 的 gen AI 能力或立即在你的机器上试用。几乎在我参与过的每一个大型 Elastic 部署中总会有一个 Elastic Security 或 Elastic Observability 的异常检测 AD 作业看起来运行正常但却始终处于“落后状态”。落后六小时、十二小时而这个差距始终无法缩小。datafeed 并没有损坏。它在按设计正常工作在每次运行时读取每一条原始文档跨越所有分片。在一个大型集群中如果使用跨集群搜索 CCS 以及类似 logs-* 这样范围很广的索引模式这意味着每个 bucket 都要扫描数十亿条文档。没有任何硬件能够让这种方式持续可用。datafeed 永远在追赶实时数据却永远无法追上。解决方案是将默认的基于 scroll 的 datafeed 配置切换为基于聚合 aggregation 的 datafeed 配置让数据节点在本地完成汇总只将紧凑的 bucket 结果发送到 ML 节点。相同的检测逻辑但负载只是其中的一小部分。性能提升可能非常显著甚至超出预期。具体数值将在下一部分给出而这种巨大差距背后的原因则会在文章末尾解释供希望理解机制的读者参考。有一个需要提前注意的点这种切换需要创建一个新的作业。旧模型无法迁移已经学习了数周的基线会丢失。因此最佳切换时机是在作业运行数月之前而不是之后。这也是为什么在部署之前阅读这篇内容很重要。有多快ML 作业中 - scroll vs. aggregation datafeeds我们在生产数据上用两种方式运行了同一个作业先是基于 scroll 的方式然后是基于 aggregation 的方式。该作业覆盖 13 个月的历史数据在多个集群中按 15 分钟桶监控每小时 836,000 条日志事件。在历史数据训练阶段使用 scroll 配置耗时 5 天实际墙钟时间、790 万次顺序请求、传输 3.5 TB 数据而使用 aggregation 配置2.3 分钟、23 次请求、34 MB提升 3,374 倍。可以这样理解如果你在周一上午 9 点启动 scroll 回填它会在周六早上完成而 aggregation 版本在 9:02 就已经完成。在实时数据上这种差异没有那么极端但仍然很显著每个 tick 的请求数量减少约20 倍。在 datafeed 每隔几分钟持续运行的情况下这种节省会迅速累积。开始之前在进入配置前有三点需要了解。这不是 “向导式” 操作。标准 Kibana 作业向导Single Metric、Multi-Metric、Population不支持 aggregation 配置。要创建基于 aggregation 的作业你需要使用 Elasticsearch API或者使用 Kibana 的 Advanced Job Wizard并手动编辑 JSON。下面的示例提供了最实用的路径在 Multi-Metric Wizard 中配置作业然后点击Convert to advanced job再创建作业。这样你会得到一个已填充的 JSON 起点而不是空白编辑器。该配置非常严格而且大多不会报错。没有 schema 校验来捕捉 aggregation key 拼写错误或 fixed_interval 与 bucket_span 不匹配的情况。作业会正常运行anomalies 会触发但不会有任何迹象表明结果基于错误的数据。这也是为什么存在“五步模式”以及为什么每次都应该使用Preview标签页在训练前发现配置错误只需要 30 秒而一周后才发现则会糟糕得多。Single Metric Viewer 对聚合作业有一个已知限制。该视图通过重新查询索引来重建 “实际值” 曲线但无法复现任意用户自定义的 aggregation因此 actual-value 线通常缺失或只是近似值。Anomaly Explorer 不受影响anomaly scores、swim lanes 和 influencer attribution 都能正常工作。只是不要依赖 Single Metric Viewer 的图表来验证模型看到的数据。我们可以和不能聚合的内容几乎所有 ML 函数都可以与基于 aggregation 的 datafeeds 配合使用但合适的 aggregation 模式取决于具体函数。函数模式count,mean,high_mean,low_mean,sum,max,min,varp标准date_histogram→terms→ metric aggregationtime_of_day,time_of_week最小模式仅date_histogram不需要terms或 metricrare,freq_rare,info_content复合模式顶层 composite以date_histogram作为 sourcecategorization对 categorization 字段的.keyword子字段做termslat_long仅 scrolllat_long 是唯一真正的例外。该配置虽然是被允许的但 geo_centroid 会计算一个 bucket 内所有坐标的算术平均值如果同一个实体在同一个 bucket 内同时出现在纽约和伦敦那么质心最终会落在大西洋中间这在使用场景中通常没有意义。因此lat_long 类型的作业应保持使用基于 scroll 的 datafeeds。下一节中的五步模式适用于标准情况。我们将在文章末尾再讲解其余模式。标准五步模式从 scroll-based 到 aggregation datafeed将任何基于 scroll 的作业转换为基于 aggregation 的 datafeed 遵循相同的五个步骤。一旦理解了这个模式将其应用到任何兼容作业通常只需要大约 10 分钟。步骤 1在 analysis 配置中添加 summary_count_field_name: doc_count。这会告诉 ML 引擎输入数据已经是预先汇总的。如果不这样做引擎会把每个聚合后的 bucket 当作一条原始文档从而导致错误的异常评分。步骤 2选择 bucket 包装拓扑结构。对于大多数函数count、mean、sum、max、min、varp、time_of_day、time_of_week 以及 categorization在最顶层使用 date_histogram并确保 fixed_interval 与 bucket_span 完全一致以保证分析结果正确。对于 rare、freq_rare 和 info_content则在最顶层使用 composite并将 date_histogram 作为其 source 之一。这会将 datafeed 路由到 composite extractor使其能够遍历所有字段值组合而不是截断为 top-N。步骤 3在 timestamp 上添加 max 聚合。ML 引擎需要它来确定每个 bucket 的精确结束时间。在标准拓扑步骤 2 的 date_histogram 外层中它位于 histogram 的 aggregations 内部在 composite 拓扑中它与 composite 聚合并列。步骤 4将每个分析字段映射为 terms 聚合并且名称必须与 analysis 配置中的字段名完全一致。一个分类字段 → 一个嵌套 terms两个或多个分类字段 → 在 date_histogram 内嵌套 composite 聚合并且每个字段对应一个 terms source。对于 categorization 作业在 categorization_field_name 的 .keyword 子字段上使用 terms 聚合。命名规则是严格的聚合 key 必须与 analysis 配置中的字段名完全一致ML 引擎依赖的是聚合名称而不是 field 参数来查找值。任何不匹配都会导致静默错误——作业看似正常运行但关键数据全部缺失。步骤 5将每个 detector 的 metric 字段映射为其对应的 Elasticsearch 聚合ML functionElasticsearch aggregationmean/high_mean/low_meanavgsumsummaxmaxminminvarpextended_stats对于 count、rare、freq_rare、info_content、time_of_day、time_of_week 以及 categorization 作业ML 引擎只基于 doc_count 工作不需要 metric 聚合因此这一步骤可以跳过。逐步示例在 Kibana 中构建基于 aggregation 的 ML 作业让我们用 Kibana 的 sample web logs 从头构建一个完整流程。如果你还没有加载这些数据可以进入 Kibana 首页点击Integrations→Sample data→Sample web logs→Add data。这会提供一个名为 Kibana Sample Data Logs 的 data view以及一个名为 kibana_sample_data_logs 的 index其中包含 timestamp、bytes响应大小以及 geo.dest目标国家等字段。我们将构建一个用于检测异常大响应大小的作业对 bytes 使用 high_mean并按目标国家 geo.dest 进行分区bucket span 为 1 小时。使用 Multi-Metric Wizard 创建作业这是实际中创建大多数作业的方式。进入Machine Learning→Anomaly Detection→Manage Jobs→Create job。选择 “Kibana Sample Data Logs” data view并将时间范围设置为覆盖整个 sample dataset。在 job type 页面中选择Multi-metric。在 Multi-Metric Wizard 中配置 detector对 bytes 计算high mean按geo.dest进行数据拆分bucket span1h给这个作业设置一个 ID其余所有配置保持默认但暂时不要点击 Create。在最后一步配置页面中点击Preview JSON并查看 datafeed 部分。你会看到的是一个普通的基于 scroll 的 datafeed没有任何 aggregations只有一个 index pattern 和一个 match_all 查询。这是所有向导默认生成的配置。在小型集群上它运行良好。但在大型集群中配合 CCS 和宽泛的索引模式时这种 datafeed 会在每次运行时扫描所有原始文档并且永远无法追上实时数据。不要点击Create而是点击Convert to advanced job。这样会保留你刚刚配置的所有内容detector、partition 字段、bucket span并直接进入 Advanced Wizard在那里我们可以应用五步模式。分析配置转换后会预填 detector、partition 字段以及 bucket span。这里唯一需要做的修改是模式中的第 1 步打开 Edit JSON 视图并添加 summary_count_field_name告诉 ML 引擎输入数据已经是预先汇总的{ bucket_span: 1h, summary_count_field_name: doc_count, // Step 1 detectors: [ { function: high_mean, field_name: bytes, partition_field_name: geo.dest } ], influencers: [geo.dest] }datafeed 配置切换到Datafeed标签页。这里会汇总模式中的步骤 2 到步骤 5。删除 scroll_size如果存在然后输入 aggregations{ buckets: { date_histogram: { // Step 2: bucket wrapper, interval bucket_span field: timestamp, fixed_interval: 1h }, aggregations: { timestamp: { // Step 3: max timestamp anchor max: { field: timestamp } }, geo.dest: { // Step 4: partition field, name must match exactly terms: { field: geo.dest, size: 1000 }, aggregations: { bytes: { // Step 5: metric field → avg aggregation avg: { field: bytes } } } } } } }关于这个配置有几点需要注意步骤 2date_histogram 使用 fixed_interval: 1h必须与 bucket_span 完全一致。不一致会导致 bucket 时间错误。步骤 3timestamp 上的 max 聚合必须命名为 timestamp并放在 histogram 的 aggregations 内部否则 ML 节点无法确定每个 bucket 的精确结束时间。步骤 4partition 字段的 terms 聚合必须严格使用字段名本身作为名称geo.dest而不能用 geo.dest_grouping 或任何别名。ML 引擎使用的是 aggregation 的名称而不是 field 参数来识别分区值。如果不匹配会导致分区字段在结果中被静默丢失。步骤 5metric 聚合 key bytes 必须与 detector 中的 field_name 完全一致。任何不匹配都会导致异常评分出现静默错误。验证方式在创建作业之前使用 Preview 标签页。该步骤会在真实数据上执行 aggregation并展示 ML 节点实际接收到的数据是提交前非常重要的校验手段。添加影响因子字段influencer fields在上面的示例中geo.dest 是分区字段。ML 模型会为每个目标国家学习单独的基线并按国家分别报告异常。但你可能还希望 machine.os 在异常结果中作为influencer出现当 detector 触发时你希望看到“对于 geo.dest: CN 和 machine.os: win 来说这看起来是异常的这些因素共同促成了结果”。influencers 不参与异常检测本身它们只是为已发现的异常提供上下文。为了在分区字段之外支持 influenceranalysis 配置中需要添加 influencers 数组{ bucket_span: 1h, summary_count_field_name: doc_count, detectors: [ { function: high_mean, field_name: bytes, partition_field_name: geo.dest } ], influencers: [geo.dest, machine.os] }现在 datafeed 需要同时对这两个字段进行聚合。一个 terms 嵌套在另一个 terms 之内是行不通的因为内层 terms 只会在每个外层 bucket 中返回该字段的 top-N 值这会导致某些组合被静默丢失。因此应当使用 composite aggregation在每个字段上各使用一个 terms source并将其嵌套在 date_histogram 内部{ buckets: { date_histogram: { field: timestamp, fixed_interval: 1h }, aggregations: { timestamp: { max: { field: timestamp } }, group_by_fields: { composite: { size: 1000, sources: [ { geo.dest: { terms: { field: geo.dest } } }, { machine.os: { terms: { field: machine.os } } } ] }, aggregations: { bytes: { avg: { field: bytes } } } } } } }composite 会为 (geo.dest, machine.os) 的每一个唯一组合生成一个 bucket。ML 节点会看到所有这些组合并能够正确判断在某个国家响应大小激增时哪个操作系统在其中起到了贡献作用。可以通过 preview 来确认是否出现了所有不同的 pair。如果你只看到很少的行而预期应该有很多则可能需要提高 composite 的 size 参数。需要注意的是这个 composite 是嵌套在 date_histogram 内部的这与下面 rare、freq_rare 和 info_content 使用的顶层 composite 结构不同。这种区别非常重要嵌套在 date_histogram 内的 composite 会使用标准 extractor而顶层 composite 会使用 composite extractor它会跨时间分页遍历所有 value 组合。分类categorizationcategorization 可以与 aggregated datafeeds 一起使用summary_count_field_name 和 categorization_field_name 可以在同一个作业中共存。五步模式可以直接应用。步骤 2 使用标准的 date_histogram 拓扑结构。步骤 4 有一个调整不再使用 partition 字段而是对文本字段本身使用 terms 聚合并在其 .keyword 子字段上执行名称必须与 categorization_field_name 完全一致。步骤 5 可以跳过因为 count detector 只依赖 doc_count。analysis 配置{ bucket_span: 1h, summary_count_field_name: doc_count, categorization_field_name: message, detectors: [ { function: count, by_field_name: mlcategory } ], influencers: [mlcategory] }datafeed aggregations{ buckets: { date_histogram: { field: timestamp, fixed_interval: 1h }, aggregations: { timestamp: { max: { field: timestamp } }, message: { terms: { field: message.keyword, size: 1000 } } } } }datafeed 会为每一个唯一的 message.keyword 值发送一个 bucket并为每个 bucket 提供 doc_count。ML 节点接收这些字符串对其进行 categorization并为每个值分配一个 mlcategory而 count detector 则跟踪每个 bucket 中每个类别的文档数量。步骤 4 中的命名规则同样适用terms 聚合必须命名为 message并且要与 analysis 配置中的 categorization_field_name 完全一致。需要注意的一点是keyword 字段默认有 ignore_above: 256 的限制。超过 256 个字符的日志消息不会被索引为 .keyword因此会被静默排除在聚合之外。如果日志消息较长在使用该方法前需要检查字段 mapping。可能需要在 index template 中提高该限制。最简模式time_of_day 和 time_of_week 是最容易聚合的函数它们只需要时间戳和文档计数。C 进程从 bucket timestamp 中提取时间成分并构建正常活动的周期模型doc_count 用于表示每个 bucket 中的事件数量。不需要 terms source不需要 metric 聚合也不需要 composite。analysis 配置{ bucket_span: 15m, summary_count_field_name: doc_count, detectors: [ { function: time_of_day } ] }Datafeed aggregations:{ time: { date_histogram: { field: timestamp, fixed_interval: 15m }, aggregations: { timestamp: { max: { field: timestamp } } } } }plain date_histogram 就足够了不需要 composite。这使得 time_of_day 和 time_of_week 特别适合 CCS 场景每个时间块一个请求网络传输数据量极小。time_of_week 也使用相同结构唯一变化只是函数名称。如果你想添加 partition_field_name例如按 service 建模一天中的时间模式可以在 histogram 的 aggregations 中按照标准步骤 4 的模式加入 terms 聚合。composite 模式针对 rare、freq_rare 和 info_content 的 composite 模式都需要 composite extractor即遍历所有唯一值组合而不是截断为 top-N 的那个组件。五步模式在这里适用但步骤 2 的拓扑结构不同composite 位于顶层而不是 date_histogramdate_histogram 作为其内部 source。步骤 3 将 max timestamp 聚合放在与 composite 同级的位置步骤 5 可以跳过因为这三种函数都只依赖 doc_count。这三种函数的数据结构是统一的顶层是 composite内部包含一个 date_histogram source并且每个分析字段各有一个 terms source。唯一变化在于 terms source 的字段选择rare 需要一个 by_field_name 的 sourcefreq_rare 需要 by_field_name 和 over_field_name 两个 sourcesinfo_content 需要 field_name以及可选的 by_field_name 或 over_field_name sources这三者都不需要 metric aggregation。{ buckets: { composite: { size: 10000, sources: [ { timestamp: { date_histogram: { field: timestamp, fixed_interval: 5m } } }, { by_field: { terms: { field: by_field } } }, { over_field: { terms: { field: over_field } } } ] }, aggregations: { timestamp: { max: { field: timestamp } } } } }几点说明composite 聚合必须是顶层聚合不能嵌套在 date_histogram 内。这正是将 datafeed 路由到 composite extractor 的关键。date_histogram 是 composite 内部的一个 source而不是外层包装。其 fixed_interval 必须能被 bucket_span 整除。timestamp 上的 max 聚合与 composite 同级在 aggregations 内部不能嵌套在 composite 里面。composite.size 控制每次往返的分页大小。设置较高值如 10000可以减少往返次数这在 CCS 延迟场景中很重要。当存在多个 source 和高基数字段时总组合数可能非常大extractor 会自动分页处理。为什么基于 aggregation 的 datafeed 在大规模下更快性能差距是结构性的而不是偶然的。scroll-based datafeed 是逐页读取原始文档每 1,000 条文档对应一次请求并且必须等待前一个请求完成后才能发起下一个。因此请求数量与回填时间范围内的文档总量成正比。在每小时 836,000 条事件、持续 13 个月的情况下总量约为 79 亿条事件也就是约 790 万次顺序往返。每一次往返都要跨越 CCS 边界、等待 shard 响应并完整传输匹配文档。整个过程没有并行性datafeed 在远程集群上保持 scroll context并逐页处理数据。而 aggregation-based datafeed 的工作方式完全不同。数据节点在本地对数据进行汇总按时间 bucket 和分类字段分组只将 bucket 结果发送给 ML 节点。请求数量与字段基数相关而不是与文档数量相关。在我们的例子中两个 influencer 字段产生 6 种唯一组合每个时间 bucket 只生成 6 行结果datafeed 只需对这些结果进行分页而不管每个 bucket 中有多少原始事件。摄入速率翻倍scroll 请求数翻倍但 aggregation 请求数保持不变。这就是为什么规模越大差距越明显数据越多scroll 越差aggregation 越有优势。在实时数据场景中情况不同因为每个实时 tick 只处理一个新的 bucketscroll 会按该 bucket 的数据量发起多个分页请求而 aggregation 只需要一次请求。在 836,000 events/hour、15 分钟 bucket span 的条件下20× 的差异正是这个比例的体现。当摄入速率 × bucket span大于 scroll_size 时scroll-based datafeed 就无法跟上实时数据无论硬件如何扩展。低于这个阈值时scroll 仍然可用而 aggregation 只是优化超过该阈值后aggregation 成为唯一可持续方案。scroll-based datafeed 是合理的默认值Kibana wizards 在大多数部署中做出了正确选择。但在大规模场景更多 shard、更宽索引模式、跨层 CCS下切换到 aggregation-based datafeed 是自然演进数据节点在数据所在地做汇总ML 节点处理压缩后的结果检测逻辑保持不变。唯一需要提前注意的成本是模型状态切换需要新建作业因此越早迁移损失越小。如果遇到本文未覆盖的情况例如某些 aggregation 无法映射或 composite 行为异常可以到 Elastic Discuss 社区继续讨论。原文https://www.elastic.co/search-labs/blog/elastic-machine-leaning-jobs-aggregation-datafeeds