イントロダクション

Laravelのキューサービスは、Beanstalk、Amazon SQS、Redis、さらにはリレーショナル・データベースなど様々なキューバックエンドに対し共通のAPIを提供しています。キューによりメール送信のような時間を費やす処理を遅らせることが可能です。時間のかかるタスクを遅らせることで、よりアプリケーションのリクエストをドラマチックにスピードアップできます。

キューの設定ファイルはconfig/queue.phpです。このファイルにはフレームワークに含まれているそれぞれのドライバーへの接続設定が含まれています。それにはデータベース、BeanstalkdAmazon SQSRedis、ジョブが即時に実行される同期(ローカル用途)ドライバーが含まれています。 nullキュードライバはキューされたジョブが実行されないように、破棄するだけです。

接続 Vs. キュー

Laravelのキューにとりかかる前に、「接続」と「キュー」の区別を理解しておくことが重要です。config/queue.php設定ファイルの中には、connections設定オプションがあります。このオプションはAmazon SQS、Beanstalk、Redisなどのバックエンドサービスへの個々の接続を定義します。しかし、どんな指定されたキュー接続も、複数の「キュー」を持つことができます。「キュー」とはキュー済みのジョブのスタック、もしくは積み重ねのことです。

queue接続ファイルのqueue属性を含んでいる、各接続設定例に注目してください。ジョブがディスパッチされ、指定された接続へ送られた時にのデフォルトキューです。言い換えれば、どのキューへディスパッチするのか明確に定義していないジョブをディスパッチすると、そのジョブは接続設定のqueue属性で定義したキューへ送られます。

// このジョブはデフォルトキューへ送られる
dispatch(new Job);

// このジョブは"emails"キューへ送られる
dispatch((new Job)->onQueue('emails'));

あるアプリケーションでは複数のキューへジョブを送る必要はなく、代わりに1つのシンプルなキューが適しているでしょう。しかし、複数のキューへジョブを送ることは、優先順位づけしたい、もしくはジョブの処理を分割したいアプリケーションでは、特に便利です。Laravelのキューワーカはプライオリティによりどのキューで処理するかを指定できるからです。たとえば、ジョブをhighキューへ送れば、より高い処理プライオリティのワーカを実行できます。

php artisan queue:work --queue=high,default

ドライバ毎の必要要件

データベース

databaseキュードライバを使用するには、ジョブを記録するためのデータベーステーブルが必要です。このテーブルを作成するマイグレーションはqueue:table Artisanコマンドにより生成できます。マイグレーションが生成されたら、migrateコマンドでデータベースをマイグレートしてください。

php artisan queue:table

php artisan migrate

Redis

redisキュードライバーを使用するには、config/database.php設定ファイルでRedisのデータベースを設定する必要があります。

Redisキュー接続でRedisクラスタを使用している場合は、キュー名にキーハッシュタグを含める必要があります。これはキューに指定した全Redisキーが同じハッシュスロットに確実に置かれるようにするためです。

'redis' => [
    'driver' => 'redis',
    'connection' => 'default',
    'queue' => '{default}',
    'retry_after' => 90,
],

他のドライバの要件

以下の依存パッケージがリストしたキュードライバを使用するために必要です。

Amazon SQS: aws/aws-sdk-php ~3.0 Beanstalkd: pda/pheanstalk ~3.0 Redis: predis/predis ~1.0

ジョブの作成

ジョブクラスの生成

キュー投入可能なアプリケーションの全ジョブは、デフォルトでapp/Jobsディレクトリへ保存されます。app/Jobsディレクトリが存在しなくても、make:job Artisanコマンドの実行時に生成されます。新しいキュージョブをArtisan CLIで生成できます。

php artisan make:job SendReminderEmail

非同期で実行するため、ジョブをキューへ投入することをLaravelへ知らせる、Illuminate\Contracts\Queue\ShouldQueueインターフェイスが生成されたクラスには実装されます。

クラス構成

ジョブクラスは通常とてもシンプルで、キューによりジョブが処理される時に呼び出される、handleメソッドのみで構成されています。手始めに、ジョブクラスのサンプルを見てみましょう。この例は、ポッドキャストの公開サービスを管理し、公開前にアップロードしたポッドキャストファイルを処理する必要があるという仮定です。

<?php

namespace App\Jobs;

use App\Podcast;
use App\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;

class ProcessPodcast implements ShouldQueue
{
    use InteractsWithQueue, Queueable, SerializesModels;

    protected $podcast;

    /**
     * 新しいジョブインスタンスの生成
     *
     * @param  Podcast  $podcast
     * @return void
     */
    public function __construct(Podcast $podcast)
    {
        $this->podcast = $podcast;
    }

    /**
     * ジョブの実行
     *
     * @param  AudioProcessor  $processor
     * @return void
     */
    public function handle(AudioProcessor $processor)
    {
        // アップロード済みポッドキャストの処理…
    }
}

この例中、キュージョブのコンテナーに直接Eloquentモデルが渡せることに注目してください。ジョブが使用しているSerializesModelsトレイトによりEloquentモデルは優雅にシリアライズされ、ジョブが処理される時にアンシリアライズされます。キュー投入されたジョブがコンテナでEloquentモデルを受け取ると、モデルの識別子のみシリアライズされています。ジョブが実際に処理される時、キューシステムは自動的にデータベースから完全なモデルインスタンスを再取得します。これらは全てアプリケーションの完全な透過性のためであり、Eloquentモデルインスタンスをシリアライズするときに発生する問題を防ぐことができます。

handleメソッドはキューによりジョブが処理されるときに呼びだされます。ジョブのhandleメソッドにタイプヒントにより依存を指定できることに注目してください。Laravelのサービスコンテナが自動的に依存を注入します。

Note: Rawイメージコンテンツのようなバイナリデータは、キュージョブへ渡す前に、base64_encode関数を通してください。そうしないと、そのジョブはキューへ設置する前にJSONへ正しくシリアライズされません。

ジョブのディスパッチ

ジョブクラスが書き上がったら、dispatchヘルパによりディスパッチできます。dispatchヘルパは引数がひとつだけ必要で、ジョブインスタンスです。

<?php

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;

class PodcastController extends Controller
{
    /**
     * 新ポッドキャストの保存
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // ポッドキャスト作成…

        dispatch(new ProcessPodcast($podcast));
    }
}

Tip!! dispatchヘルパは短くてグローバルに使用できる関数の利便性を提供しており、同時にテストもとても簡単にできます。Laravelのテストに関するドキュメントで詳細を学んでください。

遅延ディスパッチ

キューされたジョブの実行を遅らせたい場合は、ジョブインスタンスに対しdelayメソッドを使用してください。delayメソッドは、生成したジョブクラスにデフォルトで含まれている、Illuminate\Bus\Queueableトレイトにより提供されています。例として、ディスパッチされた10分後まで、ジョブの処理を遅らせる指定をしてみましょう。

<?php

namespace App\Http\Controllers;

use Carbon\Carbon;
use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;

class PodcastController extends Controller
{
    /**
     * 新しいポッドキャストの保存
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // ポッドキャストの作成…

        $job = (new ProcessPodcast($podcast))
                    ->delay(Carbon::now()->addMinutes(10));

        dispatch($job);
    }
}

Note: Amazon SQSキューサービスは、最大15分の遅延時間です。

キューと接続のカスタマイズ

特定キューへのディスパッチ

別のキューへジョブを投入することで、キュージョブを「カテゴライズ」でき、数々のキューにいくつのワーカを割り付けるかでプライオリティ付もできます。これはキュー設定ファイルで定義した、別のキュー「接続」へジョブを投入することではなく、一つの接続の指定したキューへ投入することです。キューを指定するには、ジョブインスタンスのonQueueメソッドを使ってください。

<?php

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;

class PodcastController extends Controller
{
    /**
     * 新しいポッドキャストの保存
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // ポッドキャストの作成…

        $job = (new ProcessPodcast($podcast))->onQueue('processing');

        dispatch($job);
    }
}

特定の接続へのディスパッチ

複数キューの接続を取り扱う場合、どの接続へジョブを投入するか指定できます。接続を指定するには、ジョブインスタンスのonConnectionメソッドを利用します。

<?php

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use Illuminate\Http\Request;
use App\Http\Controllers\Controller;

class PodcastController extends Controller
{
    /**
     * 新しいポッドキャストの保存
     *
     * @param  Request  $request
     * @return Response
     */
    public function store(Request $request)
    {
        // ポッドキャストの作成…

        $job = (new ProcessPodcast($podcast))->onConnection('sqs');

        dispatch($job);
    }
}

もちろん、ジョブを投入する接続とキューを指定するために、onConnectiononQueueメソッドをチェーンすることもできます。

$job = (new ProcessPodcast($podcast))
                ->onConnection('sqs')
                ->onQueue('processing');

エラー処理

ジョブの処理中に例外が投げられると、そのジョブは再実行のために自動的にキューへ戻されます。そのジョブはアプリケーションで定義されている最大試行回数に達するまで、持続的に実行されます。最大試行回数はqueue:work Artisanコマンドの--triesスイッチにより定義されます。キューワーカの実行についての詳細情報は、この後に説明します

キューワーカの実行

Laravelには、キューに投入された新しいジョブを処理する、キューワーカも含まれています。queue:work Artisanコマンドを使いワーカを実行できます。queue:workコマンドが起動したら、皆さんが停止するか、ターミナルを閉じるまで実行指示付けることに注意してください。

php artisan queue:work

Tip!! バックグランドでqueue:workプロセスを永続的に実行し続けるには、キューワーカが止まらずに実行し続けていることを確実にするため、Supervisorのようなプロセスモニタを利用する必要があります。

キューワーカは長時間起動するプロセスで、起動した状態のままメモリに保存されることを覚えておいてください。その結果、一度起動したら、コードベースの変更は反映されません。そのため、開発期間中はキューワーカを再起動することを忘れないでください。

接続とキューの指定

どのキュー接続をワーカが使用するのかを指定できます。workコマンドで指定する接続名は、config/queue.php設定ファイルで定義されている接続と対応します。

php artisan queue:work redis

指定した接続の特定のキューだけを処理するように、さらにキューワーカをカスタマイズすることもできます。たとえば、メールの処理をすべて、redisキュー接続のemailsキューで処理する場合、以下のコマンドで単一のキューの処理だけを行うワーカを起動できます。

php artisan queue:work redis --queue=emails

リソースの考察

デーモンキューワーカは各ジョブを処理する前に、フレームワークを「再起動」しません。そのため、各ジョブが終了したら、大きなリソースを開放してください。たとえば、GDライブラリでイメージ処理を行ったら、終了前にimagedestroyにより、メモリを開放してください。

キュープライオリティ

時々、キューをどのように処理するかをプライオリティ付けしたいことも起きます。たとえば、config/queue.phpredis接続のデフォルトqueuelowに設定したとしましょう。しかし、あるジョブをhighプライオリティでキューへ投入したい場合です。

dispatch((new Job)->onQueue('high'));

lowキュー上のジョブの処理が継続される前に、全highキュージョブが処理されることを確実にするには、workコマンドのキュー名にコンマ区切りのリストで指定してください。

php artisan queue:work --queue=high,low

キューワーカとデプロイ

キューワーカは長時間起動プロセスであるため、リスタートしない限りコードの変更を反映しません。ですから、キューワーカを使用しているアプリケーションをデプロイする一番シンプルな方法は、デプロイ処理の間、ワーカをリスタートすることです。queue:restartコマンドを実行することで、全ワーカを穏やかに再起動できます。

php artisan queue:restart

このコマンドは存在しているジョブが失われないように、現在のジョブの処理が終了した後に、全キューワーカーへ穏やかに「終了する(die)」よう指示します。キューワーカはqueue:restartコマンドが実行されると、終了するわけですから、キュージョブを自動的に再起動する、Supervisorのようなプロセスマネージャーを実行すべきでしょう。

ジョブの期限切れとタイムアウト

ジョブの有効期限

config/queue.php設定ファイルの中で、各キュ接続はretry_afterオプションを定義しています。このオプションはジョブの処理を再試行するまで、キュー接続を何秒待つかを指定します。たとえば、retry_afterの値が90であれば、そのジョブは処理が終わってから90秒の間に削除されなければ、キューへ再投入されます。通常、retry_after値はジョブが処理を妥当に完了するまでの秒数の最大値を指定します。

Note: retry_afterを含まない唯一の接続は、Amazon SQSです。SQSはAWSコンソールで管理する、Default Visibility Timeoutを元にリトライを行います。

ワーカタイムアウト

queue:work Artisanコマンドは--timeoutオプションも提供しています。--timeoutオプションはLaravelキューマスタプロセスが、ジョブを処理する子のキューワーカをKillするまでどのくらい待つかを指定します。外部のHTTP呼び出しの反応が無いなど様々な理由で、時より子のキュープロセスは「フリーズ」します。--timeoutオプションは指定した実行時間を過ぎた、フリーズプロセスを取り除きます。

php artisan queue:work --timeout=60

retry_after設定オプションと--timeout CLIオプションは異なります。しかし、確実にジョブを失わずに、一度だけ処理を完了できるよう共に働きます。

Note: --timeout値は、最低でも数秒retry_after設定値よりも短くしてください。これにより、与えられたジョブを処理するワーカが、ジョブのリトライ前に確実にkillされます。--timeoutオプションをretry_after設定値よりも長くすると、ジョブが2度実行されるでしょう。

ワーカスリープ時間

ジョブがキュー上に存在しているとき、ワーカは各ジョブ間にディレイを取らずに実行し続けます。sleepオプションは新しく処理するジョブが存在しない時に、どの程度「スリープ」するかを決めます。

php artisan queue:work --sleep=3

Supervisor設定

Supervisorのインストール

SupervisorはLinuxオペレーティングシステムのプロセスモニタで、queue:workプロセスが落ちると自動的に起動します。UbuntuにSupervisorをインストールするには、次のコマンドを使ってください。

sudo apt-get install supervisor

Tip!! Supervisoの設定に圧倒されそうならば、Laravelプロジェクトのために、Supervisorを自動的にインストールし、設定するLaravel Forgeの使用を考慮してください。

Supervisorの設定

Supervisorの設定ファイルは、通常/etc/supervisor/conf.dディレクトリに保存します。このディレクトリの中には、Supervisorにどのようにプロセスを監視するのか指示する設定ファイルを好きなだけ設置できます。たとえば、laravel-worker.confファイルを作成し、queue:workプロセスを起動、監視させてみましょう。

[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3
autostart=true
autorestart=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log

この例のnumprocsディレクティブは、Supervisorに全部で8つのqueue:workプロセスを実行・監視し、落ちている時は自動的に再起動するように指示しています。もちろんcommandディレクティブのqueue:work sqsの部分を変更し、希望のキュー接続に合わせてください。

Supervisorの起動

設定ファイルができたら、Supervisorの設定を更新し起動するために以下のコマンドを実行してください。

sudo supervisorctl reread

sudo supervisorctl update

sudo supervisorctl start laravel-worker:*

Supervisorの詳細情報は、Supervisorドキュメントで確認してください。

失敗したジョブの処理

時より、キューされたジョブは失敗します。心配ありません。物事は計画通りに進まないものです。Laravelではジョブを再試行する最大回数を指定できます。この回数試行すると、そのジョブはfailed_jobsデータベーステーブルに挿入されます。failed_jobsテーブルのマイグレーションを生成するにはqueue:failed-tableコマンドを実行して下さい。

php artisan queue:failed-table

php artisan migrate

次にキューワーカの実行時、queue:workコマンドに--triesスイッチを付け、最大試行回数を指定します。--triesオプションに値を指定しないと、ジョブは無限に試行します。

php artisan queue:work redis --tries=3

ジョブ失敗後のクリーンアップ

失敗時にジョブ特定のクリーンアップを実行するため、ジョブクラスでfailedメソッドを直接定義できます。これはユーザに警告を送ったり、ジョブの実行アクションを巻き戻すために最適な場所です。failedメソッドには、そのジョブを落とすことになった例外(Exception)が渡されます。

<?php

namespace App\Jobs;

use Exception;
use App\Podcast;
use App\AudioProcessor;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;

class ProcessPodcast implements ShouldQueue
{
    use InteractsWithQueue, Queueable, SerializesModels;

    protected $podcast;

    /**
     * 新しいジョブインスタンスの生成
     *
     * @param  Podcast  $podcast
     * @return void
     */
    public function __construct(Podcast $podcast)
    {
        $this->podcast = $podcast;
    }

    /**
     * ジョブの実行
     *
     * @param  AudioProcessor  $processor
     * @return void
     */
    public function handle(AudioProcessor $processor)
    {
        // アップロード済みポッドキャストの処理…
    }

    /**
     * 失敗したジョブの処理
     *
     * @param  Exception  $exception
     * @return void
     */
    public function failed(Exception $exception)
    {
        // 失敗の通知をユーザへ送るなど…
    }
}

ジョブ失敗イベント

ジョブが失敗した時に呼び出されるイベントを登録したい場合、Queue::failingメソッドが使えます。このイベントはメールやHipChatにより、チームへ通知する良い機会になります。例として、Laravelに含まれているAppServiceProviderで、このイベントのコールバックを付け加えてみましょう。

<?php

namespace App\Providers;

use Illuminate\Support\Facades\Queue;
use Illuminate\Queue\Events\JobFailed;
use Illuminate\Support\ServiceProvider;

class AppServiceProvider extends ServiceProvider
{
    /**
     * 全アプリケーションサービスの初期処理
     *
     * @return void
     */
    public function boot()
    {
        Queue::failing(function (JobFailed $event) {
            // $event->connectionName
            // $event->job
            // $event->exception
        });
    }

    /**
     * サービスプロバイダの登録
     *
     * @return void
     */
    public function register()
    {
        //
    }
}

失敗したジョブの再試行

failed_jobsデータベーステーブルに挿入された、失敗したジョブを全部確認したい場合はqueue:failed Arisanコマンドを利用します。

php artisan queue:failed

queue:failedコマンドはジョブID、接続、キュー、失敗した時間をリスト表示します。失敗したジョブをジョブIDで指定することでリトライできます。たとえば、IDが5の失敗したジョブを再試行するため、以下のコマンドを実行します。

php artisan queue:retry 5

失敗したジョブをすべて再試行するには、IDとしてallqueue:retryコマンドへ指定し、実行してください。

php artisan queue:retry all

失敗したジョブを削除する場合は、queue:forgetコマンドを使います。

php artisan queue:forget 5

失敗したジョブを全部削除するには、queue:flushコマンドを使います。

php artisan queue:flush

ジョブイベント

Queueファサードbeforeafterメソッドを使い、キューされたジョブの実行前後に実行する、コールバックを指定できます。これらのコールバックはログを追加したり、ダッシュボードの状態を増加させたりするための機会を与えます。通常、これらのメソッドはサービスプロバイダから呼び出します。たとえば、Laravelに含まれるAppServiceProviderを使っていましょう。

<?php

namespace App\Providers;

use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;

class AppServiceProvider extends ServiceProvider
{
    /**
     * 全アプリケーションサービスの初期処理
     *
     * @return void
     */
    public function boot()
    {
        Queue::before(function (JobProcessing $event) {
            // $event->connectionName
            // $event->job
            // $event->job->payload()
        });

        Queue::after(function (JobProcessed $event) {
            // $event->connectionName
            // $event->job
            // $event->job->payload()
        });
    }

    /**
     * サービスプロバイダの登録
     *
     * @return void
     */
    public function register()
    {
        //
    }
}

Queue ファサードloopingメソッドを使用し、ワーカがキューからジョブをフェッチする前に、指定したコールバックを実行できます。たとえば、直前の失敗したジョブの未処理のままのトランザクションをロールバックするクロージャを登録できます。

Queue::looping(function () {
    while (DB::transactionLevel() > 0) {
        DB::rollBack();
    }
});