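Loading Hundreds of Millions of Rows Efficiently: Optimizing Large-File Imports with Hyperf Coroutines
Data-intensive applications often need to import large files (CSV, JSON, and so on) into a database. This post describes a large-file import solution built on PHP (the Hyperf framework) and AMQP. It has held up well in a real project, importing millions to hundreds of millions of records efficiently and reliably.
Background
Our scenario: users upload large files of device information (typically a few hundred MB to several GB), and the data must be imported into database tables efficiently. The import must also provide:
- Resumable imports (continue from the last saved position)
- Real-time progress tracking
- The ability to stop at any time
- High throughput
- Error handling and retries
Architecture
We use the producer-consumer pattern, with an AMQP message queue decoupling task scheduling from the actual processing:
[Upload file] → [Create task] → [Publish AMQP message] → [Consumer processes file] → [Completion notice]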
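The consumer listed at the end of this article reads only `$data['id']` from each message, so the message can stay tiny and the file itself never travels through the queue. The sketch below shows one way to build and validate such a payload. It assumes a JSON-encoded body, and the function names `encodeImportMessage` and `decodeImportMessage` are hypothetical helpers, not part of Hyperf or of the original project.

```php
<?php
declare(strict_types=1);

/** Builds the message body: only the task id is sent, never the file contents. */
function encodeImportMessage(int $taskId): string
{
    return json_encode(['id' => $taskId], JSON_THROW_ON_ERROR);
}

/** Returns the task id from a message body, or null if the body is unusable. */
function decodeImportMessage(string $body): ?int
{
    try {
        $data = json_decode($body, true, 512, JSON_THROW_ON_ERROR);
    } catch (JsonException) {
        return null; // malformed JSON: nothing to process
    }
    $id = is_array($data) ? ($data['id'] ?? null) : null;
    return is_int($id) && $id > 0 ? $id : null;
}

$body = encodeImportMessage(42);
$taskId = decodeImportMessage($body);
```

Keeping the payload to a single id means a redelivered message is cheap, and all task state (file path, target table, progress) is read from the database by the consumer.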
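Key techniques
1. Concurrent processing with coroutines
private function startWorkers(Channel $workChannel, Channel $doneChannel): array
{
    $workers = [];
    for ($i = 0; $i < $this->config['max_workers']; $i++) {
        // Each worker coroutine pulls batches from $workChannel until it is closed
        $workers[] = Coroutine::create(function () use ($i, $workChannel, $doneChannel) {
            // worker coroutine logic
        });
    }
    return $workers;
}
We use Swoole coroutines to run several workers concurrently. Each worker takes batches of rows from a Channel and inserts them. While one coroutine waits on the database, the others keep working, which raises throughput significantly.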
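2. Batching and transactions
private function processBatchWithRetry(array $batch): void
{
    $retryCount = 0;
    $lastError = null;
    $batchSize = count($batch);
    while ($retryCount <= $this->config['max_retries']) {
        try {
            // One transaction per batch; chunks of 1000 keep each INSERT statement small
            Db::connection('default')->transaction(function () use ($batch) {
                foreach (array_chunk($batch, 1000) as $chunk) {
                    Db::table($this->task->table_name)->insert($chunk);
                }
            });
            $this->saveProgress($batchSize); // count rows only after a successful commit
            return;
        } catch (\Throwable $e) {
            $lastError = $e;
            $retryCount++;
            usleep(100000 * $retryCount); // linear backoff: 100 ms, 200 ms, 300 ms, ...
        }
    }
    // All attempts failed: fall back to row-by-row inserts
    $this->fallbackToSingleInsert($batch, $lastError);
}
Batching combined with transactions greatly improves insert efficiency. When a batch still fails after its retries, it is downgraded to row-by-row inserts, so the valid rows in it are not lost and the rows that still fail are logged.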
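The delay in the retry loop is `100000 * $retryCount` microseconds, which grows linearly with the attempt number. For comparison, the sketch below puts that schedule next to a capped exponential one. Both function names and the 5-second cap are assumptions made for illustration; the original consumer uses only the linear form.

```php
<?php
declare(strict_types=1);

/** Linear backoff: base * attempt, the schedule used by usleep(100000 * $retryCount). */
function linearBackoffMicros(int $attempt, int $baseMicros = 100_000): int
{
    return $baseMicros * $attempt;
}

/** Exponential backoff: base * 2^(attempt - 1), never longer than $maxMicros. */
function exponentialBackoffMicros(int $attempt, int $baseMicros = 100_000, int $maxMicros = 5_000_000): int
{
    return min($maxMicros, $baseMicros * (2 ** ($attempt - 1)));
}

foreach ([1, 2, 3] as $attempt) {
    printf(
        "attempt %d: linear %d us, exponential %d us\n",
        $attempt,
        linearBackoffMicros($attempt),
        exponentialBackoffMicros($attempt)
    );
}
```

With only three retries the two schedules differ little (300 ms versus 400 ms on the third attempt); exponential backoff matters more when the retry limit is higher or the database needs longer to recover.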
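3. Resumable imports and progress tracking
private function skipProcessedLines($file): void
{
    $startLine = $this->task->num ?? 0;
    // Seed the Redis progress counter with the saved position
    $this->redis->set(RedisKey::getExportProgressKey($this->task->id), $startLine);
    // Read and discard the lines handled by a previous run
    for ($i = 0; $i < $startLine; $i++) {
        fgets($file);
    }
}
Resuming relies on a Redis atomic counter plus skipping ahead in the file: on restart, the consumer reads and discards as many lines as the task's num field records. Note that num counts rows that were parsed and inserted, so it matches file line numbers only when every line yields a row. Progress is stored in both the database and Redis, which makes it easy for the front end to display.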
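Skipping N lines with `fgets()` re-reads the whole processed prefix of the file on every resume. An alternative is to store a byte offset and jump to it with `fseek()`. The sketch below illustrates that idea under the assumption that the task can persist an offset; the function `readLinesFromOffset` is hypothetical and is not how the original consumer resumes.

```php
<?php
declare(strict_types=1);

/**
 * Reads up to $maxLines lines starting at byte $offset.
 * Returns the lines plus the byte offset to store for the next resume.
 *
 * @return array{lines: list<string>, offset: int}
 */
function readLinesFromOffset(string $path, int $offset, int $maxLines): array
{
    $file = fopen($path, 'r');
    if ($file === false) {
        throw new RuntimeException("Failed to open file: {$path}");
    }
    try {
        fseek($file, $offset); // jump straight to the saved position
        $lines = [];
        while (count($lines) < $maxLines && ($line = fgets($file)) !== false) {
            $lines[] = rtrim($line, "\r\n");
        }
        return ['lines' => $lines, 'offset' => ftell($file)];
    } finally {
        fclose($file);
    }
}

// Usage: process a three-line file in two resumable steps.
$path = tempnam(sys_get_temp_dir(), 'import');
file_put_contents($path, "a\nb\nc\n");
$first = readLinesFromOffset($path, 0, 2);                 // reads "a" and "b"
$second = readLinesFromOffset($path, $first['offset'], 2); // resumes at "c"
unlink($path);
```

The offset is only safe to persist once every row read before it has been committed, so with several concurrent workers it would have to be advanced in batch order.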
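4. Graceful stop
private function checkStopSignal(): void
{
    // Query Redis only once every 1000 calls to keep the check cheap
    static $counter = 0;
    if (++$counter % 1000 === 0) {
        $this->shouldStop = (bool)$this->redis->get($this->stopSignalKey);
        if ($this->shouldStop) {
            $this->updateTaskStatus(97, 'Import stopped'); // 97 = stopped
            $this->logger->info("Received stop signal for task {$this->task->id}");
        }
    }
}
A stop signal is set in Redis and checked periodically by the read loop (once every 1000 lines). When the signal is seen, no further batches are queued, so the import stops safely with its current progress saved.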
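The same "check only every N iterations" idea can be isolated into a small object, which also keeps the counter per task instead of in a method-level `static`. This is a sketch under assumptions: the class name `StopSignal` is hypothetical, and the probe closure stands in for the Redis lookup.

```php
<?php
declare(strict_types=1);

/** Polls a stop flag only once every $interval calls, so the lookup stays cheap. */
final class StopSignal
{
    private int $calls = 0;
    private bool $stopped = false;

    /** @param Closure(): bool $probe e.g. fn (): bool => (bool) $redis->get($key) */
    public function __construct(private Closure $probe, private int $interval = 1000)
    {
    }

    public function shouldStop(): bool
    {
        if (!$this->stopped && ++$this->calls % $this->interval === 0) {
            $this->stopped = ($this->probe)();
        }
        return $this->stopped;
    }
}

// Usage: the probe reports "stop" on its second call; it is consulted every 3 iterations.
$probes = 0;
$signal = new StopSignal(function () use (&$probes): bool {
    $probes++;
    return $probes >= 2;
}, 3);

$processed = 0;
while (!$signal->shouldStop()) {
    $processed++; // stands in for reading and queueing one line
}
```

A larger interval lowers the Redis load but delays the reaction to a stop request by up to that many lines.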
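5. Resource management and locking
private function acquireLock(): bool
{
    $lockKey = RedisKey::getExportRunningKey($this->task->id);
    // SET ... NX EX: succeeds only if the key does not exist, and expires after lock_ttl seconds
    return (bool)$this->redis->set(
        $lockKey,
        time(),
        ['NX', 'EX' => $this->config['lock_ttl']]
    );
}
A Redis distributed lock (SET with NX and an expiry) ensures that the same task is never processed by several consumers at once, which would corrupt the data.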
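One detail worth guarding against is a consumer that failed to acquire the lock later deleting the key anyway. A common remedy is to store a random owner token as the lock value and delete the key only if the token still matches. The sketch below shows that logic against an in-memory stand-in. `LockStore`, `ArrayLockStore`, and `TaskLock` are hypothetical names; with Redis, `deleteIfEquals` would typically be a short Lua script so that the compare and the delete happen atomically.

```php
<?php
declare(strict_types=1);

/** Minimal storage contract; a real implementation would wrap Redis. */
interface LockStore
{
    public function setIfAbsent(string $key, string $value, int $ttl): bool;
    public function deleteIfEquals(string $key, string $value): bool;
}

/** In-memory stand-in for demonstration only (the TTL is ignored). */
final class ArrayLockStore implements LockStore
{
    private array $data = [];

    public function setIfAbsent(string $key, string $value, int $ttl): bool
    {
        if (isset($this->data[$key])) {
            return false;
        }
        $this->data[$key] = $value;
        return true;
    }

    public function deleteIfEquals(string $key, string $value): bool
    {
        if (($this->data[$key] ?? null) !== $value) {
            return false;
        }
        unset($this->data[$key]);
        return true;
    }
}

final class TaskLock
{
    private ?string $token = null;

    public function __construct(private LockStore $store, private string $key, private int $ttl)
    {
    }

    public function acquire(): bool
    {
        $token = bin2hex(random_bytes(16)); // unique owner token
        if ($this->store->setIfAbsent($this->key, $token, $this->ttl)) {
            $this->token = $token;
            return true;
        }
        return false;
    }

    public function release(): bool
    {
        if ($this->token === null) {
            return false; // never held the lock: leave the key alone
        }
        $released = $this->store->deleteIfEquals($this->key, $this->token);
        $this->token = null;
        return $released;
    }
}

$store = new ArrayLockStore();
$a = new TaskLock($store, 'task:1', 60);
$b = new TaskLock($store, 'task:1', 60);
$aAcquired = $a->acquire();       // A gets the lock
$bAcquired = $b->acquire();       // B is rejected
$bReleased = $b->release();       // B cannot remove A's lock
$aReleased = $a->release();       // A releases its own lock
$bAcquiredLater = $b->acquire();  // now B can take it
```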
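Performance tuning tips
- Batch size: in our tests, 10,000 rows per batch gave the best performance in most cases.
- Coroutine count: more is not always better; 16 worker coroutines performed best in our test environment.
- Memory management: release variables and resources as soon as they are no longer needed, to avoid memory leaks.
- I/O: use buffering and a sensible read/write strategy to reduce disk I/O overhead.
- Database: disabling indexes before the import and rebuilding them afterwards can speed it up considerably.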
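The batch-size and memory tips work together: if rows are grouped lazily, only one batch per in-flight worker is held in memory regardless of the file size. A minimal sketch of such grouping with a generator follows; the function name `batches` is an assumption for illustration, and the original consumer does the equivalent inline in its read loop.

```php
<?php
declare(strict_types=1);

/**
 * Lazily groups items from any iterable into arrays of at most $size items.
 *
 * @return Generator<int, list<mixed>>
 */
function batches(iterable $items, int $size): Generator
{
    if ($size < 1) {
        throw new InvalidArgumentException('Batch size must be at least 1');
    }
    $batch = [];
    foreach ($items as $item) {
        $batch[] = $item;
        if (count($batch) === $size) {
            yield $batch;
            $batch = []; // drop the reference so the previous batch can be freed
        }
    }
    if ($batch !== []) {
        yield $batch; // final partial batch
    }
}

// Usage: five rows in batches of two.
$result = iterator_to_array(batches(['a', 'b', 'c', 'd', 'e'], 2), false);
```

Because the input is any iterable, the same function can consume a generator that yields parsed lines from a file handle, so the full file is never loaded at once.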
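Error handling and monitoring
We implemented several layers of error handling:
- Batch retry: a failed batch is retried automatically up to 3 times, with the delay increasing each time
- Row-by-row fallback: if the batch still fails, each row is inserted individually
- Detailed error logs: failed rows are logged with their data and the error cause
- Status monitoring: task status and progress are updated in real time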
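The first two layers can be expressed independently of any database as "try the whole batch a few times, then go row by row". The sketch below does that with plain callables. The function `insertWithFallback` and its parameters are hypothetical, and the sleep between retries is left out to keep the example short.

```php
<?php
declare(strict_types=1);

/**
 * @param callable(array): void $insertMany inserts a whole batch or throws
 * @param callable(mixed): void $insertOne  inserts a single row or throws
 * @return array{inserted: int, failed: list<array{index: int, error: string}>}
 */
function insertWithFallback(array $batch, callable $insertMany, callable $insertOne, int $maxRetries = 3): array
{
    // Layer 1: one initial attempt plus $maxRetries retries for the whole batch.
    for ($attempt = 0; $attempt <= $maxRetries; $attempt++) {
        try {
            $insertMany($batch);
            return ['inserted' => count($batch), 'failed' => []];
        } catch (Throwable $e) {
            // A real consumer would wait here, with a growing delay, before retrying.
        }
    }
    // Layer 2: row-by-row fallback, collecting the rows that still fail.
    $inserted = 0;
    $failed = [];
    foreach ($batch as $index => $row) {
        try {
            $insertOne($row);
            $inserted++;
        } catch (Throwable $e) {
            $failed[] = ['index' => $index, 'error' => $e->getMessage()];
        }
    }
    return ['inserted' => $inserted, 'failed' => $failed];
}

// Usage: a negative value plays the role of a row the database rejects.
$batchAttempts = 0;
$insertOne = function ($row): void {
    if ($row < 0) {
        throw new RuntimeException('negative value');
    }
};
$insertMany = function (array $rows) use (&$batchAttempts, $insertOne): void {
    $batchAttempts++;
    foreach ($rows as $row) {
        $insertOne($row);
    }
};
$result = insertWithFallback([1, 2, -3, 4], $insertMany, $insertOne);
```

Returning the failed rows to the caller, rather than only logging them, makes it possible to report an exact failure count in the task status.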
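Results
In production, this solution reliably handles:
- 100 million records: about 1 hour
- 10 million records: about 5 minutes
- System resource usage stays steady, with no memory spikes or CPU saturation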
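For a sense of scale, the figures above can be converted into average rows per second. This is plain arithmetic on the quoted numbers, not an additional measurement; the helper name `rowsPerSecond` is illustrative.

```php
<?php
declare(strict_types=1);

/** Average throughput for a given row count and duration in seconds. */
function rowsPerSecond(int $rows, float $seconds): float
{
    return $rows / $seconds;
}

printf("%.0f rows/s\n", rowsPerSecond(100_000_000, 3600)); // 100 million rows in 1 hour
printf("%.0f rows/s\n", rowsPerSecond(10_000_000, 300));   // 10 million rows in 5 minutes
```

Both cases come out at roughly 28,000 to 33,000 rows per second, which at 10,000 rows per batch is about three committed batches per second across all workers.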
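Summary
Importing large files looks simple but is actually complex: concurrency, error handling, resource management, and more all have to be considered. With a sound architecture and targeted performance tuning, we arrived at an efficient and stable solution. The key points are:
- Coroutines to increase concurrency
- Batching plus transactions for efficiency and data consistency
- Thorough error handling and retries
- Real-time progress tracking and graceful stop
- Careful resource management and locking
The approach is not limited to device-information imports; with small changes it can serve other large-scale data import scenarios. We hope this experience helps developers facing similar challenges.
Complete consumer code for reference
App\Amqp\Consumer\ExportFileConsumer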
<?php
declare(strict_types=1);
namespace App\Amqp\Consumer;
use App\Cache\RedisKey;
use App\Model\MIdfaPoolExportLog;
use Hyperf\Amqp\Message\Type;
use Hyperf\Amqp\Result;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Contract\StdoutLoggerInterface;
use Hyperf\Coroutine\Coroutine;
use Hyperf\Database\Schema\Blueprint;
use Hyperf\Database\Schema\Schema;
use Hyperf\DbConnection\Db;
use Hyperf\Di\Annotation\Inject;
use Hyperf\Redis\Redis;
use PhpAmqpLib\Message\AMQPMessage;
use Psr\Container\ContainerInterface;
use Swoole\Coroutine\Channel;
#[Consumer(exchange: 'ExportFile', routingKey: 'ExportFile', queue: 'ExportFile.success', name: "ExportFileOptimizedConsumer", nums: 1, enable: true)]
class ExportFileConsumer extends ConsumerMessage
{
    /**
     * @var string Exchange type
     */
    protected string $type = Type::DIRECT;
    /**
     * @var ContainerInterface|null
     */
    #[Inject]
    public ?ContainerInterface $container;
    /**
     * @var Redis
     */
    #[Inject]
    protected Redis $redis;
    /**
     * @var MIdfaPoolExportLog
     */
    private MIdfaPoolExportLog $task;
    /**
     * @var StdoutLoggerInterface
     */
    #[Inject]
    protected StdoutLoggerInterface $logger;
    /**
     * @var bool
     */
    private bool $shouldStop = false;
    /**
     * @var string
     */
    private string $stopSignalKey;
    // Configuration
    private array $config = [
        'batch_size' => 10000,       // rows per batch
        'max_workers' => 16,         // maximum number of worker coroutines
        'progress_interval' => 30,   // progress save interval (seconds)
        'line_chunk' => 500000,      // record progress once every this many lines
        'lock_ttl' => 60,            // TTL of the Redis lock (seconds)
        'max_retries' => 3,          // maximum number of retries per batch
    ];
    public function consumeMessage($data, AMQPMessage $message): string
    {
        if (empty($data['id'])) {
            return Result::ACK;
        }
        try {
            // If another consumer holds the lock, skip without touching the task or its lock
            if (!$this->initializeTask((int) $data['id'])) {
                return Result::ACK;
            }
            $this->processFile();
            $this->completeTask();
        } catch (\Throwable $e) {
            $this->handleFailure($e);
        }
        return Result::ACK;
    }
    private function initializeTask(int $taskId): bool
    {
        // find() returns null for an unknown id, so check before assigning to the typed property
        $task = MIdfaPoolExportLog::find($taskId);
        if (!$task) {
            throw new \RuntimeException('Task not found');
        }
        $this->task = $task;
        if (!$this->acquireLock()) {
            $this->logger->info("Task {$taskId} is already being processed, skipping");
            return false;
        }
        $this->createTableIfNotExists();
        $this->stopSignalKey = RedisKey::getExportKey($this->task->id);
        $this->shouldStop = false;
        // Clear any stale stop signal left over from a previous run
        $this->redis->del($this->stopSignalKey);
        return true;
    }
    private function checkStopSignal(): void
    {
        // Check the stop signal once every 1000 lines
        static $counter = 0;
        if (++$counter % 1000 === 0) {
            $this->shouldStop = (bool)$this->redis->get($this->stopSignalKey);
            // On a stop signal, update the task status
            if ($this->shouldStop) {
                $this->updateTaskStatus(97, 'Import stopped'); // 97 = stopped
                $this->logger->info("Received stop signal for task {$this->task->id}");
            }
        }
    }
    private function acquireLock(): bool
    {
        $lockKey = RedisKey::getExportRunningKey($this->task->id);
        // SET ... NX EX: succeeds only if the key does not exist yet
        return (bool)$this->redis->set(
            $lockKey,
            time(),
            ['NX', 'EX' => $this->config['lock_ttl']]
        );
    }
    private function createTableIfNotExists(): void
    {
        if (!Schema::hasTable($this->task->table_name)) {
            Schema::create($this->task->table_name, function (Blueprint $table) {
                $table->bigIncrements('id');
                $table->string('gaid', 120)->nullable()->index();
                $table->string('idfa', 120)->nullable()->index();
                $table->string('ua', 500)->nullable();
                $table->string('ip', 100)->nullable();
                $table->string('os', 32)->nullable();
                $table->string('model', 100)->nullable();
                $table->string('geo', 50)->nullable();
                $table->comment($this->task->table_note);
            });
            $this->updateTaskStatus(2, 'Import started'); // 2 = in progress
        }
    }
    private function processFile(): void
    {
        $file = fopen($this->task->file_path, 'r');
        if (!$file) {
            throw new \RuntimeException("Failed to open file: {$this->task->file_path}");
        }
        try {
            $this->skipProcessedLines($file);
            // Create the work channel and the completion channel
            $workChannel = new Channel($this->config['max_workers'] * 2);
            $doneChannel = new Channel();
            // Start the worker coroutines
            $workers = $this->startWorkers($workChannel, $doneChannel);
            // Produce batches of rows
            $this->produceBatches($file, $workChannel);
            // Wait for all workers to finish
            $this->awaitWorkersCompletion($workChannel, $doneChannel, $workers);
        } finally {
            fclose($file);
        }
    }
    private function skipProcessedLines($file): void
    {
        $startLine = $this->task->num ?? 0;
        // Seed the Redis progress counter with the saved position
        $this->redis->set(RedisKey::getExportProgressKey($this->task->id), $startLine);
        // Read and discard the lines handled by a previous run
        for ($i = 0; $i < $startLine; $i++) {
            fgets($file);
        }
    }
    private function startWorkers(Channel $workChannel, Channel $doneChannel): array
    {
        $workers = [];
        for ($i = 0; $i < $this->config['max_workers']; $i++) {
            $workers[] = Coroutine::create(function () use ($i, $workChannel, $doneChannel) {
                $processed = 0;
                $startTime = microtime(true);
                while (true) {
                    $batch = $workChannel->pop();
                    if ($batch === false) {
                        break; // channel closed: no more batches
                    }
                    $processed += count($batch);
                    $this->processBatchWithRetry($batch);
                }
                // Report per-worker statistics to the coordinator
                $doneChannel->push([
                    'worker_id' => $i,
                    'processed' => $processed,
                    'duration' => microtime(true) - $startTime
                ]);
            });
        }
        return $workers;
    }
    private function produceBatches($file, Channel $workChannel): void
    {
        $batch = [];
        $processedLines = $this->task->num ?? 0;
        $startTime = microtime(true);
        $lastReport = $startTime;
        while (!feof($file) && !$this->shouldStop) {
            $this->checkStopSignal();
            $line = fgets($file);
            if ($line === false) {
                break; // EOF or read error: parseLine() requires a string
            }
            if ($data = $this->parseLine($line)) {
                $batch[] = $data;
            }
            if (count($batch) >= $this->config['batch_size']) {
                $workChannel->push($batch);
                $processedLines += count($batch);
                $batch = [];
                $this->refreshLock();
                // Throughput report, at most once every 5 seconds
                $now = microtime(true);
                if ($now - $lastReport > 5) {
                    $speed = $processedLines / ($now - $startTime);
                    $this->logger->info(sprintf(
                        'Processing: %d lines (%.1f lines/sec)',
                        $processedLines,
                        $speed
                    ));
                    $lastReport = $now;
                }
            }
        }
        // Queue the final partial batch
        if (!empty($batch) && !$this->shouldStop) {
            $workChannel->push($batch);
        }
        // Closing the channel tells the workers there is no more work
        $workChannel->close();
    }
    private function awaitWorkersCompletion(Channel $workChannel, Channel $doneChannel, array $workers): void
    {
        // produceBatches() has already closed $workChannel, so it is not closed again here.
        // Wait for every worker to finish and collect its statistics
        $totalProcessed = 0;
        $workerStats = [];
        for ($i = 0; $i < count($workers); $i++) {
            $result = $doneChannel->pop();
            $totalProcessed += $result['processed'];
            $workerStats[] = $result;
            $this->logger->debug(sprintf(
                'Worker %d completed: %d rows in %.2f seconds (%.1f rows/sec)',
                $result['worker_id'],
                $result['processed'],
                $result['duration'],
                $result['processed'] / max($result['duration'], 0.001)
            ));
        }
        $this->logger->info("Total processed rows: $totalProcessed");
    }
    private function saveProgress(int $processedLines): void
    {
        // Redis INCRBY is atomic, so concurrent workers cannot lose updates
        $current = $this->redis->incrby(RedisKey::getExportProgressKey($this->task->id), $processedLines);
        $this->task->num = $current;
        $this->task->save();
    }
    private function processBatchWithRetry(array $batch): void
    {
        $retryCount = 0;
        $lastError = null;
        $batchSize = count($batch);
        while ($retryCount <= $this->config['max_retries']) {
            try {
                // A transaction makes the batch insert atomic
                Db::connection('default')->transaction(function () use ($batch) {
                    // Insert in chunks so that no single SQL statement gets too large
                    foreach (array_chunk($batch, 1000) as $chunk) {
                        Db::table($this->task->table_name)->insert($chunk);
                    }
                });
                // Update the counter only on success
                $this->saveProgress($batchSize);
                return; // success
            } catch (\Throwable $e) {
                $lastError = $e;
                $retryCount++;
                usleep(100000 * $retryCount); // linear backoff: 100 ms, 200 ms, 300 ms, ...
            }
        }
        // All retries failed: fall back to row-by-row inserts
        $this->fallbackToSingleInsert($batch, $lastError);
    }
    private function fallbackToSingleInsert(array $batch, \Throwable $originalError): void
    {
        $successCount = 0;
        $errors = [];
        foreach ($batch as $index => $item) {
            try {
                Db::table($this->task->table_name)->insert($item);
                $successCount++;
            } catch (\Throwable $e) {
                $errors[] = [
                    'index' => $index,
                    'error' => $e->getMessage(),
                    'data' => json_encode($item, JSON_UNESCAPED_UNICODE)
                ];
            }
        }
        // Log a summary, with only the first error as a sample
        $this->logger->error(sprintf(
            'Fallback single insert: %d succeeded, %d failed',
            $successCount,
            count($errors)
        ), [
            'task_id' => $this->task->id,
            'table' => $this->task->table_name,
            'error_count' => count($errors),
            'first_error' => $errors[0]['error'] ?? null,
            'sample_error_data' => $errors[0]['data'] ?? null,
            // Use the batch-level error; $e is undefined when every row succeeds
            'original_error' => $originalError->getMessage()
        ]);
        if (!empty($errors)) {
            // Record the full error list at debug level
            $this->logger->debug('Detailed insert errors', [
                'task_id' => $this->task->id,
                'errors' => $errors
            ]);
        }
    }
    private function parseLine(string $line): ?array
    {
        $line = trim($line);
        if ($line === '') {
            return null; // skip blank lines
        }
        try {
            $data = json_decode($line, true, 512, JSON_THROW_ON_ERROR);
            if (!is_array($data)) {
                return null; // valid JSON, but not an object
            }
            // The single "ifa" field is a GAID on Android and an IDFA on iOS
            $os = strtolower($data['device']['os'] ?? '');
            $ifa = $data['device']['ifa'] ?? '';
            return [
                'gaid' => $os === 'android' ? $ifa : '',
                'idfa' => $os === 'ios' ? $ifa : '',
                'ua' => $data['device']['ua'] ?? '',
                'ip' => $data['device']['ip'] ?? '',
                'os' => $data['device']['os'] ?? '',
                'model' => $data['device']['model'] ?? '',
                'geo' => $data['device']['geo']['country'] ?? '',
            ];
        } catch (\JsonException $e) {
            $this->logger->debug('Failed to parse line', ['line' => $line]);
            return null;
        }
    }
    private function refreshLock(): void
    {
        // Extend the lock's TTL while the import is still running
        $lockKey = RedisKey::getExportRunningKey($this->task->id);
        $this->redis->expire($lockKey, $this->config['lock_ttl']);
    }
    private function completeTask(): void
    {
        if ($this->shouldStop) {
            $this->updateTaskStatus(
                97,
                'Import stopped ('.($this->task->num ?? 0).' rows processed)'
            );
        } else {
            $this->updateTaskStatus(
                3,
                'Import completed ('.($this->task->num ?? 0).' rows processed in total)'
            );
        }
        $this->releaseLock();
        $this->cleanup();
    }
    private function updateTaskStatus(int $status, string $note): void
    {
        $this->task->status = $status;
        $this->task->note = $note;
        if ($status === 3) { // 3 = completed
            $this->task->completed_at = date('Y-m-d H:i:s');
        }
        $this->task->save();
    }
    private function releaseLock(): void
    {
        // Remove the running lock, the progress counter, and the stop signal
        $this->redis->del(RedisKey::getExportRunningKey($this->task->id));
        $this->redis->del(RedisKey::getExportProgressKey($this->task->id));
        $this->redis->del(RedisKey::getExportKey($this->task->id));
    }
    private function cleanup(): void
    {
        try {
            // Delete the file only when the task completed successfully
            if ($this->task->status === 3 && !empty($this->task->file_path)) {
                $this->deleteExportFile($this->task->file_path);
            }
        } catch (\Throwable $e) {
            $this->logger->error('Failed to clean up file', [
                'task_id' => $this->task->id,
                'file_path' => $this->task->file_path ?? null,
                'error' => $e->getMessage()
            ]);
        }
    }
    private function deleteExportFile(string $filePath): void
    {
        if (!file_exists($filePath)) {
            $this->logger->debug("File does not exist, nothing to delete: {$filePath}");
            return;
        }
        // Actually delete the file
        if (@unlink($filePath)) {
            $this->logger->info("Deleted export file: {$filePath}");
        } else {
            throw new \RuntimeException("Failed to delete file: {$filePath}");
        }
    }
    private function handleFailure(\Throwable $e): void
    {
        $this->logger->error('Import failed', [
            'task_id' => $this->task->id ?? null,
            'error' => $e->getMessage(),
            'trace' => $e->getTraceAsString(),
            'progress' => $this->task->num ?? 0,
            'stopped' => $this->shouldStop
        ]);
        if (isset($this->task)) {
            // 97 = stopped, 98 = failed
            $status = $this->shouldStop
                ? 97
                : 98;
            $note = $this->shouldStop
                ? 'Import stopped: '.$e->getMessage()
                : 'Import failed: '.$e->getMessage();
            $this->updateTaskStatus($status, $note);
            $this->releaseLock();
        }
    }
}