# Big Data Processing on Kubernetes in Practice
## 1. Introduction

Big data processing is a core requirement of modern enterprises. Kubernetes provides an elastic, scalable platform for it and can efficiently run big data frameworks such as Spark and Flink.

## 2. Big Data Processing Architecture

### 2.1 Reference Architecture

```
┌──────────┐    ┌───────────┐    ┌──────────┐    ┌──────────┐
│   Data   │───▶│  Storage  │───▶│ Compute  │───▶│  Output  │
│ (Kafka)  │    │ (HDFS/S3) │    │ (Spark)  │    │ (DB/DW)  │
└──────────┘    └───────────┘    └──────────┘    └──────────┘
                                       │
                                       ▼
                                ┌─────────────┐
                                │  Resource   │
                                │ management  │
                                │ (YARN/K8s)  │
                                └─────────────┘
```

### 2.2 Framework Comparison

| Framework | Type | Typical use case |
| --- | --- | --- |
| Apache Spark | Batch / stream processing | General-purpose big data processing |
| Apache Flink | Stream processing | Real-time stream processing |
| Apache Kafka Streams | Stream processing | Lightweight stream processing |
| Apache Hadoop | Batch processing | Traditional batch workloads |

## 3. Spark on Kubernetes

### 3.1 Installing the Spark Operator

```bash
# Install the Spark Operator
kubectl apply -f https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/releases/download/v1.1.0/spark-operator.yaml

# Check the Operator's status
kubectl get pods -n spark-operator
```

### 3.2 SparkApplication Configuration

```yaml
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: default
spec:
  type: Scala
  mode: cluster
  image: gcr.io/spark-operator/spark:v3.4.1
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.4.1.jar
  sparkVersion: "3.4.1"
  restartPolicy:
    type: OnFailure
    onFailureRetries: 3
    onFailureRetryInterval: 10
    onSubmissionFailureRetries: 5
    onSubmissionFailureRetryInterval: 20
  driver:
    cores: 1
    coreLimit: 1200m
    memory: 512m
    labels:
      version: "3.4.1"
    serviceAccount: spark
  executor:
    cores: 1
    instances: 3
    memory: 1024m
    labels:
      version: "3.4.1"
```

### 3.3 Spark ServiceAccount and RBAC

```yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: spark
  namespace: default
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: spark-role
  namespace: default
rules:
  - apiGroups: [""]
    resources: ["pods", "services", "configmaps"]
    verbs: ["*"]
```
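When sizing the executor settings above, remember that Spark adds a memory overhead on top of the requested executor memory before asking Kubernetes for the pod: `max(384 MiB, factor × executor memory)`, with a default overhead factor of 0.1. A minimal sketch of that arithmetic (the helper name is illustrative, not part of any Spark API):

```python
# Sketch: estimate the memory each Spark executor pod requests on Kubernetes.
# Spark's pod request is executor memory plus an overhead of
# max(384 MiB, factor * executor memory); 0.1 is the default overhead factor.

def executor_pod_memory_mib(executor_memory_mib: int, overhead_factor: float = 0.1) -> int:
    """Total memory (MiB) an executor pod requests: heap plus overhead."""
    overhead = max(384, int(executor_memory_mib * overhead_factor))
    return executor_memory_mib + overhead

# The SparkApplication above runs 3 executors with 1024 MiB each:
per_pod = executor_pod_memory_mib(1024)  # 1024 + 384 = 1408 MiB per pod
total = 3 * per_pod                      # memory the cluster must be able to schedule
print(per_pod, total)                    # 1408 4224
```

This is why a cluster with exactly `instances × memory` free capacity can still fail to schedule all executors: the overhead must fit too.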
```yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: spark-role-binding
  namespace: default
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: spark-role
subjects:
  - kind: ServiceAccount
    name: spark
    namespace: default
```

## 4. Flink on Kubernetes

### 4.1 FlinkDeployment Configuration

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-cluster
  namespace: default
spec:
  image: flink:1.17.1
  flinkVersion: v1_17
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "4"
    jobmanager.memory.process.size: "2048m"
    taskmanager.memory.process.size: "4096m"
  serviceAccount: flink
  jobManager:
    replicas: 1
    resource:
      memory: 2048m
      cpu: 1
  taskManager:
    replicas: 2
    resource:
      memory: 4096m
      cpu: 2
```

### 4.2 Flink Job Configuration

```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: my-flink-job
  namespace: default
spec:
  deploymentName: flink-cluster
  job:
    jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
    parallelism: 4
    upgradeMode: stateless
```

### 4.3 Flink ServiceAccount and RBAC

```yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: flink
  namespace: default
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: flink
rules:
  - apiGroups: [""]
    resources: ["pods", "services", "configmaps", "events"]
    verbs: ["*"]
  - apiGroups: ["apps"]
    resources: ["deployments"]
    verbs: ["*"]
  - apiGroups: ["flink.apache.org"]
    resources: ["flinkdeployments", "flinksessionjobs"]
    verbs: ["*"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: flink
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: flink
subjects:
  - kind: ServiceAccount
    name: flink
    namespace: default
```

## 5. Kafka on Kubernetes

### 5.1 Kafka Cluster Configuration (Strimzi)

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
  namespace: kafka
spec:
  kafka:
    version: 3.5.1
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
    storage:
      type: jbod
```
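The TaskManager sizing above interacts with job parallelism: each parallel subtask of a job occupies one task slot, so the number of TaskManagers a job needs is the parallelism divided by `taskmanager.numberOfTaskSlots`, rounded up. A small sketch of that calculation (the helper name is illustrative):

```python
import math

# Sketch: how many Flink TaskManagers a job needs, given the configured
# taskmanager.numberOfTaskSlots. Each parallel subtask occupies one slot.

def taskmanagers_needed(parallelism: int, slots_per_tm: int) -> int:
    """Minimum TaskManager count to provide enough slots for one job."""
    return math.ceil(parallelism / slots_per_tm)

# The FlinkSessionJob above runs with parallelism 4 against TaskManagers
# that expose 4 slots each, so a single TaskManager can host the whole job;
# the deployment's 2 replicas leave headroom for additional session jobs.
print(taskmanagers_needed(4, 4))   # 1
print(taskmanagers_needed(10, 4))  # 3
```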
```yaml
      volumes:
        - id: 0
          type: persistent-claim
          size: 100Gi
          deleteClaim: false
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 50Gi
      deleteClaim: false
  entityOperator:
    topicOperator: {}
    userOperator: {}
```

### 5.2 Kafka Topic Configuration

A minimal topic definition for the `my-cluster` cluster above (the topic name and partition counts are illustrative):

```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: my-topic  # illustrative name
  namespace: kafka
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 3
  replicas: 3
```

## 6. Workflow Orchestration with Airflow

### 6.1 Installing Airflow

```bash
# Add the official Airflow Helm repository, then install Airflow
helm repo add apache-airflow https://airflow.apache.org
helm install airflow apache-airflow/airflow --namespace airflow

# Check Airflow's status
kubectl get pods -n airflow
```

### 6.2 Airflow DAG Configuration

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2024, 1, 1),
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG(
    "spark_data_processing",
    default_args=default_args,
    description="Spark data processing pipeline",
    schedule_interval=timedelta(days=1),
)

spark_task = SparkSubmitOperator(
    task_id="spark_job",
    application="gs://bucket/spark-job.jar",
    name="data-processing",
    conf={
        "spark.executor.instances": "5",
        "spark.executor.memory": "2g",
        "spark.driver.memory": "1g",
    },
    dag=dag,
)
```

## 7. Monitoring and Observability

### 7.1 Collecting Spark Metrics

```yaml
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: spark-monitor
spec:
  selector:
    matchLabels:
      app: spark
  endpoints:
    - port: metrics
      interval: 30s
      scrapeTimeout: 10s
```

### 7.2 Prometheus Queries

```promql
# Spark executor run time
sum(rate(spark_job_executor_run_time_total[5m])) by (job_id)

# Completed Spark tasks
sum(spark_job_tasks_completed) by (job_id)

# Flink operator latency
avg(flink_taskmanager_job_task_operator_latency_max) by (job_name)
```

### 7.3 Grafana Dashboard

```json
{
  "title": "Big Data Processing Metrics",
  "panels": [
    {
      "type": "graph",
      "targets": [
        {
          "expr": "sum(rate(spark_job_executor_run_time_total[5m])) by (job_id)",
          "legendFormat": "{{job_id}}"
        }
      ]
    },
    {
      "type": "stat",
      "targets": [
        {
          "expr": "sum(spark_job_tasks_completed)",
          "legendFormat": "Completed Tasks"
        }
      ]
    }
  ]
}
```

## 8. Summary

Kubernetes provides strong platform support for big data processing: frameworks such as Spark, Flink, and Kafka can all be deployed on it flexibly. With sensible resource configuration and monitoring, you can build an efficient, reliable big data processing platform.
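The queries above lean on `rate(counter[5m])`, which turns a monotonically increasing counter into a per-second rate over the window. A minimal sketch of the idea, computing a rate from `(timestamp, value)` samples (Prometheus additionally extrapolates to window edges and handles counter resets, which this sketch skips; the helper name is illustrative):

```python
# Sketch of what rate(counter[5m]) conceptually computes: the per-second
# increase of a counter over a time window, from (timestamp, value) samples.
# Real Prometheus also handles counter resets and edge extrapolation.

def simple_rate(samples: list) -> float:
    """Per-second rate from (unix_ts, counter_value) samples in one window."""
    (t0, v0), (t1, v1) = samples[0], samples[-1]
    return (v1 - v0) / (t1 - t0)

# A task counter grew from 1000 to 1900 over a 300-second (5-minute) window:
print(simple_rate([(0, 1000.0), (150, 1450.0), (300, 1900.0)]))  # 3.0
```

This is why rates are the right thing to graph for counters such as `spark_job_executor_run_time_total`: the raw counter only ever grows, while the rate shows current activity.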