[Python/Redis]キュー管理システムの大枠を作る
Redisでgspreadキュー管理システムを組んでみるの続きです。
Contents
作るもの : 複数プロセスで実行しても落ちないgspread
- gspreadへの読み書きを投げる専用プロセスを用意する
- 専用プロセスでは以下の機能を入れる
- API制限を超えないような調整弁
- 認証のリフレッシュ
- gspreadの使用では、そのプロセスにタスクを投げる
まずは処理の大枠を作ります。
gspreadの細かな処理の前に、Redisだけで動かします。
大枠
- 専用プロセス
- キューを待機する
- キューを受け取ったら処理する
- キューを処理する前に待ち時間を入れる
- 前回のキューから一定時間が経過している場合、リフレッシュ処理をする
- 特定の文字列を受け取ったらプロセスを終了する
コード
これから実装する予定のものはコメントに記載しています。
専用プロセス側
redis_spreadsheet_queuer.py
import redis import time conn = redis.Redis() INTERVAL = 0.5 REFRESH_TIME = 5 print('Redis stands by...') def is_refresh_needed(timer): return time.time() - timer >= REFRESH_TIME # 権限取得/refreshタイマー開始 refresh_timer = time.time() while True: # リストに追加されるまで待機 l = conn.llen('gs_queues') msg = conn.blpop('gs_queues') print(l, " items awaiting.") # 値がない場合はループ抜ける if not msg: print('No value set. TIME OUT!') break # 一定時間経過していれば権限を再度取得 if is_refresh_needed(refresh_timer): print('Refresh') refresh_timer = time.time() # リストの値を取得 val = msg[1].decode('utf-8') if val == 'exit': print('Exit!') break # spreadsheetへの読み書きを実行し、少し待つ print('val {} is called.'.format(val)) time.sleep(INTERVAL) print('Done with gspread.')
タスク投げる側
redis_spreadsheet_commander.py
import redis import time conn = redis.Redis() def gs_read(): value = 'read' value = value.encode('utf-8') conn.rpush('gs_queues', value) def gs_write(content): value = 'write {}'.format(content) value = value.encode('utf-8') conn.rpush('gs_queues', value) def gs_exit(): value = 'exit'.encode('utf-8') conn.rpush('gs_queues', value) def main(): for i in range(3): gs_read() time.sleep(10) for j in range(5): gs_write('{} times writing!'.format(j)) time.sleep(2) print('And exit') gs_exit() if __name__ == '__main__': main()
実行結果
$ python redis_spreadsheet_queuer.py &
[1] 13784
Redis stands by...
$ python redis_spreadsheet_commander.py
0 items awaiting.
val read is called.
2 items awaiting.
val read is called.
1 items awaiting.
val read is called.
0 items awaiting.
Refresh
val write 0 times writing! is called.
4 items awaiting.
val write 1 times writing! is called.
3 items awaiting.
val write 2 times writing! is called.
2 items awaiting.
val write 3 times writing! is called.
And exit
2 items awaiting.
val write 4 times writing! is called.
1 items awaiting.
Exit!
Done with gspread.
[1] + done python redis_spreadsheet_queuer.py
Redisメソッドメモ
llen
該当キーの値の個数を取得できるます。
今回のケースだとキューの長さを取得できます。
blpop
blpop(key[,timeout])
の文法で、
- リストにキーの値が無いか、リストの頭(左)から探し、あればリストからpop(削除して取得)する
- ない場合、timeoutに設定された秒数待つ(=処理をblockする)
- timeoutが0または何も設定されていない場合は、ずっと待つ
block left pop
のことだと考えるとすんなり理解できました。
ちなみに、この専用プロセスは基本的に常駐させて動かす想定のため、タイムアウトは設定していません。
まとめ
本当はエラー時のリトライなども組み込みたいのですが、まずは最小構成で。
次からgspreadの実装をやっていきます。
ディスカッション
コメント一覧
まだ、コメントがありません