机器学习流水线：从数据到部署
# ============================================================================
# ML Pipeline: from data to deployment
#
# Section 1 (overview, reconstructed from the article):
#   Pipeline stages: ingestion -> cleaning -> feature engineering ->
#   training -> evaluation -> deployment -> monitoring.
#   Tooling mentioned: Kafka/Flume (ingestion), Pandas/PySpark (cleaning),
#   Scikit-learn/Feast (features), PyTorch/TensorFlow (training),
#   Scikit-learn/MLflow (evaluation), Flask/TorchServe (serving),
#   Prometheus/Evidently (monitoring); orchestrators: Kubeflow, Airflow,
#   MLflow, Prefect.
#
# Section 2.1: Data preprocessing pipeline
# ============================================================================
import pandas as pd
import numpy as np


class DataPipeline:
    """Ordered chain of preprocessing steps.

    A step is either an object exposing ``fit_transform``/``transform``
    (sklearn-style) or a plain callable applied as-is.
    """

    def __init__(self):
        self.transformers = []

    def add_transformer(self, transformer):
        """Append a step to the end of the chain."""
        self.transformers.append(transformer)

    def fit(self, data):
        """Fit-and-apply every step in order; returns the transformed data."""
        for transformer in self.transformers:
            # Steps with a `fit` attribute are fitted on the fly;
            # bare callables are simply invoked.
            if hasattr(transformer, "fit"):
                data = transformer.fit_transform(data)
            else:
                data = transformer(data)
        return data

    def transform(self, data):
        """Apply already-fitted steps in order (no refitting)."""
        for transformer in self.transformers:
            if hasattr(transformer, "transform"):
                data = transformer.transform(data)
            else:
                data = transformer(data)
        return data


class DataCleaner:
    """Callable cleaning step: drop NaN rows and duplicates, strip strings."""

    def __call__(self, df):
        df = df.dropna()
        df = df.drop_duplicates()
        for col in df.columns:
            # Only object (string-like) columns get whitespace-stripped.
            if df[col].dtype == object:
                df[col] = df[col].str.strip()
        return df


class FeatureEngineer:
    """Standard-scales numeric columns and label-encodes categorical ones."""

    def __init__(self):
        # Lazy import so this module loads even without scikit-learn.
        from sklearn.preprocessing import StandardScaler, LabelEncoder

        self._label_encoder_cls = LabelEncoder
        self.scaler = StandardScaler()
        # One encoder per categorical column. The original article reused a
        # single LabelEncoder for every column, so transform() only knew the
        # classes of the last column fitted — a real bug, fixed here.
        self.encoders = {}

    def fit_transform(self, df):
        """Fit scaler/encoders on df and return the transformed frame."""
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        categorical_cols = df.select_dtypes(include=[object]).columns
        if len(numeric_cols) > 0:
            df[numeric_cols] = self.scaler.fit_transform(df[numeric_cols])
        for col in categorical_cols:
            encoder = self._label_encoder_cls()
            df[col] = encoder.fit_transform(df[col])
            self.encoders[col] = encoder
        return df

    def transform(self, df):
        """Apply the previously fitted scaler/encoders to df."""
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        categorical_cols = df.select_dtypes(include=[object]).columns
        if len(numeric_cols) > 0:
            df[numeric_cols] = self.scaler.transform(df[numeric_cols])
        for col in categorical_cols:
            df[col] = self.encoders[col].transform(df[col])
        return df
# ============================================================================
# Section 2.2: Model training pipeline
# ============================================================================
import torch
import torch.nn as nn
import pandas as pd


class ModelTrainingPipeline:
    """Mini-batch training loop with a held-out validation split.

    X is expected to be a pandas DataFrame and y a pandas Series
    (both are read via ``.values``) — confirm against callers.
    """

    def __init__(self, model, optimizer, loss_fn):
        self.model = model
        self.optimizer = optimizer
        self.loss_fn = loss_fn

    def train(self, X, y, epochs=100, batch_size=32):
        """Train for `epochs`; prints validation loss/accuracy each epoch."""
        X_t = torch.tensor(X.values, dtype=torch.float32)
        y_t = torch.tensor(y.values, dtype=torch.long)
        # Random 80/20 train/validation split, torch-native (the article used
        # sklearn's train_test_split; this avoids the extra dependency).
        perm = torch.randperm(X_t.size(0))
        n_val = max(1, int(0.2 * X_t.size(0)))
        val_idx, train_idx = perm[:n_val], perm[n_val:]
        X_train, y_train = X_t[train_idx], y_t[train_idx]
        X_val, y_val = X_t[val_idx], y_t[val_idx]

        for epoch in range(epochs):
            self.model.train()
            permutation = torch.randperm(X_train.size(0))
            for i in range(0, X_train.size(0), batch_size):
                # BUG FIX: the original sliced `permutation[i:ibatch_size]`;
                # the slice end must be i + batch_size.
                indices = permutation[i:i + batch_size]
                batch_x, batch_y = X_train[indices], y_train[indices]
                self.optimizer.zero_grad()
                outputs = self.model(batch_x)
                loss = self.loss_fn(outputs, batch_y)
                loss.backward()
                self.optimizer.step()

            self.model.eval()
            with torch.no_grad():
                val_outputs = self.model(X_val)
                val_loss = self.loss_fn(val_outputs, y_val)
                predictions = torch.argmax(val_outputs, dim=1)
                # Exact-match accuracy (equivalent to sklearn accuracy_score).
                accuracy = (predictions == y_val).float().mean().item()
            print(f"Epoch {epoch + 1}/{epochs}, "
                  f"Loss: {val_loss.item():.4f}, Accuracy: {accuracy:.4f}")

    def evaluate(self, X, y):
        """Return exact-match accuracy of the model on (X, y)."""
        self.model.eval()
        X_t = torch.tensor(X.values, dtype=torch.float32)
        y_t = torch.tensor(y.values, dtype=torch.long)
        with torch.no_grad():
            outputs = self.model(X_t)
            predictions = torch.argmax(outputs, dim=1)
            return (predictions == y_t).float().mean().item()


class ExperimentTracker:
    """In-memory experiment log: one params dict plus timestamped metrics."""

    def __init__(self, experiment_name):
        self.experiment_name = experiment_name
        self.metrics = []
        # Default so report() works even if log_params() was never called
        # (the original only assigned self.params inside log_params).
        self.params = {}

    def log_metric(self, name, value):
        """Record a named metric value with the current timestamp."""
        self.metrics.append({
            "experiment": self.experiment_name,
            "metric": name,
            "value": value,
            "timestamp": pd.Timestamp.now(),
        })

    def log_params(self, params):
        """Store the hyper-parameter dict for this experiment."""
        self.params = params

    def report(self):
        """Print a human-readable summary of params and logged metrics."""
        print(f"Experiment: {self.experiment_name}")
        print(f"Parameters: {self.params}")
        print("Metrics:")
        for metric in self.metrics:
            print(f"  {metric['metric']}: {metric['value']} at {metric['timestamp']}")
# ============================================================================
# Section 2.3: Model deployment pipeline
# ============================================================================
import pickle  # kept from the original article; joblib is what is used below
import pandas as pd


class ModelDeployer:
    """Bundles a fitted model with its preprocessor and handles persistence."""

    def __init__(self, model, preprocessor):
        self.model = model
        self.preprocessor = preprocessor

    def save(self, model_path, preprocessor_path):
        """Persist model and preprocessor to disk via joblib."""
        import joblib  # local: only needed when persisting
        joblib.dump(self.model, model_path)
        joblib.dump(self.preprocessor, preprocessor_path)

    @classmethod  # the original was missing this decorator
    def load(cls, model_path, preprocessor_path):
        """Rebuild a deployer from saved artifacts.

        SECURITY: joblib.load unpickles arbitrary objects — only load
        files from a trusted source.
        """
        import joblib
        model = joblib.load(model_path)
        preprocessor = joblib.load(preprocessor_path)
        return cls(model, preprocessor)

    def predict(self, data):
        """Preprocess `data` then run the model's predict()."""
        data = self.preprocessor.transform(data)
        return self.model.predict(data)


class FlaskAPI:
    """Thin Flask wrapper exposing POST /predict over a ModelDeployer."""

    def __init__(self, model_deployer):
        # Local import: Flask is an optional serving dependency.
        from flask import Flask, request, jsonify

        self.app = Flask(__name__)
        self.deployer = model_deployer

        @self.app.route("/predict", methods=["POST"])
        def predict():
            # Expects a JSON body convertible to a DataFrame (column->values).
            data = request.get_json()
            df = pd.DataFrame(data)
            predictions = self.deployer.predict(df)
            return jsonify({"predictions": predictions.tolist()})

    def run(self, host="0.0.0.0", port=5000):
        """Start the development server (blocking)."""
        self.app.run(host=host, port=port)


class ModelMonitor:
    """Tracks served predictions and estimates prediction-distribution drift."""

    def __init__(self):
        self.prediction_history = []
        self.performance_history = []

    def log_prediction(self, data, prediction, actual=None):
        """Record one prediction (with optional ground truth) and timestamp."""
        self.prediction_history.append({
            "data": data,
            "prediction": prediction,
            "actual": actual,
            "timestamp": pd.Timestamp.now(),
        })

    def calculate_drift(self):
        """Total-variation distance between the last 100 predictions and the
        100 before them; returns 0 until 200 predictions are logged.

        NOTE(review): the threshold was garbled in the source; 200 matches
        the [-200:-100] window below — confirm against the original intent.
        """
        if len(self.prediction_history) < 200:
            return 0
        recent = [p["prediction"] for p in self.prediction_history[-100:]]
        earlier = [p["prediction"] for p in self.prediction_history[-200:-100]]
        recent_dist = pd.Series(recent).value_counts(normalize=True)
        earlier_dist = pd.Series(earlier).value_counts(normalize=True)
        # fill_value=0 so a label present in only one window counts fully;
        # plain subtraction would produce NaN on unaligned labels.
        return recent_dist.sub(earlier_dist, fill_value=0).abs().sum() / 2

    def report(self):
        """Print current drift estimate and prediction volume."""
        drift = self.calculate_drift()
        print(f"Data Drift: {drift:.4f}")
        print(f"Total Predictions: {len(self.prediction_history)}")
# ============================================================================
# Section 3: Comparisons (condensed from the article's tables)
#   Orchestrators — Airflow: medium ease/high scale; Prefect: easy, Pythonic;
#   Kubeflow: hard but very scalable; MLflow: easy, low deploy complexity.
#   Preprocessing — Pandas: fast; PySpark: very fast, large data; Dask: fast.
#   Serving latency (ms) — Flask ~50, TorchServe ~10, TensorRT ~5.
#
# Section 4: Best practices
# ============================================================================

def build_pipeline(config):
    """Build a DataPipeline from a config dict.

    Keys: "cleaning" (default True) adds a DataCleaner step;
    "feature_engineering" (default True) adds a FeatureEngineer step.
    """
    pipeline = DataPipeline()
    if config.get("cleaning", True):
        pipeline.add_transformer(DataCleaner())
    if config.get("feature_engineering", True):
        pipeline.add_transformer(FeatureEngineer())
    return pipeline


class PipelineFactory:
    """Creates a task-specific pipeline based on config["type"]."""

    @staticmethod  # the original was missing this decorator
    def create(config):
        # ClassificationPipeline / RegressionPipeline are defined elsewhere.
        if config["type"] == "classification":
            return ClassificationPipeline(config)
        elif config["type"] == "regression":
            return RegressionPipeline(config)
        # NOTE(review): unknown types fall through and return None —
        # consider raising ValueError; confirm what callers expect.


class MLWorkflow:
    """End-to-end run: preprocess -> train -> evaluate -> deploy."""

    def __init__(self, data_pipeline, training_pipeline, deployer):
        self.data_pipeline = data_pipeline
        self.training_pipeline = training_pipeline
        self.deployer = deployer

    def run(self, data, labels):
        """Execute the four workflow steps; returns the final accuracy.

        NOTE(review): evaluation here reuses the training data, so the
        reported accuracy is optimistic — confirm whether a held-out set
        should be threaded through instead.
        """
        print("Step 1: Data Preprocessing")
        data = self.data_pipeline.fit(data)
        print("Step 2: Model Training")
        self.training_pipeline.train(data, labels)
        print("Step 3: Model Evaluation")
        accuracy = self.training_pipeline.evaluate(data, labels)
        print(f"Final Accuracy: {accuracy:.4f}")
        print("Step 4: Model Deployment")
        self.deployer.save("model.joblib", "preprocessor.joblib")
        return accuracy

# ============================================================================
# Section 5: Summary (from the article)
#   ML pipelines are key to production: preprocessing ensures data quality,
#   training builds the model, deployment serves it, monitoring tracks it.
#   Recommendations: Prefect is the easiest workflow tool; PySpark suits
#   large-scale data; TorchServe is a good serving choice; use MLflow for
#   experiment tracking.
# ============================================================================