【vllm】vLLM v1 Executor — 系统级架构深度分析(五)
vLLM v1 Executor — 系统级架构深度分析分析范围vllm/v1/executor/目录8个Python文件~3.5K行代码。Executor 是 v1 推理系统的执行引擎——连接调度器与 Worker负责模型推理的多 GPU / 多节点编排。Dark Terminal 风格架构图 10 张见diagrams/子目录。一、整体架构概览1.1 设计思路v1 Executor 采用分层策略 广播-收集架构抽象基类统一接口Executor(ABC)定义所有执行器的契约4种实现策略可插拔UniProc / Multiproc / Ray / RayV2按部署场景选择广播-收集模式所有分布式执行器统一 broadcast SchedulerOutput collect ModelRunnerOutput进程隔离Worker 运行在独立子进程通过 SHM/IPC 通信避免 GIL 瓶颈异步执行execute_model()返回FutureWrapper不阻塞调度器核心设计哲学Executor 是桥梁连接 EngineCore调度决策和 WorkerGPU 推理进程隔离安全Worker crash 不影响主进程death_pipe 监控零拷贝广播SHM MessageQueue 实现 SchedulerOutput 的 1-to-N 广播统一接口无论单 GPU 还是多节点EngineCore 只与 Executor 交互1.2 架构模式模式应用策略模式4种 Executor 实现Uni/Multiproc/Ray/RayV2工厂方法Executor.get_class()根据配置选择实现广播-收集SHM broadcast SchedulerOutput collect ModelRunnerOutputFuture 模式FutureWrapper实现异步非阻塞执行模板方法_init_executor()由子类覆盖公共逻辑在基类代理模式RayWorkerWrapper 代理 Worker 为 Ray Actor观察者模式death_pipe monitor_workers 监控进程健康1.3 整体运行流程EngineCore.__init__() ├── Executor.get_class(vllm_config) # 工厂方法选择实现 └── executor._init_executor() # 初始化 Workers EngineCore.step() ├── executor.execute_model(scheduler_output) │ ├── Broadcast: scheduler_output - all workers (SHM) │ ├── Each worker: execute_model() - ModelRunnerOutput │ └── Collect: aggregate ModelRunnerOutput across ranks └── Return FutureWrapper / ModelRunnerOutput executor.check_health() # 定期健康检查 executor.collective_rpc(method) # 广播 RPClora, profile, sleep... executor.shutdown() # 终止所有 Worker二、子模块划分模块1Executor ABCabstract.py384行核心作用所有执行器的抽象基类——定义统一接口、工厂方法、公共逻辑。关键类/方法类/方法类型说明Executor(ABC)类抽象基类所有执行器的父类get_class(vllm_config)静态方法工厂方法根据配置返回具体实现_init_executor()抽象方法子类必须覆盖的初始化逻辑collective_rpc(method, args, kwargs)抽象方法广播 RPC 到所有 Workercheck_health()抽象方法健康检查execute_model(scheduler_output)方法执行模型推理含默认实现sample_tokens(grammar_output)方法仅采样跳过模型前向determine_available_memory()方法查询可用显存initialize_from_config()方法初始化 KV Cache 配置register_failure_callback()方法注册失败回调sleep() / wake_up()方法休眠/唤醒用于弹性伸缩max_concurrent_batches属性最大并发批次数uses_ray类属性是否使用 Ray默认 Falsesupports_pp类属性是否支持流水线并行默认 False工厂方法选择逻辑配置值返回类uniUniProcExecutormpMultiprocExecutorrayRayDistributedExecutorray VLLM_USE_RAY_V2RayExecutorV2external_launcherExecutorWithExternalLauncher自定义类路径resolve_obj_by_qualname()架构图见02-executor-abc.svg模块2MultiprocExecutormultiproc_executor.py1038行核心作用多进程执行器——v1 的默认分布式执行后端每 GPU 一个子进程SHM IPC 通信。关键类/方法类/方法说明MultiprocExecutor多进程执行器主类FutureWrapper(Future)异步结果包装器WorkerProcWorker 子进程管理UnreadyWorkerProcHandle未初始化的 Worker 句柄WorkerProcHandle已初始化的 Worker 句柄ResponseStatus(Enum)响应状态PENDING/READY/ERROR初始化流程(_init_executor())创建rpc_broadcast_mqSHM 广播队列SchedulerOutput 1→N为每个 local_rank 创建WorkerProc.make_worker_process()fork/spawn 子进程传递 SHM handle distributed_init_methodWorkerProc.wait_for_ready()等待所有 Worker 初始化完成启动worker_monitor线程death_pipe 监控建立response_mqs每个 Worker 的响应队列等待所有 MessageQueue 就绪执行流程(execute_model())将scheduler_output广播到rpc_broadcast_mq每个 Worker 子进程接收并执行worker.execute_model()Worker 通过worker_response_mq返回ModelRunnerOutput收集所有 Worker 响应聚合成最终结果返回FutureWrapper非阻塞Worker 子进程内部worker_main()子进程入口注册信号处理器worker_busy_loop()循环接收 input → 执行 → 发送 outputasync_output_busy_loop()异步输出处理循环death_pipe_monitor()监控子进程死亡架构图见03-multiproc-executor.svg模块3WorkerProc IPCmultiproc_executor.py 内部类 MessageQueue核心作用Worker 子进程管理 进程间通信——确保高效、可靠的数据传输。WorkerProc 类层次类说明UnreadyWorkerProcHandle未初始化句柄proc death_writer ready_pipeWorkerProcHandle已初始化句柄worker_response_mq peer_worker_response_mqsWorkerProc子进程管理器make_worker_process() wait_for_ready() shutdown()IPC 机制通道方向协议用途rpc_broadcast_mqDriver → All WorkersSHM pickle广播 SchedulerOutputworker_response_mqWorker → DriverSHM单 Worker 的 ModelRunnerOutputpeer_worker_response_mqsRemote Worker → DriverTCP/ZMQ跨节点响应death_pipeWorker → Monitorpipe子进程死亡通知ready_pipeWorker → Driverpipe初始化完成握手关键方法方法说明WorkerProc.make_worker_process()创建子进程 设置 IPCWorkerProc.wait_for_ready()等待所有 Worker 初始化WorkerProc.shutdown()终止子进程 清理资源WorkerProc._init_message_queues()初始化 SHM 消息队列worker_busy_loop()Worker 主循环recv→exec→sendasync_output_busy_loop()异步输出处理循环death_pipe_monitor()监控子进程健康setup_proc_title_and_log_prefix()设置进程标题架构图见04-worker-proc-ipc.svg模块4UniProcExecutoruniproc_executor.py190行核心作用单进程执行器——最简单的实现Worker 与 EngineCore 同进程运行。关键类/方法类/方法说明UniProcExecutor单进程执行器_init_executor()创建单个 GPUWorker直接 init_device load_modelexecute_model()直接调用 worker.execute_model()无 IPCsample_tokens()直接调用 worker.sample_tokens()collective_rpc()直接调用 Worker 方法check_health()始终返回 OK无子进程shutdown()worker.shutdown()max_concurrent_batches固定返回 1supports_async_scheduling()返回 FalseExecutorWithExternalLauncher属性说明继承 UniProcExecutor用途外部集群管理器Slurm 等_distributed_args()从环境变量获取 RANK/WORLD_SIZEdetermine_available_memory()从 Worker 查询架构图见05-uniproc-executor.svg模块5RayDistributedExecutorray_executor.py645行核心作用Ray 分布式执行器——基于 Ray Actor 的分布式编排支持 Ray DAG 编译加速。关键类/方法类/方法说明RayDistributedExecutorRay 分布式执行器RayWorkerMetaDataRay Worker 元数据actor handle IP GPU IDs_init_executor()创建 Ray actors placement groups_init_workers_ray()初始化 Ray workers排序、资源分配execute_model()执行模型DAG 或 collective_rpc_execute_dag()Ray 编译 DAG 执行路径_compiled_ray_dag()编译 Ray DAGcollective_rpc()广播到所有 Ray actorsreinitialize_distributed()动态数据并行重配置check_health()Ray actor 存活检查shutdown()终止 Ray actorsRay 编译 DAG_check_ray_cgraph_installation()验证 Ray DAG 编译环境_compiled_ray_dag(enable_asyncio)编译计算图减少 Ray 调度开销_execute_dag()通过编译 DAG 执行比逐次 RPC 更高效架构图见06-ray-executor.svg模块6RayExecutorV2ray_executor_v2.py524行核心作用Ray 多进程混合执行器——用 Ray 管理进程用 SHM IPC 传输数据兼得两者优势。关键类/方法类/方法说明RayExecutorV2(MultiprocExecutor)继承 MultiprocExecutor复用 IPC 机制RayWorkerProc(WorkerProc)Ray Actor 包装的 Worker 进程RayWorkerHandleRay Actor 句柄_init_executor()创建 Ray actors SHM MessageQueues_build_runtime_env()构建 Ray 运行环境start_worker_monitor()Ray Actor 健康监控shutdown()清理 Ray actors MP 资源与 RayDistributedExecutor 的对比维度RayDistributedExecutorRayExecutorV2数据传输Ray RPC序列化网络SHM MessageQueue零拷贝进程管理Ray ActorRay Actor延迟较高Ray 调度开销较低SHM 直传复杂度纯 RayRay MP 混合适用场景简单分布式高吞吐生产环境架构图见07-ray-executor-v2.svg模块7Ray 工具库ray_utils.py696行 ray_env_utils.py18行核心作用Ray 基础设施——Worker 包装、Placement Group 管理、集群初始化。RayWorkerWrapper在 ray_utils.py 中定义方法说明execute_method()通用远程方法调用execute_model_ray()专用模型执行含 PP 中间张量处理get_node_ip()获取 Ray 节点 IPget_node_and_gpu_ids()获取 GPU 资源映射setup_device_if_necessary()延迟 GPU 初始化adjust_rank()DP rank 重映射override_env_vars()Ray 环境变量传播detach_zero_copy_from_model_runner_output()零拷贝分离Placement Group 工具函数说明initialize_ray_cluster()完整 Ray 集群初始化_verify_bundles()验证资源 bundlebuild_actor_name()Actor 命名规范get_bundles_for_indices()GPU bundle 映射get_bundles_sorted_by_node()按节点排序 bundle_wait_until_pg_ready()等待 PG 创建_wait_until_pg_removed()等待 PG 清理get_num_tpu_nodes()TPU 节点数get_num_nodes_in_placement_group()PG 节点数辅助函数函数说明ray_is_available()检查 Ray 是否可用assert_ray_available()断言 Ray 可用FutureWrapper(Future)Ray ObjectRef 包装get_driver_env_vars()获取 Driver 环境变量架构图见08-ray-utils.svg三、模块调用关系与数据流3.1 主要调用链EngineCore ↓ create Executor.get_class(vllm_config) - 选择具体实现 ↓ Executor._init_executor() - 初始化 Workers ├── UniProc: Worker(local_rank0) 直接创建 ├── Multiproc: WorkerProc.make_worker_process() × N ├── Ray: RayWorkerWrapper.as_actor() × N └── RayV2: RayWorkerProc() × N SHM queues EngineCore.step() ↓ execute_model Executor.execute_model(scheduler_output) ├── Multiproc: broadcast(SchedulerOutput) - Workers execute - collect ├── Ray: collective_rpc / _execute_dag ├── RayV2: broadcast(SHM) - Ray actors execute - collect(SHM) └── UniProc: worker.execute_model() 直接调用 ↓ ModelRunnerOutput ↓ EngineCore continues scheduling3.2 数据流详图SchedulerOutput (from EngineCore) ↓ Executor.execute_model(scheduler_output) ├── rpc_broadcast_mq.enqueue(scheduler_output) [Multiproc/RayV2] │ ↓ SHM 零拷贝广播 ├── WorkerProc.worker_busy_loop() │ ├── recv from input queue │ ├── Worker.execute_model(scheduler_output) │ │ └── GPUModelRunner.execute_model() │ │ └── CUDA forward sampling │ └── send via worker_response_mq ├── Collect responses from all ranks │ ├── response_mqs[i].dequeue() │ └── Aggregate ModelRunnerOutput (TP all-reduce) └── Set FutureWrapper result ↓ ModelRunnerOutput (returned to EngineCore)3.3 关键交互调用方被调用方数据方式EngineCoreExecutorSchedulerOutput方法调用MultiprocExecutorrpc_broadcast_mqSchedulerOutputSHM enqueueWorkerProcWorkerSchedulerOutputSHM dequeue → 方法调用WorkerGPUModelRunnerSchedulerOutput方法调用Workerworker_response_mqModelRunnerOutputSHM enqueueMultiprocExecutorresponse_mqsModelRunnerOutputSHM dequeue 聚合UniProcExecutorWorker所有数据直接方法调用RayDistributedExecutorRayWorkerWrapper所有数据Ray RPCRayExecutorV2RayWorkerProcSchedulerOutputSHM enqueueExecutorWorkerlora/profile/sleepcollective_rpc四、设计模式总结模式应用位置说明策略模式4种 Executor 实现按部署场景选择执行策略工厂方法Executor.get_class()根据配置动态选择实现类广播-收集Multiproc/Ray 执行统一的 scatter-gather 模式Future 模式FutureWrapper异步非阻塞模型执行模板方法_init_executor()基类定流程子类定细节代理模式RayWorkerWrapper将 Worker 包装为 Ray Actor观察者模式death_pipe monitor_workers监控 Worker 健康状态引用计数Shared Worker Lock多 Worker 共享锁进程池WorkerProc 池N 个 Worker 子进程并行执行五、关键指标指标数值Executor 总代码量~3.5K 行8 文件MultiprocExecutor1038 行最大文件ray_utils.py696 行RayDistributedExecutor645 行RayExecutorV2524 行abstract.py384 行UniProcExecutor190 行ray_env_utils.py18 行Executor 实现类型5 种Uni/MP/Ray/RayV2/ExtLauncherIPC 机制SHM MessageQueue Ray RPC pipe并行支持TP PP DP PCP六、架构亮点与设计权衡亮点统一抽象接口Executor(ABC)让 EngineCore 无需关心底层是单 GPU 还是多节点SHM 零拷贝广播rpc_broadcast_mq实现 SchedulerOutput 的 1→N 高效广播进程隔离安全Worker 运行在独立子进程crash 不影响 Driverdeath_pipe 监控异步 Future 模式execute_model()返回FutureWrapper不阻塞调度器RayV2 混合架构Ray 管理进程 SHM 传数据兼得 Ray 弹性调度和 MP 低延迟工厂方法动态选择get_class()按配置自动选择最优执行器弹性伸缩支持sleep()/wake_up() DP reinitialize 动态调整资源权衡Multiproc 最复杂1038 行包含子进程管理、IPC、death 监控、信号处理——职责较多Ray 两套实现RayDistributedExecutor 和 RayExecutorV2 并存增加维护成本UniProc 不支持异步调度supports_async_scheduling() False单 GPU 场景无法利用调度优化MessageQueue 阻塞风险如果 Worker hangbroadcast 可能无限阻塞有 timeout 但需谨慎Pipeline Parallel 支持有限仅 RayDistributedExecutor 通过_execute_dag()支持 PPRay DAG 编译依赖需要额外安装 ray-cgraph非标准 Ray 功能报告生成时间2026-04-19 | 代码版本vllm main branch