如何使用Laravel和MySQL获取大量记录?

gfttwv5a  于 2023-06-21  发布在  Mysql
关注(0)|答案(5)|浏览(133)

我需要Maven的建议和解决方案。我们正在开发工作门户网站在这里处理约100万条记录。我们面临记录获取超时错误。如何使用Laravel和MySql处理这些记录?

我们正在尝试以下步骤:

1.增加PHP执行时间

  1. MySql索引
    1.分页
xqkwcwgp

xqkwcwgp1#

在处理大型数据集时,您应该对结果进行分块。这允许您处理较小的负载,减少内存消耗,并允许您将数据返回给用户,而其余的数据正在被提取/处理。请参阅laravel文档中关于分块的内容:
https://laravel.com/docs/5.5/eloquent#chunking-results
为了进一步加快速度,您可以利用多线程并产生并发进程,每个进程一次处理一个块。Symfony的Symfony\Component\Process\Process类使这很容易做到。
https://symfony.com/doc/current/components/process.html

ia2d9nvy

ia2d9nvy2#

从文档:
如果需要处理数千条数据库记录,请考虑使用chunk方法。该方法每次检索一小块结果,并将每个块提供给Closure进行处理。此方法对于编写处理数千条记录的Artisan命令非常有用。例如,让我们一次使用100条记录的块来处理整个users表:

DB::table('users')->orderBy('id')->chunk(100, function ($users) {
    foreach ($users as $user) {
        //
    }
});
webghufk

webghufk3#

嗨我想这个可能会有帮助

$users = User::groupBy('id')->orderBy('id', 'asc');

    $response = new StreamedResponse(function() use($users){
        $handle = fopen('php://output', 'w');
        // Add Excel headers
        fputcsv($handle, [
            'col1', 'Col 2'           ]);
        $users->chunk(1000, function($filtered_users) use($handle) {
            foreach ($filtered_users as $user) {
                // Add a new row with user data
                fputcsv($handle, [
                    $user->col1, $user->col2
                ]);
            }
        });
        // Close the output stream
        fclose($handle);
        }, 200, [
            'Content-Type' => 'text/csv',
            'Content-Disposition' => 'attachment; filename="Users'.Carbon::now()->toDateTimeString().'.csv"',
        ]);

        return $response;
kmb7vmvb

kmb7vmvb4#

Laravel为此提供了一个懒惰的功能。我尝试了chunk和cursor。游标进行一次查询,并将大量数据放入内存中,如果数据库中有数百万条记录,这是没有用的。Chunk也不错,但在编写代码的方式上懒惰得多。

use App\Models\Flight;
 
foreach (Flight::lazy() as $flight) {
    //
}

来源:https://laravel.com/docs/9.x/eloquent#chunking-results

sqyvllje

sqyvllje5#

挑战:

下载超过3000万条记录的CSV文件

解决方案:

实现Laravel作业链管理3000万条记录的处理将数据存储在30个CSV文件中(每个CSV中为1 M),并将其下载为单个Zip文件。

IN控制器

public function trans_recharge_rep_dl_custm_csv_chunk_queue(Request $request)
    {
        $chunkSize = 25000;  // 25k data per chunk for safety but server capacity up to 70k for 134mb of running processes
        $recharge_archives = TransactionReportFacade::trans_recharge_rep_process($request);
        $totalCount = $recharge_archives->count();
        $numberOfChunks = ceil($totalCount / $chunkSize);
        $fileName = "file_".Str::slug(getNow()."-".rand(1,999)).".csv";
        $report_type = $request->report_type=='recharge_report' ? 'recharge' : 'transactions';
        $file_base_url = "/uploads/reports/".$report_type."/".getToday()."/csv";
        $file_full_base_url = $_SERVER['DOCUMENT_ROOT']."/uploads/reports/".$report_type."/".getToday()."/csv";
        $file_url = $file_base_url."/".$fileName;
        $file_full_url = $file_full_base_url."/".$fileName;
        $report_type_name = ($request->report_type=='recharge_report' ? 'Recharge Report':'Transaction Report');

        $data = $totalCount>1000000 ?
                    TransactionReportFacade::transaction_rep_download_more_than_1m($request, $totalCount, $chunkSize, $numberOfChunks, $fileName, $file_base_url, $file_full_base_url, $file_url, $file_full_url, $report_type_name)
                    :
                    TransactionReportFacade::transaction_rep_download_less_or_eq_1m($request, $totalCount, $chunkSize, $numberOfChunks, $fileName, $file_base_url, $file_full_base_url, $file_url, $file_full_url, $report_type_name);

        return $this->set_response($data, 200,'success', ['Transaction report file is being generated. Please allow us some time. You may download the csv file in notification section.'], $request->merge(['log_type_id' => 3]));
    }

在Facade / Helper中

<?php

namespace App\Facades\Modules\Report\TransactionReport;

use ZipArchive;
use App\Models\RechargeArchives;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Bus;
use App\Jobs\TransactionReportCSVChunk;
use Illuminate\Support\Facades\File;

class TransactionReportHelper
{
    public function trans_recharge_rep_process($request)
    {    
        $recharge_daily_tbl = RechargeArchives::with('reduction_list:id,ruid');

        if ($request->report_type == 'recharge_report') {
            if (isset($request->request_date_from)) {
                $recharge_daily_tbl = $recharge_daily_tbl->where('recharge_archives.recharge_date', '>=', $request->request_date_from);
            }

            if (isset($request->request_date_to)) {
                $recharge_daily_tbl = $recharge_daily_tbl->where('recharge_archives.recharge_date', '<=', $request->request_date_to);
            }
        } else {
            if (isset($request->request_date_from)) {
                $recharge_daily_tbl = $recharge_daily_tbl->where('recharge_archives.request_date', '>=', $request->request_date_from);
            }

            if (isset($request->request_date_to)) {
                $recharge_daily_tbl = $recharge_daily_tbl->where('recharge_archives.request_date', '<=', $request->request_date_to);
            }
        }

        return $recharge_daily_tbl;
    }

    // transaction report less than or equal to 1 Millon
    public function transaction_rep_download_less_or_eq_1m($request, $totalCount, $chunkSize, $numberOfChunks, $fileName, $file_base_url, $file_full_base_url, $file_url, $file_full_url, $report_type_name)
    {
        $batches  = [];
        for($i=1; $i<=$numberOfChunks; $i++)
        {
            $offset = ($i * $chunkSize) - $chunkSize;
            $batches[] = new TransactionReportCSVChunk(
                [
                    'request' => $request->all(),
                    'offset' => $offset, 'totalCount' => $totalCount, 'chunkSize' => $chunkSize, 'numberOfChunks' => $numberOfChunks, 'fileName' => $fileName, 'file_base_url' => $file_base_url, 'file_full_base_url' => $file_full_base_url,  'file_url' => $file_url, 'file_full_url' => $file_full_url, 'user_id'  =>  auth()->user()->id, 'report_type_name' => $report_type_name
                ]
            );
        }

        Bus::chain($batches)->dispatch();
        $data = [
            'download' => $file_url
        ];
        return $data;
    }

    // transaction report more than or equal to 1 Millon
    public function transaction_rep_download_more_than_1m($request, $totalCount, $chunkSize, $numberOfChunks, $fileName, $file_base_url, $file_full_base_url, $file_url, $file_full_url, $report_type_name)
    {
        // $fileName = is master filename that will be renamed as multiple file names
        $files_count = (int) ceil($totalCount/1000000);
        $files = [];  // each file contains 1m data

        $zip_file_name = substr($fileName, 0, -3).'zip';
        $zip_file_url = substr($file_url, 0, -3).'zip';
        $zip_file_full_base_url = $file_full_base_url;
        $zip_file_full_url = substr($file_full_url, 0, -3).'zip';

        for($i = 0; $i < $files_count; $i++)
        {
            $files[] = $i.'-'.$fileName;
        }

        $batches  = [];
        $offset=1;

        for($i=$offset; $i<=$numberOfChunks; $i++)
        {
            $offset = ($i * $chunkSize) - $chunkSize;

            $fileName = $files[$offset/1000000]; // file name overwritten for each 1m datasets
            $file_url = $file_base_url.'/'.$fileName; // file full base url overwritten for each 1m datasets
            $file_full_url = $file_full_base_url.'/'.$fileName; // file full base url overwritten for each 1m datasets

            $batches[] = new TransactionReportCSVChunk(
                [
                    'request' => $request->all(),
                    'offset' => $offset, 'totalCount' => $totalCount, 'chunkSize' => $chunkSize, 'numberOfChunks' => $numberOfChunks, 'fileName' => $fileName, 'file_base_url' => $file_base_url, 'files_count' => $files_count, 'file_full_base_url' => $file_full_base_url,  'file_url' => $file_url, 'file_full_url' => $file_full_url, 'user_id'  =>  auth()->user()->id, 'report_type_name' => $report_type_name, 'files' => $files, 'zip_file_name' => $zip_file_name, 'zip_file_url' => $zip_file_url, 'zip_file_full_base_url' => $zip_file_full_base_url, 'zip_file_full_url' => $zip_file_full_url
                ]
            );
        }
        Bus::chain($batches)->dispatch();
        $data = [
            'download' => $zip_file_url
        ];
        return $data;
    }

}

在作业队列中

<?php

namespace App\Jobs;

use ZipArchive;
use Illuminate\Bus\Batchable;
use Illuminate\Bus\Queueable;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\File;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use App\Facades\Modules\Report\TransactionReport\TransactionReportFacade;

class TransactionReportCSVChunk implements ShouldQueue
{
    use Batchable, Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    protected $data;

    public function __construct($data = [])
    {
        $this->data = $data;
    }

    public function handle()
    {
        $zip = new ZipArchive();

        $process_start_time = getTodayDateTime();

        $request = (object) $this->data['request'];
        $offset = $this->data['offset'] ;
        $totalCount = $this->data['totalCount'] ;
        $chunkSize = $this->data['chunkSize'] ;
        $numberOfChunks = $this->data['numberOfChunks'] ;
        $fileName = $this->data['fileName'] ;
        $file_base_url = $this->data['file_base_url'] ;
        $file_full_base_url = $this->data['file_full_base_url'] ;
        $file_url = $this->data['file_url'] ;
        $file_full_url = $this->data['file_full_url'] ;
        $user_id = $this->data['user_id'] ;
        $report_type_name = $this->data['report_type_name'] ;
        $files = $this->data['files'] ?? [] ;
        $zip_file_name = $this->data['zip_file_name'] ?? '' ;
        $zip_file_url = $this->data['zip_file_url'] ?? '' ;
        $zip_file_full_base_url = $this->data['zip_file_full_base_url'] ?? '' ;
        $zip_file_full_url = $this->data['zip_file_full_url'] ?? '' ;

        $recharge_archives = TransactionReportFacade::trans_recharge_rep_process($request);

        writeToLog('======Queue started========'.'$offset='.$offset, 'debug');

        try {
            File::ensureDirectoryExists($file_full_base_url);

            // Open/Create the file
            $fp = fopen($file_full_url, 'a');

            if($offset==0)
            {
                $header = [
                    "Serial",
                    "Operator" ,
                    "Client" ,
                    ($request->report_type=='recharge_report' ? 'Recharge Date':'Request Date') ,
                    "Sender" ,
                    "Recipient" ,
                    "Amount" ,
                    "Status" ,
                    "Connection Type" ,
                ];
                // Write to the csv
                fputcsv($fp, $header);
            }

            $operator_info = DB::table('operator_info')->select('operator_id', 'operator_name')->get();
            $client_info = DB::table('client_info')->select('client_id', 'client_name')->get();
            $recharge_status_codes = DB::table('recharge_status_codes')->select('status_code', 'status_name')->get();

            $cursor = $recharge_archives
                        ->skip($offset)
                        ->take($chunkSize)
                        ->cursor();

            foreach ($cursor as $item)
            {
                $item_data = [
                                "Serial" => $offset+1,  //  First chunk 0, then 10000, 20000
                                "Operator" => $operator_info->where('operator_id', $item->operator_id)->pluck('operator_name')->first() ,
                                "Client" => $client_info->where('client_id', $item->client_id)->pluck('client_name')->first(),
                                ($request->report_type=='recharge_report' ? 'Recharge Date':'Request Date') => ($request->report_type=='recharge_report' ? $item->recharge_date:$item->request_date),
                                "Sender" => $item->request_sender_id,
                                "Recipient" => $item->recharge_recipient_msisdn,
                                "Amount" => $item->amount,
                                "Status" => $recharge_status_codes->where('status_code', $item->status_code)->pluck('status_name')->first(),
                                "Connection Type" => $item->connection_type,
                            ];
                fputcsv($fp, $item_data);
                $offset = $offset+1;
            }

            $process_end_time = getTodayDateTime();

            $user = DB::table('users_report')->where('id', $user_id)->first();

            // Close the file
            fclose($fp);

            // When all jobs/chunks have finished
            if ($offset>=$totalCount)
            {
                $file_type = 'text/csv'; // default csv format
                if($totalCount>1000000) // zip proces will only applicable for 1M+ dataset
                {
                    $file_url = $zip_file_url; // .csv to .zip file format
                    $file_type = 'application/zip'; // .csv to .zip file format for more than 1m data
                    $zip = new ZipArchive();

                    writeToLog('$zip_file_full_url = '.$zip_file_full_url, 'debug');
                    $zip->open($zip_file_full_url, ZipArchive::CREATE);

                    foreach ($files as $key => $csvfilename)
                    {
                        // Add the CSV file to the ZIP archive
                        writeToLog('$file_full_base_url $csvfilename '.$file_full_base_url.'/'. $csvfilename, 'debug');
                        $zip->addFile($file_full_base_url.'/'. $csvfilename, $csvfilename);
                    }
                    $zip->close();
                }
                reportlogsGenerate($fileName, $file_url, $user_id);
                $notification_response = [
                    'is_report' => 1 ,
                    'report_name' => $report_type_name,
                    'process_start_time' => $process_start_time,
                    'process_end_time' => $process_end_time,
                    'execution_time' => getTimeDuration($process_start_time, $process_end_time),
                    'no_of_records' => $offset,
                    'filePath' => $file_url,
                    "type" => $file_type,
                ];
                notificationsGenerate($notification_response, $user_id, $report_type_name.' is generated.', 'text/csv');
            }

            writeToLog('======Queue ended========'.'$offset='.$offset, 'debug');
        } catch (\Throwable $e) {
            report($e);
            return false;
        }
    }
}

相关问题