用Python+遗传算法搞定校园单车调度:手把手复现数学建模B题完整流程(附代码)
用Python遗传算法搞定校园单车调度手把手复现数学建模B题完整流程附代码校园共享单车的调度问题一直是运营效率提升的关键痛点。想象一下早八点的教学楼前单车堆积如山而食堂区域却一辆难求——这种供需失衡不仅影响用户体验更直接拉高运营成本。本文将带你用Python从零实现一套基于遗传算法的智能调度系统完整复现数学建模竞赛中的经典题型。1. 环境准备与数据建模工欲善其事必先利其器。我们需要先搭建好开发环境并构建符合实际场景的数据模型。这里推荐使用Anaconda创建独立Python环境conda create -n bike_scheduler python3.9 conda activate bike_scheduler pip install pandas numpy matplotlib geopy ortools deap1.1 构建校园地理模型首先需要将校园地图数字化。假设校园主要区域包含以下坐标点单位米点位编号名称X坐标Y坐标需求系数P1教学楼A1203500.8P2图书馆2802000.9P3食堂1501000.7P4宿舍区503000.6P5体育馆3004000.5用Python表示这个地理模型import numpy as np locations { P1: {name: 教学楼A, coords: (120, 350), demand: 0.8}, P2: {name: 图书馆, coords: (280, 200), demand: 0.9}, P3: {name: 食堂, coords: (150, 100), demand: 0.7}, P4: {name: 宿舍区, coords: (50, 300), demand: 0.6}, P5: {name: 体育馆, coords: (300, 400), demand: 0.5} }1.2 初始化单车分布模拟早晨7点的单车初始分布状态initial_distribution { P1: 15, # 过剩状态 P2: 5, # 适中 P3: 3, # 短缺 P4: 20, # 严重过剩 P5: 8 # 适中 }注意需求系数越高表示该点位在高峰时段单车需求越大合理的调度应该使单车数量与需求系数正相关。2. 遗传算法核心设计遗传算法特别适合解决这种带约束条件的组合优化问题。我们需要设计染色体编码、适应度函数以及遗传操作。2.1 染色体编码方案每个染色体代表一种调度方案包含三个部分调度路线序列各点位的装卸数量使用的调度车数量≤3# 示例染色体结构 sample_chromosome { routes: [[P4, P3, P1], [P5, P2]], # 两辆车的路线 operations: { P4: -8, # 运出8辆 P3: 5, # 运入5辆 P1: 3, # 运入3辆 P5: -2, # 运出2辆 P2: 2 # 运入2辆 } }2.2 适应度函数设计适应度函数需要平衡多个优化目标def calculate_fitness(chromosome): # 1. 计算供需匹配度 supply_score 0 for point, op in chromosome[operations].items(): final_count initial_distribution[point] op ideal_count int(locations[point][demand] * 30) # 假设基准为30辆 supply_score - abs(final_count - ideal_count) # 2. 计算路径效率 distance_score 0 for route in chromosome[routes]: route_distance calculate_route_distance(route) distance_score - route_distance # 3. 惩罚违反约束的方案 penalty 0 if len(chromosome[routes]) 3: penalty - 1000 # 超过3辆车的惩罚 return supply_score distance_score*0.5 penalty提示实际实现时还需要考虑单车的装载量约束每车≤20辆和速度约束25km/h。3. 完整算法实现现在我们将各个组件整合成完整的遗传算法流程。3.1 种群初始化from deap import base, creator, tools import random creator.create(FitnessMax, base.Fitness, weights(1.0,)) creator.create(Individual, dict, fitnesscreator.FitnessMax) toolbox base.Toolbox() def generate_individual(): # 简化的个体生成逻辑 routes random.sample(list(locations.keys()), len(locations)) operations {p: random.randint(-5,5) for p in locations} return {routes: [routes], operations: operations} toolbox.register(individual, tools.initIterate, creator.Individual, generate_individual) toolbox.register(population, tools.initRepeat, list, toolbox.individual)3.2 遗传操作设计def mutate_individual(individual): # 路线变异 if random.random() 0.3: route random.choice(individual[routes]) random.shuffle(route) # 操作量变异 for point in individual[operations]: if random.random() 0.2: individual[operations][point] random.randint(-2,2) return individual toolbox.register(mate, tools.cxTwoPoint) toolbox.register(mutate, mutate_individual) toolbox.register(select, tools.selTournament, tournsize3) toolbox.register(evaluate, calculate_fitness)3.3 主进化循环def run_evolution(pop_size50, generations100): pop toolbox.population(npop_size) for gen in range(generations): # 评估适应度 fitnesses list(map(toolbox.evaluate, pop)) for ind, fit in zip(pop, fitnesses): ind.fitness.values (fit,) # 选择下一代 offspring toolbox.select(pop, len(pop)) offspring list(map(toolbox.clone, offspring)) # 交叉操作 for child1, child2 in zip(offspring[::2], offspring[1::2]): if random.random() 0.7: toolbox.mate(child1, child2) del child1.fitness.values del child2.fitness.values # 变异操作 for mutant in offspring: if random.random() 0.2: toolbox.mutate(mutant) del mutant.fitness.values # 更新种群 pop[:] offspring return pop4. 结果分析与可视化获得最优解后我们需要评估调度方案的实际效果。4.1 调度方案评估def evaluate_solution(solution): print( 调度方案评估 ) print(f使用调度车数量: {len(solution[routes])}) total_moved sum(abs(v) for v in solution[operations].values()) print(f总调度单车数: {total_moved}) # 计算各点位最终数量 for point in locations: final initial_distribution[point] solution[operations].get(point, 0) ideal int(locations[point][demand] * 30) print(f{locations[point][name]}: {initial_distribution[point]} → {final} (理想:{ideal}))4.2 路径可视化使用matplotlib绘制调度路线import matplotlib.pyplot as plt def plot_solution(solution): plt.figure(figsize(10,8)) # 绘制点位 for pid, data in locations.items(): x, y data[coords] plt.scatter(x, y, s200, labeldata[name]) plt.text(x5, y5, pid, fontsize12) # 绘制路线 colors [red, blue, green] for i, route in enumerate(solution[routes]): if len(route) 2: continue x_coords [locations[p][coords][0] for p in route] y_coords [locations[p][coords][1] for p in route] plt.plot(x_coords, y_coords, colorcolors[i], linestyle--, markero, labelf调度车{i1}) plt.title(校园单车调度路线图) plt.xlabel(X坐标 (米)) plt.ylabel(Y坐标 (米)) plt.grid(True) plt.legend() plt.show()5. 进阶优化与扩展基础版本实现后我们可以从以下几个方向进一步提升算法性能5.1 多目标优化引入NSGA-II算法同时优化供需匹配度调度路径总长度使用的调度车数量from deap import algorithms creator.create(FitnessMulti, base.Fitness, weights(1.0, 1.0, -1.0)) creator.create(Individual, dict, fitnesscreator.FitnessMulti) def multi_objective_evaluate(individual): supply_score calculate_supply_score(individual) distance_score calculate_distance_score(individual) truck_count len(individual[routes]) return supply_score, distance_score, truck_count5.2 实时调度系统结合历史数据预测各点位需求变化from sklearn.ensemble import RandomForestRegressor class DemandPredictor: def __init__(self): self.model RandomForestRegressor() def train(self, historical_data): # historical_data格式: [{hour:8, weekday:1, weather:1, demand:0.8}, ...] X np.array([[d[hour], d[weekday], d[weather]] for d in historical_data]) y np.array([d[demand] for d in historical_data]) self.model.fit(X, y) def predict(self, current_conditions): return self.model.predict([current_conditions])[0]5.3 故障车辆回收模型扩展算法处理故障车辆回收问题def add_fault_recovery(solution, fault_points): # 为现有方案添加故障点回收路线 for point in fault_points: # 找到最近的调度路线插入点 nearest_route find_nearest_route(solution[routes], point) insert_position find_best_insertion(nearest_route, point) nearest_route.insert(insert_position, point) solution[operations][point] -initial_distribution[point] # 全部运走 return solution6. 工程实践建议在实际部署这类系统时有几个关键点需要注意数据采集质量确保单车分布数据的实时性和准确性建议每5分钟更新一次各点位单车数量使用蓝牙信标或摄像头辅助计数建立数据异常检测机制算法参数调优不同校园环境需要调整的参数包括遗传算法的种群大小和迭代次数适应度函数中各指标的权重变异和交叉的概率系统集成方案class BikeScheduler: def __init__(self, map_data): self.map load_map_data(map_data) self.predictor DemandPredictor() self.ga_engine GAEngine() def run_daily_schedule(self): current_status get_current_distribution() weather get_weather_condition() hour datetime.now().hour predicted_demand self.predictor.predict({ hour: hour, weekday: datetime.now().weekday(), weather: weather }) optimal_plan self.ga_engine.optimize( current_status, predicted_demand ) return optimal_plan硬件部署考量调度车安装GPS和车载计数器使用轻量级通信协议传输调度指令开发司机端APP展示最优路线