イントロダクション
Laravelのキューサービスは、Beanstalk、Amazon SQS、Redis、さらにはリレーショナル・データベースなど様々なキューバックエンドに対し共通のAPIを提供しています。キューによりメール送信のような時間を費やす処理を遅らせることが可能です。時間のかかるタスクを遅らせることで、よりアプリケーションのリクエストをドラマチックにスピードアップできます。
キューの設定ファイルはconfig/queue.php
です。このファイルにはフレームワークに含まれているそれぞれのドライバーへの接続設定が含まれています。それにはデータベース、Beanstalkd、Amazon SQS、Redis、ジョブが即時に実行される同期(ローカル用途)ドライバーが含まれています。
null
キュードライバはキューされたジョブが実行されないように、破棄するだけです。
接続 Vs. キュー
Laravelのキューにとりかかる前に、「接続」と「キュー」の区別を理解しておくことが重要です。config/queue.php
設定ファイルの中には、connections
設定オプションがあります。このオプションはAmazon
SQS、Beanstalk、Redisなどのバックエンドサービスへの個々の接続を定義します。しかし、どんな指定されたキュー接続も、複数の「キュー」を持つことができます。「キュー」とはキュー済みのジョブのスタック、もしくは積み重ねのことです。
queue
接続ファイルのqueue
属性を含んでいる、各接続設定例に注目してください。ジョブがディスパッチされ、指定された接続へ送られた時にのデフォルトキューです。言い換えれば、どのキューへディスパッチするのか明確に定義していないジョブをディスパッチすると、そのジョブは接続設定のqueue
属性で定義したキューへ送られます。
// このジョブはデフォルトキューへ送られる
dispatch(new Job);
// このジョブは"emails"キューへ送られる
dispatch((new Job)->onQueue('emails'));
あるアプリケーションでは複数のキューへジョブを送る必要はなく、代わりに1つのシンプルなキューが適しているでしょう。しかし、複数のキューへジョブを送ることは、優先順位づけしたい、もしくはジョブの処理を分割したいアプリケーションでは、特に便利です。Laravelのキューワーカはプライオリティによりどのキューで処理するかを指定できるからです。たとえば、ジョブをhigh
キューへ送れば、より高い処理プライオリティのワーカを実行できます。
php artisan queue:work --queue=high,default
ドライバ毎の必要要件
データベース
database
キュードライバを使用するには、ジョブを記録するためのデータベーステーブルが必要です。このテーブルを作成するマイグレーションはqueue:table
Artisanコマンドにより生成できます。マイグレーションが生成されたら、migrate
コマンドでデータベースをマイグレートしてください。
php artisan queue:table
php artisan migrate
Redis
redis
キュードライバーを使用するには、config/database.php
設定ファイルでRedisのデータベースを設定する必要があります。
Redisキュー接続でRedisクラスタを使用している場合は、キュー名にキーハッシュタグを含める必要があります。これはキューに指定した全Redisキーが同じハッシュスロットに確実に置かれるようにするためです。
'redis' => [
'driver' => 'redis',
'connection' => 'default',
'queue' => '{default}',
'retry_after' => 90,
],
他のドライバの要件
以下の依存パッケージがリストしたキュードライバを使用するために必要です。
- Amazon SQS:
aws/aws-sdk-php ~3.0
- Beanstalkd:
pda/pheanstalk ~3.0
- Redis:
predis/predis ~1.0
ジョブの作成
ジョブクラスの生成
キュー投入可能なアプリケーションの全ジョブは、デフォルトでapp/Jobs
ディレクトリへ保存されます。app/Jobs
ディレクトリが存在しなくても、make:job
Artisanコマンドの実行時に生成されます。新しいキュージョブをArtisan
CLIで生成できます。
php artisan make:job SendReminderEmail
非同期で実行するため、ジョブをキューへ投入することをLaravelへ知らせる、Illuminate\Contracts\Queue\ShouldQueue
インターフェイスが生成されたクラスには実装されます。
クラス構成
ジョブクラスは通常とてもシンプルで、キューによりジョブが処理される時に呼び出される、handle
メソッドのみで構成されています。手始めに、ジョブクラスのサンプルを見てみましょう。この例は、ポッドキャストの公開サービスを管理し、公開前にアップロードしたポッドキャストファイルを処理する必要があるという仮定です。
<?php
namespace App\Jobs;
use App\Podcast;
use App\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
class ProcessPodcast implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected $podcast;
/**
* 新しいジョブインスタンスの生成
*
* @param Podcast $podcast
* @return void
*/
public function __construct(Podcast $podcast)
{
$this->podcast = $podcast;
}
/**
* ジョブの実行
*
* @param AudioProcessor $processor
* @return void
*/
public function handle(AudioProcessor $processor)
{
// アップロード済みポッドキャストの処理…
}
}
この例中、キュージョブのコンテナーに直接Eloquentモデルが渡せることに注目してください。ジョブが使用しているSerializesModels
トレイトによりEloquentモデルは優雅にシリアライズされ、ジョブが処理される時にアンシリアライズされます。キュー投入されたジョブがコンテナでEloquentモデルを受け取ると、モデルの識別子のみシリアライズされています。ジョブが実際に処理される時、キューシステムは自動的にデータベースから完全なモデルインスタンスを再取得します。これらは全てアプリケーションの完全な透過性のためであり、Eloquentモデルインスタンスをシリアライズするときに発生する問題を防ぐことができます。
handle
メソッドはキューによりジョブが処理されるときに呼びだされます。ジョブのhandle
メソッドにタイプヒントにより依存を指定できることに注目してください。Laravelのサービスコンテナが自動的に依存を注入します。
Note: Rawイメージコンテンツのようなバイナリデータは、キュージョブへ渡す前に、
base64_encode
関数を通してください。そうしないと、そのジョブはキューへ設置する前にJSONへ正しくシリアライズされません。
ジョブのディスパッチ
ジョブクラスが書き上がったら、dispatch
ヘルパによりディスパッチできます。dispatch
ヘルパは引数がひとつだけ必要で、ジョブインスタンスです。
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;
class PodcastController extends Controller
{
/**
* 新ポッドキャストの保存
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// ポッドキャスト作成…
dispatch(new ProcessPodcast($podcast));
}
}
Tip!!
dispatch
ヘルパは短くてグローバルに使用できる関数の利便性を提供しており、同時にテストもとても簡単にできます。Laravelのテストに関するドキュメントで詳細を学んでください。
遅延ディスパッチ
キューされたジョブの実行を遅らせたい場合は、ジョブインスタンスに対しdelay
メソッドを使用してください。delay
メソッドは、生成したジョブクラスにデフォルトで含まれている、Illuminate\Bus\Queueable
トレイトにより提供されています。例として、ディスパッチされた10分後まで、ジョブの処理を遅らせる指定をしてみましょう。
<?php
namespace App\Http\Controllers;
use Carbon\Carbon;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;
class PodcastController extends Controller
{
/**
* 新ポッドキャストの保存
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// ポッドキャスト作成…
$job = (new ProcessPodcast($podcast))
->delay(Carbon::now()->addMinutes(10));
dispatch($job);
}
}
Note: Amazon SQSキューサービスは、最大15分の遅延時間です。
キューと接続のカスタマイズ
特定キューへのディスパッチ
別のキューへジョブを投入することで、キュージョブを「カテゴライズ」でき、数々のキューにいくつのワーカを割り付けるかでプライオリティ付もできます。これはキュー設定ファイルで定義した、別のキュー「接続」へジョブを投入することではなく、一つの接続の指定したキューへ投入することです。キューを指定するには、ジョブインスタンスのonQueue
メソッドを使ってください。
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;
class PodcastController extends Controller
{
/**
* 新ポッドキャストの保存
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// ポッドキャスト作成…
$job = (new ProcessPodcast($podcast))->onQueue('processing');
dispatch($job);
}
}
特定の接続へのディスパッチ
複数キューの接続を取り扱う場合、どの接続へジョブを投入するか指定できます。接続を指定するには、ジョブインスタンスのonConnection
メソッドを利用します。
<?php
namespace App\Http\Controllers;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;
class PodcastController extends Controller
{
/**
* 新ポッドキャストの保存
*
* @param Request $request
* @return Response
*/
public function store(Request $request)
{
// ポッドキャスト作成…
$job = (new ProcessPodcast($podcast))->onConnection('sqs');
dispatch($job);
}
}
もちろん、ジョブを投入する接続とキューを指定するために、onConnection
とonQueue
メソッドをチェーンすることもできます。
$job = (new ProcessPodcast($podcast))
->onConnection('sqs')
->onQueue('processing');
最大試行回数/タイムアウト値の指定
最大試行回数
ジョブが試行する最大回数を指定するアプローチの一つは、Artisanコマンドラインへ--tries
スイッチ使う方法です。
php artisan queue:work --tries=3
しかし、より粒度の高いアプローチは、ジョブクラス自身に最大試行回数を定義する方法です。これはコマンドラインで指定された値より、優先度が高くなっています。
<?php
namespace App\Jobs;
class ProcessPodcast implements ShouldQueue
{
/**
* 最大試行回数
*
* @var int
*/
public $tries = 5;
}
タイムアウト
Note: The
timeout
feature is optimized for PHP 7.1+ and thepcntl
PHP extension.
同様に、ジョブの最大実行秒数を指定するために、Artisanコマンドラインに--timeout
スイッチを指定することができます。
php artisan queue:work --timeout=30
しかしながら、最大実行秒数をジョブクラス自身に定義することもできます。ジョブにタイムアウト時間を指定すると、コマンドラインに指定されたタイムアウトよりも優先されます。
<?php
namespace App\Jobs;
class ProcessPodcast implements ShouldQueue
{
/**
* ジョブがタイムアウトになるまでの秒数
*
* @var int
*/
public $timeout = 120;
}
エラー処理
ジョブの処理中に例外が投げられると、ジョブは自動的にキューへ戻され、再試行されます。ジョブはアプリケーションが許している最大試行回数に達するまで、連続して実行されます。最大試行回数はqueue:work
Artisanコマンドへ--tries
スイッチを使い定義されます。もしくは、ジョブクラス自身に最大試行回数を定義することもできます。キューワーカの実行についての情報は、以降で説明します。
キューワーカの実行
Laravelには、キューに投入された新しいジョブを処理する、キューワーカも含まれています。queue:work
Artisanコマンドを使いワーカを実行できます。queue:work
コマンドが起動したら、皆さんが停止するか、ターミナルを閉じるまで実行指示付けることに注意してください。
php artisan queue:work
Tip!! バックグランドで
queue:work
プロセスを永続的に実行し続けるには、キューワーカが止まらずに実行し続けていることを確実にするため、Supervisorのようなプロセスモニタを利用する必要があります。
キューワーカは長時間起動するプロセスで、起動した状態のままメモリに保存されることを覚えておいてください。その結果、一度起動したら、コードベースの変更は反映されません。そのため、開発期間中はキューワーカを再起動することを忘れないでください。
ジョブを一つ処理する
--once
オプションは、ワーカにキュー中のジョブをひとつだけ処理するように指示します。
php artisan queue:work --once
接続とキューの指定
どのキュー接続をワーカが使用するのかを指定できます。work
コマンドで指定する接続名は、config/queue.php
設定ファイルで定義されている接続と対応します。
php artisan queue:work redis
指定した接続の特定のキューだけを処理するように、さらにキューワーカをカスタマイズすることもできます。たとえば、メールの処理をすべて、redis
キュー接続のemails
キューで処理する場合、以下のコマンドで単一のキューの処理だけを行うワーカを起動できます。
php artisan queue:work redis --queue=emails
リソースの考察
デーモンキューワーカは各ジョブを処理する前に、フレームワークを「再起動」しません。そのため、各ジョブが終了したら、大きなリソースを開放してください。たとえば、GDライブラリでイメージ処理を行ったら、終了前にimagedestroy
により、メモリを開放してください。
キュープライオリティ
時々、キューをどのように処理するかをプライオリティ付けしたいことも起きます。たとえば、config/queue.php
でredis
接続のデフォルトqueue
をlow
に設定したとしましょう。しかし、あるジョブをhigh
プライオリティでキューへ投入したい場合です。
dispatch((new Job)->onQueue('high'));
low
キュー上のジョブの処理が継続される前に、全high
キュージョブが処理されることを確実にするには、work
コマンドのキュー名にコンマ区切りのリストで指定してください。
php artisan queue:work --queue=high,low
キューワーカとデプロイ
キューワーカは長時間起動プロセスであるため、リスタートしない限りコードの変更を反映しません。ですから、キューワーカを使用しているアプリケーションをデプロイする一番シンプルな方法は、デプロイ処理の間、ワーカをリスタートすることです。queue:restart
コマンドを実行することで、全ワーカを穏やかに再起動できます。
php artisan queue:restart
このコマンドは存在しているジョブが失われないように、現在のジョブの処理が終了した後に、全キューワーカーへ穏やかに「終了する(die)」よう指示します。キューワーカはqueue:restart
コマンドが実行されると、終了するわけですから、キュージョブを自動的に再起動する、Supervisorのようなプロセスマネージャーを実行すべきでしょう。
Tip!! このコマンドはリスタートシグナルを保存するために、キャッシュを使用します。そのため、この機能を使用する前に、アプリケーションのキャッシュドライバーが、正しく設定されていることを確認してください。
ジョブの期限切れとタイムアウト
ジョブの有効期限
config/queue.php
設定ファイルの中で、各キュ接続はretry_after
オプションを定義しています。このオプションはジョブの処理を再試行するまで、キュー接続を何秒待つかを指定します。たとえば、retry_after
の値が90
であれば、そのジョブは処理が終わってから90秒の間に削除されなければ、キューへ再投入されます。通常、retry_after
値はジョブが処理を妥当に完了するまでの秒数の最大値を指定します。
Note:
retry_after
を含まない唯一の接続は、Amazon SQSです。SQSはAWSコンソールで管理する、Default Visibility Timeoutを元にリトライを行います。
ワーカタイムアウト
queue:work
Artisanコマンドは--timeout
オプションも提供しています。--timeout
オプションはLaravelキューマスタプロセスが、ジョブを処理する子のキューワーカをKillするまでどのくらい待つかを指定します。外部のHTTP呼び出しの反応が無いなど様々な理由で、時より子のキュープロセスは「フリーズ」します。--timeout
オプションは指定した実行時間を過ぎた、フリーズプロセスを取り除きます。
php artisan queue:work --timeout=60
retry_after
設定オプションと--timeout
CLIオプションは異なります。しかし、確実にジョブを失わずに、一度だけ処理を完了できるよう共に働きます。
Note:
--timeout
値は、最低でも数秒retry_after
設定値よりも短くしてください。これにより、与えられたジョブを処理するワーカが、ジョブのリトライ前に確実にkillされます。--timeout
オプションをretry_after
設定値よりも長くすると、ジョブが2度実行されるでしょう。
ワーカスリープ時間
ジョブがキュー上に存在しているとき、ワーカは各ジョブ間にディレイを取らずに実行し続けます。sleep
オプションは新しく処理するジョブが存在しない時に、どの程度「スリープ」するかを決めます。スリープ中、ワーカは新しいジョブを処理しません。ジョブはワーカが目を様した後に処理されます。
php artisan queue:work --sleep=3
Supervisor設定
Supervisorのインストール
SupervisorはLinuxオペレーティングシステムのプロセスモニタで、queue:work
プロセスが落ちると自動的に起動します。UbuntuにSupervisorをインストールするには、次のコマンドを使ってください。
sudo apt-get install supervisor
Tip!! Supervisoの設定に圧倒されそうならば、Laravelプロジェクトのために、Supervisorを自動的にインストールし、設定するLaravel Forgeの使用を考慮してください。
Supervisorの設定
Supervisorの設定ファイルは、通常/etc/supervisor/conf.d
ディレクトリに保存します。このディレクトリの中には、Supervisorにどのようにプロセスを監視するのか指示する設定ファイルを好きなだけ設置できます。たとえば、laravel-worker.conf
ファイルを作成し、queue:work
プロセスを起動、監視させてみましょう。
[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3
autostart=true
autorestart=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log
この例のnumprocs
ディレクティブは、Supervisorに全部で8つのqueue:workプロセスを実行・監視し、落ちている時は自動的に再起動するように指示しています。もちろんcommand
ディレクティブのqueue:work sqs
の部分を変更し、希望のキュー接続に合わせてください。
Supervisorの起動
設定ファイルができたら、Supervisorの設定を更新し起動するために以下のコマンドを実行してください。
sudo supervisorctl reread
sudo supervisorctl update
sudo supervisorctl start laravel-worker:*
Supervisorの詳細情報は、Supervisorドキュメントで確認してください。
失敗したジョブの処理
時より、キューされたジョブは失敗します。心配ありません。物事は計画通りに進まないものです。Laravelではジョブを再試行する最大回数を指定できます。この回数試行すると、そのジョブはfailed_jobs
データベーステーブルに挿入されます。failed_jobs
テーブルのマイグレーションを生成するにはqueue:failed-table
コマンドを実行して下さい。
php artisan queue:failed-table
php artisan migrate
次にキューワーカの実行時、queue:work
コマンドに--tries
スイッチを付け、最大試行回数を指定します。--tries
オプションに値を指定しないと、ジョブは無限に試行します。
php artisan queue:work redis --tries=3
ジョブ失敗後のクリーンアップ
失敗時にジョブ特定のクリーンアップを実行するため、ジョブクラスでfailed
メソッドを直接定義できます。これはユーザーに警告を送ったり、ジョブの実行アクションを巻き戻すために最適な場所です。failed
メソッドには、そのジョブを落とすことになった例外(Exception
)が渡されます。
<?php
namespace App\Jobs;
use Exception;
use App\Podcast;
use App\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
class ProcessPodcast implements ShouldQueue
{
use InteractsWithQueue, Queueable, SerializesModels;
protected $podcast;
/**
* 新しいジョブインスタンスの生成
*
* @param Podcast $podcast
* @return void
*/
public function __construct(Podcast $podcast)
{
$this->podcast = $podcast;
}
/**
* ジョブの実行
*
* @param AudioProcessor $processor
* @return void
*/
public function handle(AudioProcessor $processor)
{
// アップロード済みポッドキャストの処理…
}
/**
* 失敗したジョブの処理
*
* @param Exception $exception
* @return void
*/
public function failed(Exception $exception)
{
// 失敗の通知をユーザーへ送るなど…
}
}
ジョブ失敗イベント
ジョブが失敗した時に呼び出されるイベントを登録したい場合、Queue::failing
メソッドが使えます。このイベントはメールやHipChatにより、チームへ通知する良い機会になります。例として、Laravelに含まれているAppServiceProvider
で、このイベントのコールバックを付け加えてみましょう。
<?php
namespace App\Providers;
use Illuminate\Support\Facades\Queue;
use Illuminate\Queue\Events\JobFailed;
use Illuminate\Support\ServiceProvider;
class AppServiceProvider extends ServiceProvider
{
/**
* 全アプリケーションサービスの初期処理
*
* @return void
*/
public function boot()
{
Queue::failing(function (JobFailed $event) {
// $event->connectionName
// $event->job
// $event->exception
});
}
/**
* サービスプロバイダの登録
*
* @return void
*/
public function register()
{
//
}
}
失敗したジョブの再試行
failed_jobs
データベーステーブルに挿入された、失敗したジョブを全部確認したい場合はqueue:failed
Arisanコマンドを利用します。
php artisan queue:failed
queue:failed
コマンドはジョブID、接続、キュー、失敗した時間をリスト表示します。失敗したジョブをジョブIDで指定することでリトライできます。たとえば、IDが5
の失敗したジョブを再試行するため、以下のコマンドを実行します。
php artisan queue:retry 5
失敗したジョブをすべて再試行するには、IDとしてall
をqueue:retry
コマンドへ指定し、実行してください。
php artisan queue:retry all
失敗したジョブを削除する場合は、queue:forget
コマンドを使います。
php artisan queue:forget 5
失敗したジョブを全部削除するには、queue:flush
コマンドを使います。
php artisan queue:flush
ジョブイベント
Queue
ファサードにbefore
とafter
メソッドを使い、キューされたジョブの実行前後に実行する、コールバックを指定できます。これらのコールバックはログを追加したり、ダッシュボードの状態を増加させたりするための機会を与えます。通常、これらのメソッドはサービスプロバイダから呼び出します。たとえば、Laravelに含まれるAppServiceProvider
を使っていましょう。
<?php
namespace App\Providers;
use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;
class AppServiceProvider extends ServiceProvider
{
/**
* 全アプリケーションサービスの初期処理
*
* @return void
*/
public function boot()
{
Queue::before(function (JobProcessing $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
Queue::after(function (JobProcessed $event) {
// $event->connectionName
// $event->job
// $event->job->payload()
});
}
/**
* サービスプロバイダの登録
*
* @return void
*/
public function register()
{
//
}
}
Queue
ファサードのlooping
メソッドを使用し、ワーカがキューからジョブをフェッチする前に、指定したコールバックを実行できます。たとえば、直前の失敗したジョブの未処理のままのトランザクションをロールバックするクロージャを登録できます。
Queue::looping(function () {
while (DB::transactionLevel() > 0) {
DB::rollBack();
}
});