[Python/Redis]キュー管理システムの大枠を作る

Redisでgspreadキュー管理システムを組んでみるの続きです。

作るもの : 複数プロセスで実行しても落ちないgspread

  • gspreadへの読み書きを投げる専用プロセスを用意する
  • 専用プロセスでは以下の機能を入れる
    • API制限を超えないような調整弁
    • 認証のリフレッシュ
  • gspreadの使用では、そのプロセスにタスクを投げる

まずは処理の大枠を作ります。
gspreadの細かな処理の前に、Redisだけで動かします。

大枠

  • 専用プロセス
    • キューを待機する
    • キューを受け取ったら処理する
    • キューを処理する前に待ち時間を入れる
    • 前回のキューから一定時間が経過している場合、リフレッシュ処理をする
    • 特定の文字列を受け取ったらプロセスを終了する

コード

これから実装する予定のものはコメントに記載しています。

専用プロセス側

redis_spreadsheet_queuer.py

import redis
import time
conn = redis.Redis()

INTERVAL = 0.5
REFRESH_TIME = 5

print('Redis stands by...')

def is_refresh_needed(timer):
    return time.time() - timer >= REFRESH_TIME

# 権限取得/refreshタイマー開始
refresh_timer = time.time()

while True:

    # リストに追加されるまで待機
    l = conn.llen('gs_queues')
    msg = conn.blpop('gs_queues')

    print(l, " items awaiting.")

    # 値がない場合はループ抜ける
    if not msg:
        print('No value set.  TIME OUT!')
        break

    # 一定時間経過していれば権限を再度取得
    if is_refresh_needed(refresh_timer):
        print('Refresh')
        refresh_timer = time.time()

    # リストの値を取得
    val = msg[1].decode('utf-8')

    if val == 'exit':
        print('Exit!')
        break

    # spreadsheetへの読み書きを実行し、少し待つ
    print('val {} is called.'.format(val))
    time.sleep(INTERVAL)

print('Done with gspread.')

タスク投げる側

redis_spreadsheet_commander.py

import redis
import time
conn = redis.Redis()

def gs_read():
    value = 'read'
    value = value.encode('utf-8')
    conn.rpush('gs_queues', value)

def gs_write(content):
    value = 'write {}'.format(content)
    value = value.encode('utf-8')
    conn.rpush('gs_queues', value)

def gs_exit():
    value = 'exit'.encode('utf-8')
    conn.rpush('gs_queues', value)

def main():
    for i in range(3):
        gs_read()

    time.sleep(10)

    for j in range(5):
        gs_write('{} times writing!'.format(j))

    time.sleep(2)
    print('And exit')
    gs_exit()

if __name__ == '__main__':
    main()

実行結果

$ python redis_spreadsheet_queuer.py & 
[1] 13784

    Redis stands by...

$ python redis_spreadsheet_commander.py
0  items awaiting.
val read is called.
2  items awaiting.
val read is called.
1  items awaiting.
val read is called.
0  items awaiting.
Refresh
val write 0 times writing! is called.
4  items awaiting.
val write 1 times writing! is called.
3  items awaiting.
val write 2 times writing! is called.
2  items awaiting.
val write 3 times writing! is called.
And exit
2  items awaiting.
val write 4 times writing! is called.
1  items awaiting.
Exit!
Done with gspread.

[1]  + done       python redis_spreadsheet_queuer.py

Redisメソッドメモ

llen

該当キーの値の個数を取得できるます。
今回のケースだとキューの長さを取得できます。

blpop

blpop(key[,timeout])
の文法で、

  • リストにキーの値が無いか、リストの頭(左)から探し、あればリストからpop(削除して取得)する
  • ない場合、timeoutに設定された秒数待つ(=処理をblockする)
  • timeoutが0または何も設定されていない場合は、ずっと待つ

block left pop のことだと考えるとすんなり理解できました。

ちなみに、この専用プロセスは基本的に常駐させて動かす想定のため、タイムアウトは設定していません。

まとめ

本当はエラー時のリトライなども組み込みたいのですが、まずは最小構成で。
次からgspreadの実装をやっていきます。

参考