Flaskでクラウド上のメッセージキュー(SQS/PubSub)を扱う完全ガイド!初心者向け解説
生徒
Flaskで作ったアプリから、時間がかかる処理を裏側でお願いする方法ってありますか?
先生
そんな時は、クラウド上のメッセージキューという仕組みを使うと便利ですよ。AWSのSQSやGoogle CloudのPubSubなどが有名です。
生徒
メッセージキュー?難しそうな名前ですね。パソコンをあまり触ったことがない私でも理解できますか?
先生
大丈夫ですよ!郵便局や行列の待ち時間をイメージすると分かりやすいです。具体的な使い方を一緒に見ていきましょう。
1. メッセージキューとは何かを身近な例で知ろう
プログラミングの世界で使われる「メッセージキュー」という言葉は、初めて聞く方にはとても難しく感じられるかもしれません。しかし、その仕組みは私たちの日常生活にとても似ています。
例えば、大人気のラーメン屋さんを想像してみてください。お客さん(Flaskアプリへのリクエスト)が一度にたくさん来ると、店員さん(サーバー)は一人では対応しきれなくなります。そこで「整理券(メッセージ)」を発行して、順番待ちの列(キュー)に並んでもらいます。店員さんは自分のペースで、列の先頭から一人ずつ料理を出していきますよね。これがメッセージキューの基本的な考え方です。
IT用語で言うと、「メッセージ」は処理してほしいデータのことで、「キュー」はそれらを溜めておく場所のことです。クラウドサービスのSQS(Amazon Simple Queue Service)やPubSub(Google Cloud Pub/Sub)は、インターネット上にある巨大な「整理券置き場」だと考えてください。これを使うことで、アプリが急に忙しくなっても、処理を漏らすことなく順番にこなしていくことができるようになります。
2. なぜFlaskアプリにキューが必要なのか
FlaskでWebアプリを作っていると、ボタンを押してから結果が出るまでに時間がかかる処理が出てくることがあります。例えば、高画質な動画の変換、大量のメール送信、AIを使った画像の分析などです。
もし、これらをFlaskの中で直接実行してしまうと、処理が終わるまでユーザーのブラウザ画面は「読み込み中」のまま固まってしまいます。ユーザーは「壊れたのかな?」と不安になり、ページを閉じてしまうかもしれません。これは非常に勿体ないことです。
ここでメッセージキューの出番です。Flaskアプリは「重たい処理をお願いします!」というメモをキューに投げ込むだけで、すぐに「承りました!」とユーザーに返事をします。ユーザーの画面はすぐに切り替わり、裏側で別のプログラムがゆっくりと重たい処理を片付けていくのです。このように、役割を分担することを「非同期処理(ひどうきしょり)」と呼び、快適なアプリを作るためには欠かせない技術となっています。
3. AWS SQSにメッセージを送るための準備
まずは、クラウド界の巨人であるAWS(アマゾン ウェブ サービス)のSQSを使って、メッセージを送る方法を学びましょう。Pythonというプログラミング言語でAWSを操作するには、「boto3(ボートスリー)」という道具を使います。
パソコンにこの道具を入れるには、コマンドプロンプトやターミナルという黒い画面で以下の命令を入力します。これが「インストール」という作業です。
pip install boto3 flask
これで準備は完了です。次に、FlaskからSQSに「処理をお願い!」というメッセージを送信するシンプルなプログラムを書いてみましょう。プログラム初心者の方は、まずはこの形を真似して書いてみるのが上達の近道です。
import boto3
from flask import Flask, request
app = Flask(__name__)
# SQSに接続するための設定(金庫の鍵のようなもの)
sqs = boto3.client('sqs', region_name='ap-northeast-1')
# クラウド上に作ったキューの場所を教えます
queue_url = 'https://sqs.ap-northeast-1.amazonaws.com/123456789012/MyQueue'
@app.route('/send')
def send_message():
# メッセージの内容を作成します
message_body = "メール送信の依頼:user@example.com宛"
# キューにメッセージを投げ込みます
response = sqs.send_message(
QueueUrl=queue_url,
MessageBody=message_body
)
return f"メッセージを送りました!ID: {response['MessageId']}"
if __name__ == '__main__':
app.run(debug=True)
4. 送られたメッセージを裏側で受け取る仕組み
メッセージをキューに送るだけでは、まだ処理は完了していません。誰かがその「整理券」を受け取って、実際に作業をする必要があります。この作業員のようなプログラムを「ワーカー」と呼びます。
ワーカーは、Flaskアプリとは別にずっと動いていて、「新しいお願いごとは届いていないかな?」と常にキューを見守っています。これを「ポーリング(巡回)」と言います。初心者の方は、郵便受けを定期的に見に行く配達員さんをイメージすると分かりやすいでしょう。
以下は、SQSからメッセージを一つ取り出して、その内容を確認するシンプルなワーカーのコードです。Flaskとは別のファイルとして作成して実行します。
import boto3
import time
sqs = boto3.client('sqs', region_name='ap-northeast-1')
queue_url = 'https://sqs.ap-northeast-1.amazonaws.com/123456789012/MyQueue'
def worker():
print("ワーカーが起動しました。メッセージを待っています...")
while True:
# キューからメッセージを受け取ります
response = sqs.receive_message(
QueueUrl=queue_url,
MaxNumberOfMessages=1,
WaitTimeSeconds=20 # メッセージが来るのを最大20秒待ちます
)
if 'Messages' in response:
for message in response['Messages']:
# メッセージの内容を表示します
print(f"受け取った内容: {message['Body']}")
# 処理が終わったら、キューからメッセージを消去します
# これを忘れると、何度も同じ処理をしてしまいます!
sqs.delete_message(
QueueUrl=queue_url,
ReceiptHandle=message['ReceiptHandle']
)
print("処理が完了したので削除しました。")
else:
print("メッセージはありませんでした。再度確認します。")
time.sleep(1)
if __name__ == '__main__':
worker()
5. Google Cloud Pub/Subの特徴と使い分け
AWSのSQSと並んで有名なのが、Google Cloudの「Pub/Sub(パブサブ)」です。これは「Publisher(パブリッシャー:発行者)」と「Subscriber(サブスクライバー:購読者)」の頭文字を取ったものです。
SQSとの大きな違いは、一つのメッセージを複数の人に同時に送るのが得意な点です。例えば、一つの注文が入ったときに「在庫管理システム」と「配送システム」の両方に同時に知らせたい場合に非常に役立ちます。新聞配達をイメージしてください。新聞社(発行者)が新聞を出すと、契約している多くの家庭(購読者)に同じ情報が届きますよね。これがPub/Subの仕組みです。
Flaskで使う場合も、基本的にはライブラリをインストールして、指定された場所にデータを送るという流れは同じです。クラウドの種類によって使い勝手は多少異なりますが、「重い処理を後回しにする」という目的は変わりません。どちらを使うべきか迷ったら、今使っているクラウドサービスに合わせるのが一般的です。
6. Pub/Subにメッセージを投稿するコード例
それでは、Google CloudのPub/Subを使ってメッセージを投稿する例を見てみましょう。Google Cloudの場合は「google-cloud-pubsub」というライブラリを使います。まずはインストールからです。
pip install google-cloud-pubsub
次に、Pythonでメッセージを送信するコードを書きます。AWSの時とは少し書き方が異なりますが、やっていることは「特定のトピック(掲示板のようなもの)に書き込む」という作業です。初心者の方は、特定の場所に手紙を出すイメージで読んでみてください。
from google.cloud import pubsub_v1
import os
# Google Cloudの認証用ファイルへのパス(環境変数)を設定します
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "path/to/your/key.json"
project_id = "your-project-id"
topic_id = "my-topic"
# 送信用のクライアントを作成します
publisher = pubsub_v1.PublisherClient()
# 送信先のフルパスを作成します
topic_path = publisher.topic_path(project_id, topic_id)
def publish_message():
data = "新しい写真がアップロードされました"
# データはバイナリ形式(コンピュータ用のデータ形式)に変換して送ります
data_bytes = data.encode("utf-8")
# 投稿します
future = publisher.publish(topic_path, data_bytes)
print(f"メッセージを投稿しました。ID: {future.result()}")
if __name__ == '__main__':
publish_message()
7. キューを使う時の注意点:二重送信と順序
メッセージキューはとても便利ですが、使う際に気をつけなければならない「落とし穴」がいくつかあります。プログラミング未経験の方が特にはまりやすいポイントは「同じメッセージが二回届く可能性がある」ということです。
クラウドの世界では、ネットワークの都合などで稀にメッセージが重複してしまうことがあります。これを防ぐためには、「同じ処理が二回行われても大丈夫なように作る(べき等性:べきとうせい)」という考え方が重要です。例えば、「100円加算する」という処理を二回やってしまうと200円加算されてしまいますが、「残高を100円に上書きする」という処理なら、二回やっても結果は変わりませんよね。
また、基本的には「送った順番」に処理されますが、設定や状況によっては順番が前後することもあります。順番が絶対に大切な処理(例えば、会員登録をしてからログインするなど)の場合は、SQSなら「FIFO(フィフォ)キュー」という、順番を厳密に守る設定にする必要があることを覚えておきましょう。
8. エラーが起きた時の秘密兵器:デッドレターキュー
もし、ワーカーがメッセージを受け取ったものの、プログラムのミスや一時的な不具合で処理に失敗してしまったらどうなるでしょうか?何も対策をしていないと、そのメッセージはずっとキューに残り続け、何度も失敗を繰り返してしまいます。
そこで使われるのが「デッドレターキュー(DLQ)」という仕組みです。これは、何度かリトライ(再挑戦)しても処理できなかったメッセージを隔離するための「ゴミ箱兼、要確認フォルダ」のようなものです。
ここにメッセージが移動されると、エンジニアは「あ、何かトラブルがあったんだな」と気づくことができます。原因を調べてプログラムを直し、隔離されたメッセージを再度キューに戻せば、無事に処理を再開できます。このように、失敗を前提とした守りの仕組みを作っておくことが、プロのWebアプリ開発への第一歩です。
9. Flaskとキューを繋ぐことで広がる世界
メッセージキューを使いこなせるようになると、あなたが作れるアプリの幅は劇的に広がります。ただの「掲示板」や「メモ帳」だったアプリが、大量のデータを自動で処理したり、AIと連携したりする「高機能なシステム」へと進化するからです。
最初は設定やコードの書き方に戸惑うかもしれません。でも、一つ一つの単語の意味を理解し、実際に動かしてみることで、少しずつ霧が晴れるように分かってくるはずです。今回学んだ「お願いをメモに書いて列に並べる」という考え方は、どんなに複雑なシステムになっても変わりません。
Flaskは非常に自由度が高いフレームワークです。そこにクラウドの力を借りることで、個人のパソコン一台では到底できなかったような、パワフルな体験をユーザーに提供できるようになります。焦らず、自分のペースで新しい扉を開いていきましょう!