Apache Airflow 系列教程 | 第22课:Kubernetes 集成深度实践
导读(Introduction)欢迎来到 Apache Airflow 源码深度解析系列的第二十二课。在前一课中,我们学习了如何通过云服务 Provider 集成 AWS、GCP、Azure 等外部服务。本课将聚焦于 Airflow 与 Kubernetes 的深度集成——这是现代数据平台中最重要的基础设施融合之一。Kubernetes 为 Airflow 带来了真正的弹性伸缩能力。传统的 CeleryExecutor 需要预先分配固定数量的 Worker 节点,而 KubernetesExecutor 则采用"Pod 即 Worker"的设计哲学——每个任务在独立的 Pod 中执行,用完即销毁。这意味着你可以实现零闲置资源、完全隔离的任务执行,以及按需伸缩的集群容量。本课将深入providers/cncf/kubernetes/源码,从 KubernetesExecutor 的调度循环、KubernetesPodOperator 的生命周期管理、到 Pod 模板的三层合并机制,全面揭示 Airflow 如何在 Kubernetes 之上构建弹性工作流执行引擎。学习目标(Learning Objectives)完成本课学习后,你将能够:理解 KubernetesExecutor 的工作原理——掌握"Pod 即 Worker"的核心设计,以及任务从入队到 Pod 创建再到结果回收的完整生命周期深入 KubernetesPodOperator 的实现——理解在独立 Pod 中执行任务的同步/异步模式、Pod 重连、XCom 传递机制掌握 Pod 模板与三层合并机制——理解 pod_template_file、动态 Pod、executor_config 三者的优先级与合并逻辑分析 KubernetesHook 的连接管理——理解 in_cluster、kubeconfig 等多种认证方式的优先级链理解 Pod 生命周期监控——KubernetesJobWatcher 的 Watch 机制、资源版本跟踪、失败检测掌握 Kubernetes Secret 处理——环境变量注入与 Volume 挂载两种密钥分发方式了解生产环境最佳实践——资源限制、命名空间隔离、日志收集、Pod 清理策略正文内容(Main Content)1. KubernetesExecutor:Pod 即 Worker 的设计哲学1.1 传统 Executor 的局限在理解 KubernetesExecutor 之前,让我们回顾传统执行器的局限性:Executor 类型局限性LocalExecutor单机运行,无法水平扩展CeleryExecutor需要预部署 Worker 节点,资源利用率低SequentialExecutor串行执行,仅适合测试KubernetesExecutor的核心理念是将 Kubernetes 集群本身作为弹性计算资源池:按需创建:每个 Task Instance 独立在一个 Pod 中执行完全隔离:不同任务之间没有进程级别的相互影响弹性伸缩:Pod 数量随任务负载自动伸缩,无需预分配 Worker异构资源:不同任务可请求不同的 CPU/内存/GPU 资源1.2 整体架构概览KubernetesExecutor 的架构包含以下核心组件:┌───────────────────────────────────────────────────────┐ │ Scheduler │ │ ┌──────────────────────────────────────────────────┐ │ │ │ KubernetesExecutor │ │ │ │ ┌────────────┐ ┌────────────┐ ┌───────────┐ │ │ │ │ │ task_queue │ │result_queue│ │ running │ │ │ │ │ └─────┬──────┘ └─────▲──────┘ └───────────┘ │ │ │ │ │ │ │ │ │ │ ▼ │ │ │ │ │ ┌──────────────────────┴───────────────────────┐ │ │ │ │ │ AirflowKubernetesScheduler │ │ │ │ │ │ ┌──────────────┐ ┌────────────────────┐ │ │ │ │ │ │ │ run_next() │ │ KubernetesJobWatcher│ │ │ │ │ │ │ │ (Pod创建) │ │ (Pod事件监听) │ │ │ │ │ │ │ └──────┬───────┘ └──────────▲─────────┘ │ │ │ │ │ └─────────┼───────────────────────┼────────────┘ │ │ │ └────────────┼───────────────────────┼──────────────┘ │ └───────────────┼───────────────────────┼────────────────┘ │ │ ▼ │ ┌───────────────────────────────────────────────────────┐ │ Kubernetes API Server │ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │ │Worker Pod│ │Worker Pod│ │Worker Pod│ │Worker Pod│ │ │ └─────────┘ └─────────┘ └─────────┘ └─────────┘ │ └───────────────────────────────────────────────────────┘1.3 源码解析:KubernetesExecutor 类KubernetesExecutor 继承自BaseExecutor,是整个 Kubernetes 集成的核心调度器。让我们看其初始化逻辑:# providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.pyclassKubernetesExecutor(BaseExecutor):"""Executor for Kubernetes."""RUNNING_POD_LOG_LINES=100supports_ad_hoc_ti_run:bool=Truesupports_multi_team:bool=Truedef__init__(self,*args,**kwargs):super().__init__(*args,**kwargs)# 团队感知配置支持(AIP-67 Multi-Team 特性)ifnothasattr(self,"conf")ornothasattr(self.conf,"getint"):self.conf=conf self.kube_config=KubeConfig(executor_conf=self.conf)self.parallelism=self.kube_config.parallelism# 核心队列:基于 multiprocessing.Manager 的进程安全队列self._manager=multiprocessing.Manager()self.task_queue:Queue[KubernetesJob]=self._manager.JoinableQueue()self.result_queue:Queue[KubernetesResults]=self._manager.JoinableQueue()self.kube_scheduler:AirflowKubernetesScheduler|None=Noneself.kube_client:client.CoreV1Api|None=Noneself.task_publish_retries:Counter[TaskInstanceKey]=Counter()self.task_publish_max_retries=self.conf.getint("kubernetes_executor","task_publish_max_retries",fallback=0)关键设计要点:双队列架构:task_queue接收待执行任务,result_queue接收执行结果进程安全:使用multiprocessing.Manager().JoinableQueue()保证多进程安全Multi-Team 支持:supports_multi_team = True表示支持 AIP-67 多团队配置隔离发布重试:task_publish_retries跟踪 Pod 创建失败的重试次数1.4 启动流程:start() 方法defstart(self)-None:"""Start the executor."""self.log.info("Start Kubernetes executor")self.scheduler_job_id=str(self.job_id)fromairflow.providers.cncf.kubernetes.executors.kubernetes_executor_utilsimport(AirflowKubernetesScheduler,)fromairflow.providers.cncf.kubernetes.kube_clientimportget_kube_client self.kube_client=get_kube_client()self.kube_scheduler=AirflowKubernetesScheduler(kube_config=self.kube_config,result_queue=self.result_queue,kube_client=self.kube_client,scheduler_job_id=self.scheduler_job_id,)启动时创建两个关键组件:kube_client:通过get_kube_client()获取 Kubernetes API 客户端kube_scheduler:AirflowKubernetesScheduler负责实际的 Pod 创建和监控1.5 核心调度循环:sync() 方法sync()是 KubernetesExecutor 的心跳方法,由 Scheduler 定期调用:defsync(self)-None:"""Synchronize task state."""# 1. 定期检查是否需要收养已完成的孤儿 Podadoption_interval=conf.getfloat("scheduler","orphaned_tasks_check_interval",fallback=300.0)now=time.monotonic()ifnow-self._last_completed_pod_adoption=adoption_interval:self._last_completed_pod_adoption=now self._adopt_completed_pods(self.kube_client)# 2. 同步 kube_scheduler 状态(处理 Watcher 事件)self.kube_scheduler.sync()# 3. 从 result_queue 中获取执行结果,更新任务状态last_resource_version:dict[str,str]=defaultdict(lambda:"0")withcontextlib.suppress(Empty):whileTrue:results=self.result_queue.get_nowait()try:last_resource_version[results.namespace]=results.resource_version self._change_state(results)exceptExceptionase:self.result_queue.put(results)# 重新入队finally:self.result_queue.task_done()# 4. 批量从 task_queue 创建 Podwithcontextlib.suppress(Empty):for_inrange(self.kube_config.worker_pods_creation_batch_size):task=self.task_queue.get_nowait()try:self.kube_scheduler.run_next(task)exceptApiExceptionase:# 处理配额超限、限流等情况,支持重试ifcan_retry_publish:self.task_queue.put(task)self.task_publish_retries[key]=retries+1else:self.fail(key,e)sync()的工作流程可以概括为四个步骤:孤儿 Pod 收养:处理前一个 Scheduler 遗留的未完成 Pod事件同步:从 KubernetesJobWatcher 获取 Pod 状态变更事件结果处理:从 result_queue 中读取已完成任务的结果Pod 创建:批量从 task_queue 中取出任务并创建对应的 Worker Pod注意限流保护:当收到 HTTP 429 响应时,设置create_pods_after时间戳,暂停 Pod 创建直到Retry-After时间过后。1.6 任务入队:execute_async() 与 queue_workload()当 Scheduler 决定执行某个任务时,调用execute_async()将任务放入队列:defexecute_async(self,key,command,queue=None,executor_config=None):"""Execute task asynchronously."""# 从 executor_config 中提取 Pod 覆盖配置kube_executor_config=PodGenerator.from_obj(executor_config)pod_template_file=executor_config.get("pod_template_file",None)ifexecutor_configelseNone# 标记任务为 QUEUED 状态self.event_buffer[key]=(TaskInstanceState.QUEUED,self.scheduler_job_id)# 放入 task_queue,等待 sync() 方法消费self.task_queue.put(KubernetesJob(key,command,kube_executor_config,pod_template_file))在 Airflow 3.x 中,新增了queue_workload()方法支持 workload 模式:defqueue_workload(self,workload:workloads.All,session:Session|None)-None:ifnotisinstance(workload,workloads.ExecuteTask):raiseRuntimeError(f"{type(self)}cannot handle workloads of type{type(workload)}")ti=workload.ti self.queued_tasks[ti.key]=workload2. AirflowKubernetesScheduler:Pod 创建与监控的核心2.1 职责与初始化AirflowKubernetesScheduler是 KubernetesExecutor 的"内部调度器",负责实际的 Pod 操作:# providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.pyclassAirflowKubernetesScheduler(LoggingMixin):"""Airflow Scheduler for Kubernetes."""def__init__(self,kube_config,result_queue,kube_client,scheduler_job_id):self.kube_config=kube_config self.result_queue=result_queue self.namespace=self.kube_config.kube_namespace self.kube_client=kube_client self._manager=multiprocessing.Manager()self.watcher_queue=self._manager.Queue()self.scheduler_job_id=scheduler_job_id# 创建 KubernetesJobWatcher 进程self.kube_watchers=self._make_kube_watchers()2.2 Pod 创建:run_next() 方法run_next()是将任务转化为 Kubernetes Pod 的核心方法:defrun_next(self,next_job:KubernetesJob)-None:"""Receives the next job to run, builds the pod, and creates it."""key=next_job.key command=next_job.command kube_executor_config=next_job.kube_executor_config pod_template_file=next_job.pod_template_file dag_id,task_id,run_id,try_number,map_index=key# Airflow 3.x: 将 workload 序列化为 Task SDK 命令iflen(command)==1andisinstance(command[0],ExecuteTask):workload=command[0]command=workload_to_command_args(workload)# 获取基础 Pod 模板base_worker_pod=get_base_pod_from_template(pod_template_file,self.kube_config)# 构建完整 Pod 规格(三层合并)pod=PodGenerator.construct_pod(namespace=self.namespace,scheduler_job_id=self.scheduler_job_id,pod_id=create_unique_id(dag_id,task_id),dag_id=dag_id,task_id=task_id,kube_image=self.kube_config.kube_image,try_number=try_number,map_index=map_index,date=None,run_id=run_id,args=list(command),pod_override_object=kube_executor_config,base_worker_pod=base_worker_pod,with_mutation_hook=True,)# 异步创建 Pod(不阻塞)self.run_pod_async(pod,**self.kube_config.kube_client_request_args)workload_to_command_args()函数将 workload 转化为 Task SDK 命令:defworkload_to_command_args(workload:workloads.ExecuteTask)-list[str