我目前正在为一个应用程序实现一个钱包充值系统。我正在使用一个名为AdjustWalletBalance
的作业来处理这个问题。该工作只是读取客户当前的钱包余额并将其增加一定的金额。下面是作业的代码:
use App\Events\Tenant\WalletTopupSuccessful;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\Middleware\WithoutOverlapping;
use Illuminate\Queue\SerializesModels;
class AdjustWalletBalance implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
/**
* Create a new job instance.
*/
public function __construct(
/**
* The amount to add to wallet balance. Can be negative for deductions.
*/
public float $amount,
/**
* User who's wallet should be adjusted.
*/
public User $user,
)
{
$this->onQueue('wallets');
}
/**
* Execute the job.
*/
public function handle(): void
{
$process_id = mt_rand();
$wallet = $user->wallet;
\Log::debug("starting $process_id at " . now());
\Log::debug("adjusting wallet balance with amount: {$this->amount}");
\Log::debug("current balance for $process_id: {$wallet->balance}");
// Increasing the running time for the job to make it easier to have
// two simultaneous jobs running for the sake of this demonstration.
sleep(10);
$wallet->balance = $wallet->balance + $this->amount;
$wallet->save();
\Log::debug("$process_id updated balance to {$wallet->balance}");
}
public function middleware()
{
return [(new WithoutOverlapping($this->user->id))];
}
}
由于处理用户的钱包余额是一项非常敏感的任务,我不希望为特定用户运行多个作业示例,从而导致竞争条件和不正确的余额更新,因此我将WithoutOverlapping
中间件添加到作业中,使用用户的id作为唯一密钥。但是它不起作用,即使我添加了中间件,我仍然可以为完全相同的用户并行执行作业。
我有一个这样的路由条目:
Route::get('/queue-test', function() {
dispatch(new \App\Jobs\Tenant\AdjustWalletBalance(50, User::first());
});
我正在同时运行两个队列worker,如果我访问路由两次,我可以立即看到每个worker开始处理AdjustWalletBalance
作业,这不应该是:
。当我检查日志文件以进一步验证作业的执行顺序时,我看到了以下内容:
[2023-09-24 19:12:42] local.DEBUG: starting 1007152283 at: 2023-09-24 19:12:42
[2023-09-24 19:12:42] local.DEBUG: adjusting wallet balance with amount: 50
[2023-09-24 19:12:42] local.DEBUG: current balance for 1007152283: 0
[2023-09-24 19:12:43] local.DEBUG: starting 241490440 at: 2023-09-24 19:12:43
[2023-09-24 19:12:43] local.DEBUG: adjusting wallet balance with amount: 50
[2023-09-24 19:12:43] local.DEBUG: current balance for 241490440: 0
[2023-09-24 19:12:52] local.DEBUG: 1007152283 updated balance to 50
[2023-09-24 19:12:53] local.DEBUG: 241490440 updated balance to 50
正如你可能已经注意到的,这是一个很大的问题。用户的钱包被记入两次#50,但由于这两个作业同时运行,用户的钱包余额被更新为#50而不是#100!
我已经将CACHE_DRIVER
设置为array
,所以我认为这不是配置问题。
我还尝试使用ShouldBeUnique
合约并将用户的id设置为uniqueId,如下所示
class AdjustWalletBalance implements ShouldQueue, ShouldBeUnique
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
public function __construct(/**/)
{ /* Code */ }
public function uniqueId(): int
{
return $this->user->id;
}
public function handle()
{ /* Code */ }
}
但我还是得到了同样的结果。
如果我杀死另一个队列worker,只留下一个运行,那么作业的后续示例将添加到队列中,用户的钱包将正确更新。但是应用程序将有数百个用户,我需要同时运行多个队列工作器来快速处理他们的钱包更新,因此,单个队列工作器不是一个选择。
1条答案
按热度按时间n9vozmp41#
我把我的代码编辑成这样;
现在,检查我的日志文件可以得到:
成功!检查我的数据库也验证了用户的钱包已正确更新。
队列工作者的屏幕截图显示,每个工作者不再只花10秒:
这意味着MySQL不允许另一个worker获得用户钱包行的锁,直到另一个事务完成,从而消除了并行钱包更新。
促使我采用此解决方案的资源: