I need advice and a solution. We are working on a job portal that handles around 1 million records, and we are running into record-fetching timeout errors. How can we handle these records using Laravel and MySQL?
Here is what we have tried so far:
1. Increasing the PHP execution time
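For context, raising those limits usually looks like the snippet below (my addition, not part of the original question); it treats the symptom, while the chunking approaches in the answers address the cause:

// Raise per-request limits; the values here are arbitrary examples.
set_time_limit(300);            // allow up to 5 minutes of execution
ini_set('memory_limit', '1G');  // fetching huge result sets also exhausts memory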
5 answers

xqkwcwgp1#
When working with large datasets, you should chunk your results. That lets you work with a smaller payload, keeps memory consumption down, and lets you return data to the user while the rest is still being fetched/processed. See the Laravel documentation on chunking: https://laravel.com/docs/5.5/eloquent#chunking-results
To speed this up further, you can exploit parallelism and spawn concurrent processes, each handling one chunk at a time. Symfony's Symfony\Component\Process\Process class makes this easy to do: https://symfony.com/doc/current/components/process.html
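A minimal sketch of that idea (my addition; the artisan command report:process-chunk and its --offset/--limit options are hypothetical placeholders for whatever does the per-chunk work):

use Symfony\Component\Process\Process;

$chunkSize = 10000;
$totalRows = 1000000;
$workers = [];

// Start one non-blocking worker per chunk. In practice you would cap how many
// processes run simultaneously instead of launching all of them at once.
for ($offset = 0; $offset < $totalRows; $offset += $chunkSize) {
    $process = new Process([
        'php', 'artisan', 'report:process-chunk',
        '--offset='.$offset,
        '--limit='.$chunkSize,
    ]);
    $process->start();
    $workers[] = $process;
}

// Block until every worker has finished.
foreach ($workers as $process) {
    $process->wait();
}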
ia2d9nvy2#
From the docs: If you need to process thousands of database records, consider using the chunk method. This method retrieves a small chunk of the results at a time and feeds each chunk into a Closure for processing. It is very useful for writing Artisan commands that work through thousands of records. For example, let's work through the entire users table in chunks of 100 records at a time:
DB::table('users')->orderBy('id')->chunk(100, function ($users) {
    foreach ($users as $user) {
        //
    }
});
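One caveat the excerpt doesn't cover: if the callback updates columns used in the query's WHERE clause, plain chunk() can skip rows because the OFFSET window shifts underneath it. The Laravel docs recommend chunkById() for that case, which pages on the primary key instead (this example follows the docs):

// Page by id so in-flight updates cannot shift the result window.
DB::table('users')->where('active', false)->chunkById(100, function ($users) {
    foreach ($users as $user) {
        DB::table('users')
            ->where('id', $user->id)
            ->update(['active' => true]);
    }
});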
webghufk3#
Hi, I think this might help:
use App\Models\User;
use Carbon\Carbon;
use Symfony\Component\HttpFoundation\StreamedResponse;

// Build the query only; chunk() below pages through it lazily.
$users = User::orderBy('id', 'asc');

$response = new StreamedResponse(function () use ($users) {
    $handle = fopen('php://output', 'w');

    // Write the CSV header row
    fputcsv($handle, ['col1', 'Col 2']);

    $users->chunk(1000, function ($filtered_users) use ($handle) {
        foreach ($filtered_users as $user) {
            // Add a new row with this user's data
            fputcsv($handle, [$user->col1, $user->col2]);
        }
    });

    // Close the output stream
    fclose($handle);
}, 200, [
    'Content-Type' => 'text/csv',
    'Content-Disposition' => 'attachment; filename="Users'.Carbon::now()->toDateTimeString().'.csv"',
]);

return $response;
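Why this stays memory-flat: the rows are written straight to php://output inside the response callback, so only one chunk of 1,000 users is hydrated at a time. If output buffering is enabled, flushing after each chunk (a hedged addition, not part of the original answer) also pushes rows to the client as they are produced:

$users->chunk(1000, function ($filtered_users) use ($handle) {
    foreach ($filtered_users as $user) {
        fputcsv($handle, [$user->col1, $user->col2]);
    }
    // Flush PHP's output buffer so the browser receives rows incrementally.
    if (ob_get_level() > 0) {
        ob_flush();
    }
    flush();
});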
kmb7vmvb4#
Laravel has a lazy() feature for exactly this. I tried both chunk and cursor. cursor makes a single query, but a lot of data still ends up in memory, which is no help when the database holds millions of records. Chunk is fine too, but lazy() is much cleaner in the way the code is written.
use App\Models\Flight;

foreach (Flight::lazy() as $flight) {
    //
}
Source: https://laravel.com/docs/9.x/eloquent#chunking-results
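Two related details worth adding (taken from the same docs page, not from the original answer): lazy() accepts a chunk size for the underlying queries, and lazyById() pages on the primary key, which is the safer variant when you update rows while iterating:

use App\Models\Flight;

// Fetch 500 rows per underlying query instead of the default 1000.
foreach (Flight::lazy(500) as $flight) {
    // process $flight one model at a time
}

// Page by primary key; safe even though the loop updates the filtered column.
Flight::where('departed', false)
    ->lazyById(500, column: 'id')
    ->each->update(['departed' => true]);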
sqyvllje5#
Challenge: download a CSV of 30+ million records.

Solution: implement a Laravel job chain that works through the 30 million records, stores the data in 30 CSV files (1M rows per CSV), and serves them as a single Zip download.

In the controller (getNow() and getToday() are the author's own helpers):
public function trans_recharge_rep_dl_custm_csv_chunk_queue(Request $request)
{
    $chunkSize = 25000; // 25k rows per chunk for safety; the server handled up to 70k (~134 MB of running processes)
    $recharge_archives = TransactionReportFacade::trans_recharge_rep_process($request);
    $totalCount = $recharge_archives->count();
    $numberOfChunks = ceil($totalCount / $chunkSize);
    $fileName = "file_".Str::slug(getNow()."-".rand(1, 999)).".csv";
    $report_type = $request->report_type == 'recharge_report' ? 'recharge' : 'transactions';
    $file_base_url = "/uploads/reports/".$report_type."/".getToday()."/csv";
    $file_full_base_url = $_SERVER['DOCUMENT_ROOT']."/uploads/reports/".$report_type."/".getToday()."/csv";
    $file_url = $file_base_url."/".$fileName;
    $file_full_url = $file_full_base_url."/".$fileName;
    $report_type_name = ($request->report_type == 'recharge_report' ? 'Recharge Report' : 'Transaction Report');

    $data = $totalCount > 1000000
        ? TransactionReportFacade::transaction_rep_download_more_than_1m($request, $totalCount, $chunkSize, $numberOfChunks, $fileName, $file_base_url, $file_full_base_url, $file_url, $file_full_url, $report_type_name)
        : TransactionReportFacade::transaction_rep_download_less_or_eq_1m($request, $totalCount, $chunkSize, $numberOfChunks, $fileName, $file_base_url, $file_full_base_url, $file_url, $file_full_url, $report_type_name);

    return $this->set_response($data, 200, 'success',
        ['Transaction report file is being generated. Please allow us some time. You may download the csv file in notification section.'],
        $request->merge(['log_type_id' => 3]));
}
In the Facade / Helper:

<?php

namespace App\Facades\Modules\Report\TransactionReport;

use ZipArchive;
use App\Models\RechargeArchives;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Bus;
use App\Jobs\TransactionReportCSVChunk;
use Illuminate\Support\Facades\File;

class TransactionReportHelper
{
    public function trans_recharge_rep_process($request)
    {
        $recharge_daily_tbl = RechargeArchives::with('reduction_list:id,ruid');

        if ($request->report_type == 'recharge_report') {
            if (isset($request->request_date_from)) {
                $recharge_daily_tbl = $recharge_daily_tbl->where('recharge_archives.recharge_date', '>=', $request->request_date_from);
            }
            if (isset($request->request_date_to)) {
                $recharge_daily_tbl = $recharge_daily_tbl->where('recharge_archives.recharge_date', '<=', $request->request_date_to);
            }
        } else {
            if (isset($request->request_date_from)) {
                $recharge_daily_tbl = $recharge_daily_tbl->where('recharge_archives.request_date', '>=', $request->request_date_from);
            }
            if (isset($request->request_date_to)) {
                $recharge_daily_tbl = $recharge_daily_tbl->where('recharge_archives.request_date', '<=', $request->request_date_to);
            }
        }

        return $recharge_daily_tbl;
    }

    // Transaction report for less than or equal to 1 million rows
    public function transaction_rep_download_less_or_eq_1m($request, $totalCount, $chunkSize, $numberOfChunks, $fileName, $file_base_url, $file_full_base_url, $file_url, $file_full_url, $report_type_name)
    {
        $batches = [];
        for ($i = 1; $i <= $numberOfChunks; $i++) {
            $offset = ($i * $chunkSize) - $chunkSize;
            $batches[] = new TransactionReportCSVChunk([
                'request' => $request->all(),
                'offset' => $offset,
                'totalCount' => $totalCount,
                'chunkSize' => $chunkSize,
                'numberOfChunks' => $numberOfChunks,
                'fileName' => $fileName,
                'file_base_url' => $file_base_url,
                'file_full_base_url' => $file_full_base_url,
                'file_url' => $file_url,
                'file_full_url' => $file_full_url,
                'user_id' => auth()->user()->id,
                'report_type_name' => $report_type_name,
            ]);
        }
        Bus::chain($batches)->dispatch();

        return ['download' => $file_url];
    }

    // Transaction report for more than 1 million rows
    public function transaction_rep_download_more_than_1m($request, $totalCount, $chunkSize, $numberOfChunks, $fileName, $file_base_url, $file_full_base_url, $file_url, $file_full_url, $report_type_name)
    {
        // $fileName is the master filename; it is renamed into multiple per-slice file names below
        $files_count = (int) ceil($totalCount / 1000000);
        $files = []; // each file holds 1M rows
        $zip_file_name = substr($fileName, 0, -3).'zip';
        $zip_file_url = substr($file_url, 0, -3).'zip';
        $zip_file_full_base_url = $file_full_base_url;
        $zip_file_full_url = substr($file_full_url, 0, -3).'zip';

        for ($i = 0; $i < $files_count; $i++) {
            $files[] = $i.'-'.$fileName;
        }

        $batches = [];
        $offset = 1;
        for ($i = $offset; $i <= $numberOfChunks; $i++) {
            $offset = ($i * $chunkSize) - $chunkSize;
            $fileName = $files[(int) floor($offset / 1000000)];  // file name switches for each 1M slice
            $file_url = $file_base_url.'/'.$fileName;            // file url switches for each 1M slice
            $file_full_url = $file_full_base_url.'/'.$fileName;  // full file url switches for each 1M slice
            $batches[] = new TransactionReportCSVChunk([
                'request' => $request->all(),
                'offset' => $offset,
                'totalCount' => $totalCount,
                'chunkSize' => $chunkSize,
                'numberOfChunks' => $numberOfChunks,
                'fileName' => $fileName,
                'file_base_url' => $file_base_url,
                'files_count' => $files_count,
                'file_full_base_url' => $file_full_base_url,
                'file_url' => $file_url,
                'file_full_url' => $file_full_url,
                'user_id' => auth()->user()->id,
                'report_type_name' => $report_type_name,
                'files' => $files,
                'zip_file_name' => $zip_file_name,
                'zip_file_url' => $zip_file_url,
                'zip_file_full_base_url' => $zip_file_full_base_url,
                'zip_file_full_url' => $zip_file_full_url,
            ]);
        }
        Bus::chain($batches)->dispatch();

        return ['download' => $zip_file_url];
    }
}
In the job queue (getTodayDateTime(), writeToLog(), getTimeDuration(), reportlogsGenerate() and notificationsGenerate() are the author's own helpers):

<?php

namespace App\Jobs;

use ZipArchive;
use Illuminate\Bus\Batchable;
use Illuminate\Bus\Queueable;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\File;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use App\Facades\Modules\Report\TransactionReport\TransactionReportFacade;

class TransactionReportCSVChunk implements ShouldQueue
{
    use Batchable, Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    protected $data;

    public function __construct($data = [])
    {
        $this->data = $data;
    }

    public function handle()
    {
        $zip = new ZipArchive();
        $process_start_time = getTodayDateTime();
        $request = (object) $this->data['request'];
        $offset = $this->data['offset'];
        $totalCount = $this->data['totalCount'];
        $chunkSize = $this->data['chunkSize'];
        $numberOfChunks = $this->data['numberOfChunks'];
        $fileName = $this->data['fileName'];
        $file_base_url = $this->data['file_base_url'];
        $file_full_base_url = $this->data['file_full_base_url'];
        $file_url = $this->data['file_url'];
        $file_full_url = $this->data['file_full_url'];
        $user_id = $this->data['user_id'];
        $report_type_name = $this->data['report_type_name'];
        $files = $this->data['files'] ?? [];
        $zip_file_name = $this->data['zip_file_name'] ?? '';
        $zip_file_url = $this->data['zip_file_url'] ?? '';
        $zip_file_full_base_url = $this->data['zip_file_full_base_url'] ?? '';
        $zip_file_full_url = $this->data['zip_file_full_url'] ?? '';

        $recharge_archives = TransactionReportFacade::trans_recharge_rep_process($request);

        writeToLog('======Queue started========'.'$offset='.$offset, 'debug');

        try {
            File::ensureDirectoryExists($file_full_base_url);

            // Open/create the file in append mode
            $fp = fopen($file_full_url, 'a');

            if ($offset == 0) {
                $header = [
                    "Serial",
                    "Operator",
                    "Client",
                    ($request->report_type == 'recharge_report' ? 'Recharge Date' : 'Request Date'),
                    "Sender",
                    "Recipient",
                    "Amount",
                    "Status",
                    "Connection Type",
                ];
                // Write the CSV header row
                fputcsv($fp, $header);
            }

            $operator_info = DB::table('operator_info')->select('operator_id', 'operator_name')->get();
            $client_info = DB::table('client_info')->select('client_id', 'client_name')->get();
            $recharge_status_codes = DB::table('recharge_status_codes')->select('status_code', 'status_name')->get();

            $cursor = $recharge_archives->skip($offset)->take($chunkSize)->cursor();

            foreach ($cursor as $item) {
                $item_data = [
                    "Serial" => $offset + 1, // running serial number; continues across chunks (1, ..., 25001, ...)
                    "Operator" => $operator_info->where('operator_id', $item->operator_id)->pluck('operator_name')->first(),
                    "Client" => $client_info->where('client_id', $item->client_id)->pluck('client_name')->first(),
                    ($request->report_type == 'recharge_report' ? 'Recharge Date' : 'Request Date') =>
                        ($request->report_type == 'recharge_report' ? $item->recharge_date : $item->request_date),
                    "Sender" => $item->request_sender_id,
                    "Recipient" => $item->recharge_recipient_msisdn,
                    "Amount" => $item->amount,
                    "Status" => $recharge_status_codes->where('status_code', $item->status_code)->pluck('status_name')->first(),
                    "Connection Type" => $item->connection_type,
                ];
                fputcsv($fp, $item_data);
                $offset = $offset + 1;
            }

            $process_end_time = getTodayDateTime();
            $user = DB::table('users_report')->where('id', $user_id)->first();

            // Close the file
            fclose($fp);

            // When all jobs/chunks have finished
            if ($offset >= $totalCount) {
                $file_type = 'text/csv'; // default CSV format
                if ($totalCount > 1000000) { // zipping is only applied for 1M+ datasets
                    $file_url = $zip_file_url;      // .csv becomes .zip
                    $file_type = 'application/zip'; // .csv becomes .zip for more than 1M rows
                    $zip = new ZipArchive();
                    writeToLog('$zip_file_full_url = '.$zip_file_full_url, 'debug');
                    $zip->open($zip_file_full_url, ZipArchive::CREATE);
                    foreach ($files as $key => $csvfilename) {
                        // Add each CSV file to the ZIP archive
                        writeToLog('$file_full_base_url $csvfilename '.$file_full_base_url.'/'.$csvfilename, 'debug');
                        $zip->addFile($file_full_base_url.'/'.$csvfilename, $csvfilename);
                    }
                    $zip->close();
                }

                reportlogsGenerate($fileName, $file_url, $user_id);

                $notification_response = [
                    'is_report' => 1,
                    'report_name' => $report_type_name,
                    'process_start_time' => $process_start_time,
                    'process_end_time' => $process_end_time,
                    'execution_time' => getTimeDuration($process_start_time, $process_end_time),
                    'no_of_records' => $offset,
                    'filePath' => $file_url,
                    'type' => $file_type,
                ];
                notificationsGenerate($notification_response, $user_id, $report_type_name.' is generated.', 'text/csv');
            }

            writeToLog('======Queue ended========'.'$offset='.$offset, 'debug');
        } catch (\Throwable $e) {
            report($e);
            return false;
        }
    }
}
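A design note on the chain (my observation, not from the original answer): Bus::chain() runs these jobs strictly in sequence on a queue worker, which is why the last job can detect $offset >= $totalCount and assemble the zip; if one job fails, the remaining jobs in the chain never run, and none of this runs at all unless a worker (php artisan queue:work) is processing the queue. Attaching a failure callback, available since Laravel 8, makes chain failures visible:

Bus::chain($batches)
    ->catch(function (\Throwable $e) {
        // Jobs after the failed one are not executed; surface the error.
        report($e);
    })
    ->dispatch();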