在实际业务中,随着系统的演进,有时需要将已有的 MongoDB 数据迁移到 MySQL。
这种需求常见于以下场景:

  • 历史项目使用 MongoDB 存储非结构化数据,但后续业务需要使用关系型数据库进行统计、分析和事务处理。
  • 新系统统一采用 MySQL 作为主要数据存储,需要整合原有的 MongoDB 数据。
  • 将半结构化数据转为结构化表结构,以便和 BI 系统对接。

本文将以 Java 语言为例,讲解如何从 MongoDB 读取数据,并批量写入 MySQL,实现高效、安全的数据迁移。


一、前置条件

在开始之前,需要准备以下环境:

  1. MongoDB 数据库
  • 确保可以远程访问(开放对应端口,默认 27017)
  • 确认要迁移的集合(Collection)名称和结构
  1. MySQL 数据库
  • 已创建目标数据库和表
  • 根据 MongoDB 字段定义好表结构
  1. Java 开发环境
  • JDK 1.8+(建议)
  • Maven/Gradle 构建工具
  • 相关依赖(MongoDB Java Driver、MySQL Connector/J)
  1. 数据迁移策略
  • 全量迁移:一次性导出所有数据
  • 增量迁移:根据时间戳、主键或业务标识增量导入

二、项目依赖

在 Maven 项目中,pom.xml 添加依赖:

<dependencies><!-- MongoDB Java Driver --><dependency><groupId>org.mongodb</groupId><artifactId>mongo-java-driver</artifactId><version>3.12.14</version></dependency><!-- MySQL Connector --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.33</version></dependency>
</dependencies>

三、MongoDB 数据读取

我们可以通过 MongoDB Java Driver 连接 MongoDB 并读取集合数据:

import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import org.bson.Document;
import java.util.ArrayList;
import java.util.List;public class MongoReader {public static List<Document> readData(String host, int port, String dbName, String collectionName) {MongoClient mongoClient = new MongoClient(host, port);MongoDatabase database = mongoClient.getDatabase(dbName);MongoCollection<Document> collection = database.getCollection(collectionName);List<Document> result = new ArrayList<>();for (Document doc : collection.find()) {result.add(doc);}mongoClient.close();return result;}
}

这里我们一次性读取所有数据,如果数据量大,应考虑 分页读取 或使用 游标(Cursor) 进行流式处理。


四、MySQL 数据写入

假设目标表结构如下:

CREATE TABLE user_info (id BIGINT PRIMARY KEY,name VARCHAR(100),email VARCHAR(200),create_time DATETIME
);

我们可以用 JDBC 批量插入数据:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.List;
import org.bson.Document;public class MysqlWriter {private static final String URL = "jdbc:mysql://localhost:3306/test?useSSL=false&serverTimezone=UTC";private static final String USER = "root";private static final String PASSWORD = "123456";public static void writeData(List<Document> documents) throws Exception {Connection conn = DriverManager.getConnection(URL, USER, PASSWORD);conn.setAutoCommit(false);String sql = "INSERT INTO user_info (id, name, email, create_time) VALUES (?, ?, ?, ?)";PreparedStatement ps = conn.prepareStatement(sql);for (Document doc : documents) {ps.setLong(1, doc.getLong("id"));ps.setString(2, doc.getString("name"));ps.setString(3, doc.getString("email"));ps.setTimestamp(4, new java.sql.Timestamp(doc.getDate("create_time").getTime()));ps.addBatch();}ps.executeBatch();conn.commit();ps.close();conn.close();}
}

五、整合迁移流程

我们可以编写一个主类,将 MongoDB 读取与 MySQL 写入结合起来:

import org.bson.Document;
import java.util.List;public class DataMigration {public static void main(String[] args) throws Exception {List<Document> mongoData = MongoReader.readData("localhost", 27017, "testdb", "user");// 数据预处理(可选)for (Document doc : mongoData) {// 例如:缺失字段补全、数据类型转换if (doc.get("email") == null) {doc.put("email", "unknown@example.com");}}MysqlWriter.writeData(mongoData);System.out.println("数据迁移完成,共迁移 " + mongoData.size() + " 条记录。");}
}

六、注意事项

  1. 数据类型映射
  • MongoDB 是 BSON 格式,类型较为灵活,而 MySQL 是强类型,需要提前做数据类型映射。
  • 特别注意 ObjectIdDateArrayDocument 嵌套等字段。
  1. 性能优化
  • 批量插入时控制批次大小(建议每批 1000 条)
  • 使用事务提交,减少网络往返
  • 对 MySQL 建立必要的索引
  1. 错误处理
  • 对异常数据记录日志,避免整体迁移失败
  • 支持断点续传,方便大数据量迁移
  1. 增量迁移
  • 如果 MongoDB 数据持续更新,可按时间戳或 ID 进行增量迁移
  • 可以在 Java 端加条件过滤:
collection.find(Filters.gt("create_time", lastSyncTime));

七、总结

通过 Java 将 MongoDB 数据转入 MySQL 主要分为三步:

  1. 连接 MongoDB 并读取集合数据
  2. 数据预处理,进行字段映射与类型转换
  3. 批量写入 MySQL,并优化性能与错误处理

这种方式适用于一次性迁移或定时同步,且无需额外的 ETL 工具,代码灵活可控。
如果业务数据量极大,可考虑使用 MongoDB Connector for BIKettleFlink 等大数据同步工具提升效率。