在实际业务中,随着系统的演进,有时需要将已有的 MongoDB 数据迁移到 MySQL。
这种需求常见于以下场景:
- 历史项目使用 MongoDB 存储非结构化数据,但后续业务需要使用关系型数据库进行统计、分析和事务处理。
- 新系统统一采用 MySQL 作为主要数据存储,需要整合原有的 MongoDB 数据。
- 将半结构化数据转为结构化表结构,以便和 BI 系统对接。
本文将以 Java 语言为例,讲解如何从 MongoDB 读取数据,并批量写入 MySQL,实现高效、安全的数据迁移。
一、前置条件
在开始之前,需要准备以下环境:
- MongoDB 数据库
- 确保可以远程访问(开放对应端口,默认 27017)
- 确认要迁移的集合(Collection)名称和结构
- MySQL 数据库
- 已创建目标数据库和表
- 根据 MongoDB 字段定义好表结构
- Java 开发环境
- JDK 1.8+(建议)
- Maven/Gradle 构建工具
- 相关依赖(MongoDB Java Driver、MySQL Connector/J)
- 数据迁移策略
- 全量迁移:一次性导出所有数据
- 增量迁移:根据时间戳、主键或业务标识增量导入
二、项目依赖
在 Maven 项目中,pom.xml
添加依赖:
<dependencies><!-- MongoDB Java Driver --><dependency><groupId>org.mongodb</groupId><artifactId>mongo-java-driver</artifactId><version>3.12.14</version></dependency><!-- MySQL Connector --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.33</version></dependency>
</dependencies>
三、MongoDB 数据读取
我们可以通过 MongoDB Java Driver 连接 MongoDB 并读取集合数据:
import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import org.bson.Document;
import java.util.ArrayList;
import java.util.List;public class MongoReader {public static List<Document> readData(String host, int port, String dbName, String collectionName) {MongoClient mongoClient = new MongoClient(host, port);MongoDatabase database = mongoClient.getDatabase(dbName);MongoCollection<Document> collection = database.getCollection(collectionName);List<Document> result = new ArrayList<>();for (Document doc : collection.find()) {result.add(doc);}mongoClient.close();return result;}
}
这里我们一次性读取所有数据,如果数据量大,应考虑 分页读取 或使用 游标(Cursor) 进行流式处理。
四、MySQL 数据写入
假设目标表结构如下:
CREATE TABLE user_info (id BIGINT PRIMARY KEY,name VARCHAR(100),email VARCHAR(200),create_time DATETIME
);
我们可以用 JDBC 批量插入数据:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.List;
import org.bson.Document;public class MysqlWriter {private static final String URL = "jdbc:mysql://localhost:3306/test?useSSL=false&serverTimezone=UTC";private static final String USER = "root";private static final String PASSWORD = "123456";public static void writeData(List<Document> documents) throws Exception {Connection conn = DriverManager.getConnection(URL, USER, PASSWORD);conn.setAutoCommit(false);String sql = "INSERT INTO user_info (id, name, email, create_time) VALUES (?, ?, ?, ?)";PreparedStatement ps = conn.prepareStatement(sql);for (Document doc : documents) {ps.setLong(1, doc.getLong("id"));ps.setString(2, doc.getString("name"));ps.setString(3, doc.getString("email"));ps.setTimestamp(4, new java.sql.Timestamp(doc.getDate("create_time").getTime()));ps.addBatch();}ps.executeBatch();conn.commit();ps.close();conn.close();}
}
五、整合迁移流程
我们可以编写一个主类,将 MongoDB 读取与 MySQL 写入结合起来:
import org.bson.Document;
import java.util.List;public class DataMigration {public static void main(String[] args) throws Exception {List<Document> mongoData = MongoReader.readData("localhost", 27017, "testdb", "user");// 数据预处理(可选)for (Document doc : mongoData) {// 例如:缺失字段补全、数据类型转换if (doc.get("email") == null) {doc.put("email", "unknown@example.com");}}MysqlWriter.writeData(mongoData);System.out.println("数据迁移完成,共迁移 " + mongoData.size() + " 条记录。");}
}
六、注意事项
- 数据类型映射
- MongoDB 是 BSON 格式,类型较为灵活,而 MySQL 是强类型,需要提前做数据类型映射。
- 特别注意
ObjectId
、Date
、Array
、Document
嵌套等字段。
- 性能优化
- 批量插入时控制批次大小(建议每批 1000 条)
- 使用事务提交,减少网络往返
- 对 MySQL 建立必要的索引
- 错误处理
- 对异常数据记录日志,避免整体迁移失败
- 支持断点续传,方便大数据量迁移
- 增量迁移
- 如果 MongoDB 数据持续更新,可按时间戳或 ID 进行增量迁移
- 可以在 Java 端加条件过滤:
collection.find(Filters.gt("create_time", lastSyncTime));
七、总结
通过 Java 将 MongoDB 数据转入 MySQL 主要分为三步:
- 连接 MongoDB 并读取集合数据
- 数据预处理,进行字段映射与类型转换
- 批量写入 MySQL,并优化性能与错误处理
这种方式适用于一次性迁移或定时同步,且无需额外的 ETL 工具,代码灵活可控。
如果业务数据量极大,可考虑使用 MongoDB Connector for BI、Kettle、Flink 等大数据同步工具提升效率。