PHP数据仓库与ETL流程实现
ETL（Extract-Transform-Load，抽取、转换、加载）是把数据从源系统迁移到目标系统的过程。PHP虽然不是专业的数据处理语言，但实现简单的ETL流程很方便。先看一个完整的ETL流程实现（代码使用了match表达式和联合类型，需要PHP 8.0及以上版本）。

```php
<?php

/**
 * Source side of the pipeline: yields rows in batches.
 * Each yielded value is a list of associative arrays (one per row).
 */
interface Extractor
{
    public function extract(): Generator;
}

/**
 * One per-row transformation step. Returning null drops the row.
 */
interface Transformer
{
    public function transform(array $row): ?array;
}

/**
 * Target side of the pipeline.
 */
interface Loader
{
    /** @param array<int, array> $data list of rows to write */
    public function load(array $data): void;

    /** @return array<string, int> counters describing what load() did */
    public function getStats(): array;
}

/**
 * Streams a CSV file whose first line is the header row.
 *
 * Each yielded batch is a list of at most $batchSize associative arrays
 * keyed by header name.
 */
class CsvExtractor implements Extractor
{
    private string $filePath;
    private int $batchSize;

    /**
     * @param string $filePath  path of the CSV file to read
     * @param int    $batchSize maximum rows per yielded batch (must be >= 1)
     * @throws InvalidArgumentException when $batchSize < 1
     */
    public function __construct(string $filePath, int $batchSize = 100)
    {
        if ($batchSize < 1) {
            throw new InvalidArgumentException('batchSize must be >= 1');
        }
        $this->filePath = $filePath;
        $this->batchSize = $batchSize;
    }

    /**
     * @return Generator<int, array<int, array<string, string|null>>>
     * @throws RuntimeException when the file cannot be opened
     */
    public function extract(): Generator
    {
        $handle = fopen($this->filePath, 'r');
        if ($handle === false) {
            throw new RuntimeException("无法打开文件: {$this->filePath}");
        }

        // finally also runs when the consumer abandons the generator early
        // or an exception escapes, so the handle is never leaked.
        try {
            // Pass the escape character explicitly: PHP 8.4 deprecates
            // relying on its default.
            $headers = fgetcsv($handle, null, ',', '"', '\\');
            if ($headers === false) {
                return; // empty file
            }

            // Excel-exported CSVs often begin with a UTF-8 BOM, which would
            // otherwise become part of the first column name.
            if (isset($headers[0]) && str_starts_with($headers[0], "\xEF\xBB\xBF")) {
                $headers[0] = substr($headers[0], 3);
            }
            $columnCount = count($headers);

            $batch = [];
            while (($row = fgetcsv($handle, null, ',', '"', '\\')) !== false) {
                // fgetcsv() returns [null] for a blank line; skip it.
                if ($row === [null]) {
                    continue;
                }
                // array_combine() throws ValueError on a length mismatch, so
                // normalise ragged rows: pad short rows with null and drop
                // surplus cells.
                if (count($row) !== $columnCount) {
                    $row = array_slice(array_pad($row, $columnCount, null), 0, $columnCount);
                }
                $batch[] = array_combine($headers, $row);
                if (count($batch) >= $this->batchSize) {
                    yield $batch;
                    $batch = [];
                }
            }
            if ($batch !== []) {
                yield $batch;
            }
        } finally {
            fclose($handle);
        }
    }
}

/**
 * Runs a SQL query and yields its result rows (FETCH_ASSOC) in batches.
 *
 * NOTE: with pdo_mysql's default buffered queries, the driver holds the whole
 * result set in memory after query(). Batching here only bounds the
 * PHP-side arrays.
 */
class DatabaseExtractor implements Extractor
{
    private PDO $pdo;
    private string $query;
    private int $batchSize;

    /**
     * @param int $batchSize maximum rows per yielded batch (must be >= 1);
     *                       defaults to the previously hard-coded 100
     * @throws InvalidArgumentException when $batchSize < 1
     */
    public function __construct(PDO $pdo, string $query, int $batchSize = 100)
    {
        if ($batchSize < 1) {
            throw new InvalidArgumentException('batchSize must be >= 1');
        }
        $this->pdo = $pdo;
        $this->query = $query;
        $this->batchSize = $batchSize;
    }

    /**
     * @throws RuntimeException when the query fails and PDO is not in exception mode
     */
    public function extract(): Generator
    {
        $stmt = $this->pdo->query($this->query);
        // Under ERRMODE_SILENT/WARNING, query() returns false instead of throwing.
        if ($stmt === false) {
            throw new RuntimeException('查询执行失败: ' . ($this->pdo->errorInfo()[2] ?? 'unknown error'));
        }

        try {
            $batch = [];
            while (($row = $stmt->fetch(PDO::FETCH_ASSOC)) !== false) {
                $batch[] = $row;
                if (count($batch) >= $this->batchSize) {
                    yield $batch;
                    $batch = [];
                }
            }
            if ($batch !== []) {
                yield $batch;
            }
        } finally {
            $stmt->closeCursor();
        }
    }
}

/**
 * Trims string cells, drops rows with no value, and applies per-field rules.
 */
class DataCleaner implements Transformer
{
    private array $rules;

    /**
     * @param array<string, string|string[]> $rules field => rule name or list of
     *        rule names, applied in order. Supported: int, float, string,
     *        lowercase, uppercase, trim, null_if_empty. Unknown names leave
     *        the value unchanged.
     */
    public function __construct(array $rules = [])
    {
        $this->rules = $rules;
    }

    /**
     * @return array|null the cleaned row, or null when every cell is null/''
     */
    public function transform(array $row): ?array
    {
        // Trim first, so whitespace-only cells count as empty below.
        $row = array_map(static fn($v) => is_string($v) ? trim($v) : $v, $row);

        // Drop blank rows. (array_filter() would also treat "0" as empty and
        // discard legitimate rows.)
        if (!$this->hasAnyValue($row)) {
            return null;
        }

        foreach ($this->rules as $field => $rule) {
            // isset() skips null cells, so rules never turn NULL into 0/''.
            if (isset($row[$field])) {
                $row[$field] = $this->applyRule($row[$field], $rule);
            }
        }

        return $row;
    }

    /** True when at least one cell is neither null nor the empty string. */
    private function hasAnyValue(array $row): bool
    {
        foreach ($row as $value) {
            if ($value !== null && $value !== '') {
                return true;
            }
        }
        return false;
    }

    /** Applies one rule name or a list of rule names to a single value, in order. */
    private function applyRule(mixed $value, array|string $rule): mixed
    {
        foreach ((array) $rule as $r) {
            $value = match ($r) {
                'int' => (int) $value,
                'float' => (float) $value,
                'string' => (string) $value,
                'lowercase' => strtolower((string) $value),
                'uppercase' => strtoupper((string) $value),
                'trim' => is_string($value) ? trim($value) : $value,
                // Only null and '' are "empty": "0"/0 are real values.
                'null_if_empty' => ($value === null || $value === '') ? null : $value,
                default => $value,
            };
        }
        return $value;
    }
}

/**
 * Projects a row onto a new set of keys.
 *
 * Mapping values are either a target column name (string), or a callable that
 * receives the whole row and whose result is stored under the source key.
 * Source fields absent from the row are skipped.
 */
class FieldMapper implements Transformer
{
    private array $mapping;

    /** @param array<string, string|callable> $mapping source field => target name or callable */
    public function __construct(array $mapping)
    {
        $this->mapping = $mapping;
    }

    public function transform(array $row): ?array
    {
        $mapped = [];
        foreach ($this->mapping as $sourceField => $target) {
            // array_key_exists (not isset) so that null values, e.g. from
            // null_if_empty, are carried through instead of silently dropped.
            if (!array_key_exists($sourceField, $row)) {
                continue;
            }
            // A plain string is always a column name: is_callable('date') or
            // is_callable('time') is true and would invoke the PHP function.
            if (!is_string($target) && is_callable($target)) {
                $mapped[$sourceField] = $target($row);
            } else {
                $mapped[$target] = $row[$sourceField];
            }
        }
        return $mapped;
    }
}

/**
 * Inserts rows into one table via prepared statements, one INSERT per row.
 *
 * A failing row is logged with error_log() and counted in 'failed'; it does
 * not stop the rest of the batch. 'updated' is always 0 because this loader
 * only INSERTs.
 */
class DatabaseLoader implements Loader
{
    private PDO $pdo;
    private string $table;
    private int $inserted = 0;
    private int $updated = 0;
    private int $failed = 0;
    /** @var array<string, PDOStatement> prepared INSERTs keyed by column list */
    private array $statements = [];

    /**
     * @param string $table plain table name, optionally schema-qualified
     *                      ("db.users"); each part may contain only letters,
     *                      digits, '_' and '$'
     * @throws InvalidArgumentException for any other table name
     */
    public function __construct(PDO $pdo, string $table)
    {
        foreach (explode('.', $table) as $part) {
            if (!self::isSafeIdentifier($part)) {
                throw new InvalidArgumentException("Invalid table name: {$table}");
            }
        }
        $this->pdo = $pdo;
        $this->table = $table;
    }

    public function load(array $data): void
    {
        foreach ($data as $row) {
            // An empty row would produce "INSERT INTO t () VALUES ()".
            if (!is_array($row) || $row === []) {
                $this->failed++;
                error_log('ETL加载失败: 空行或非数组行');
                continue;
            }

            try {
                // Statement chosen by this row's own key order, so rows whose
                // keys differ from the first row never get shifted values.
                $stmt = $this->statementFor(array_keys($row));
                if ($stmt->execute(array_values($row))) {
                    $this->inserted++;
                } else {
                    // ERRMODE_SILENT/WARNING reports failure via false, not an exception.
                    $this->failed++;
                    error_log('ETL加载失败: ' . ($stmt->errorInfo()[2] ?? 'unknown error'));
                }
            } catch (Exception $e) {
                $this->failed++;
                error_log("ETL加载失败: {$e->getMessage()}");
            }
        }
    }

    public function getStats(): array
    {
        return [
            'inserted' => $this->inserted,
            'updated' => $this->updated,
            'failed' => $this->failed,
            'total' => $this->inserted + $this->updated + $this->failed,
        ];
    }

    /**
     * Returns a cached prepared INSERT for the given column list.
     *
     * @throws InvalidArgumentException when a column name is not a safe identifier
     * @throws RuntimeException when prepare() fails outside exception mode
     */
    private function statementFor(array $columns): PDOStatement
    {
        $key = implode("\0", $columns);
        if (isset($this->statements[$key])) {
            return $this->statements[$key];
        }

        // Column names usually come from CSV headers, i.e. untrusted input,
        // and identifiers cannot be bound as parameters: whitelist them.
        foreach ($columns as $column) {
            if (!self::isSafeIdentifier((string) $column)) {
                throw new InvalidArgumentException("Invalid column name: {$column}");
            }
        }

        $columnList = implode(', ', $columns);
        $placeholders = implode(', ', array_fill(0, count($columns), '?'));
        $sql = "INSERT INTO {$this->table} ({$columnList}) VALUES ({$placeholders})";

        $stmt = $this->pdo->prepare($sql);
        if ($stmt === false) {
            throw new RuntimeException('prepare failed: ' . ($this->pdo->errorInfo()[2] ?? 'unknown error'));
        }
        return $this->statements[$key] = $stmt;
    }

    /** Letters (any script), digits, '_' and '$' only: nothing that can alter SQL syntax. */
    private static function isSafeIdentifier(string $name): bool
    {
        return preg_match('/^[\p{L}\p{N}_$]+$/u', $name) === 1;
    }
}

/**
 * Wires one Extractor, an ordered chain of Transformers and one Loader.
 */
class EtlPipeline
{
    private Extractor $extractor;
    /** @var Transformer[] applied in insertion order */
    private array $transformers = [];
    private Loader $loader;

    public function __construct(Extractor $extractor, Loader $loader)
    {
        $this->extractor = $extractor;
        $this->loader = $loader;
    }

    public function addTransformer(Transformer $transformer): void
    {
        $this->transformers[] = $transformer;
    }

    /**
     * Runs extract -> transform -> load to completion.
     *
     * @return array{total_rows:int, transformed_rows:int, duration_seconds:float,
     *               rows_per_second:float|int, loader_stats:array}
     */
    public function run(): array
    {
        $startTime = microtime(true);
        $totalRows = 0;
        $transformedRows = 0;

        foreach ($this->extractor->extract() as $batch) {
            $ready = [];
            foreach ($batch as $row) {
                $totalRows++;
                $transformed = $this->applyTransformers($row);
                if ($transformed !== null) {
                    $ready[] = $transformed;
                }
            }
            // One load() per extracted batch rather than per row, so the
            // loader can reuse prepared statements across the batch.
            if ($ready !== []) {
                $this->loader->load($ready);
                $transformedRows += count($ready);
            }
        }

        $elapsed = microtime(true) - $startTime;
        return [
            'total_rows' => $totalRows,
            'transformed_rows' => $transformedRows,
            'duration_seconds' => round($elapsed, 2),
            // Unrounded elapsed time: a sub-5ms run would round to 0.0.
            'rows_per_second' => $elapsed > 0 ? round($totalRows / $elapsed, 2) : 0,
            'loader_stats' => $this->loader->getStats(),
        ];
    }

    /** Runs the transformer chain; null means the row is dropped. */
    private function applyTransformers(array $row): ?array
    {
        foreach ($this->transformers as $transformer) {
            $row = $transformer->transform($row);
            if ($row === null) {
                return null;
            }
        }
        // A row with no fields left has nothing to load.
        return $row === [] ? null : $row;
    }
}

// Example run
$pdo = new PDO('mysql:host=localhost;dbname=test;charset=utf8mb4', 'root', '', [
    PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
]);

$csvFile = sys_get_temp_dir() . '/source.csv';
$fp = fopen($csvFile, 'w');
if ($fp === false) {
    throw new RuntimeException("无法创建文件: {$csvFile}");
}
fputcsv($fp, ['name', 'email', 'age', 'status'], ',', '"', '\\');
fputcsv($fp, [' 张三 ', 'zhangsan@test.com', 28, 'active'], ',', '"', '\\');
fputcsv($fp, [' 李四 ', 'lisi@test.com', 35, 'active'], ',', '"', '\\');
fputcsv($fp, [' 王五 ', '', '', ''], ',', '"', '\\');
fclose($fp);

$pipeline = new EtlPipeline(
    new CsvExtractor($csvFile),
    new DatabaseLoader($pdo, 'users')
);

$pipeline->addTransformer(new DataCleaner([
    'age' => ['int'],
    'email' => ['lowercase', 'trim'],
    'name' => 'trim',
]));

$pipeline->addTransformer(new FieldMapper([
    'name' => 'name',
    'email' => 'email',
    'age' => 'age',
    'status' => 'status',
]));

$result = $pipeline->run();
print_r($result);
?>
```

ETL是数据处理的基础工具：抽取阶段从各种数据源读取数据，转换阶段负责清洗和映射，加载阶段把数据写入目标。PHP实现ETL的优势是开发速度快，适合数据量不大的场景。大数据量的ETL建议使用专业工具，例如用Apache NiFi或Spark处理数据，再用Apache Airflow编排调度。