[Python/Redis]キュー管理システムの大枠を作る
Redisでgspreadキュー管理システムを組んでみるの続きです。
Contents
作るもの : 複数プロセスで実行しても落ちないgspread
- gspreadへの読み書きを投げる専用プロセスを用意する
- 専用プロセスでは以下の機能を入れる
- API制限を超えないような調整弁
- 認証のリフレッシュ
- gspreadの使用では、そのプロセスにタスクを投げる
まずは処理の大枠を作ります。
gspreadの細かな処理の前に、Redisだけで動かします。
大枠
- 専用プロセス
- キューを待機する
- キューを受け取ったら処理する
- キューを処理する前に待ち時間を入れる
- 前回のキューから一定時間が経過している場合、リフレッシュ処理をする
- 特定の文字列を受け取ったらプロセスを終了する
コード
これから実装する予定のものはコメントに記載しています。
専用プロセス側
redis_spreadsheet_queuer.py
import redis
import time
conn = redis.Redis()
INTERVAL = 0.5
REFRESH_TIME = 5
print('Redis stands by...')
def is_refresh_needed(timer):
return time.time() - timer >= REFRESH_TIME
# 権限取得/refreshタイマー開始
refresh_timer = time.time()
while True:
# リストに追加されるまで待機
l = conn.llen('gs_queues')
msg = conn.blpop('gs_queues')
print(l, " items awaiting.")
# 値がない場合はループ抜ける
if not msg:
print('No value set. TIME OUT!')
break
# 一定時間経過していれば権限を再度取得
if is_refresh_needed(refresh_timer):
print('Refresh')
refresh_timer = time.time()
# リストの値を取得
val = msg[1].decode('utf-8')
if val == 'exit':
print('Exit!')
break
# spreadsheetへの読み書きを実行し、少し待つ
print('val {} is called.'.format(val))
time.sleep(INTERVAL)
print('Done with gspread.')
タスク投げる側
redis_spreadsheet_commander.py
import redis
import time
conn = redis.Redis()
def gs_read():
value = 'read'
value = value.encode('utf-8')
conn.rpush('gs_queues', value)
def gs_write(content):
value = 'write {}'.format(content)
value = value.encode('utf-8')
conn.rpush('gs_queues', value)
def gs_exit():
value = 'exit'.encode('utf-8')
conn.rpush('gs_queues', value)
def main():
for i in range(3):
gs_read()
time.sleep(10)
for j in range(5):
gs_write('{} times writing!'.format(j))
time.sleep(2)
print('And exit')
gs_exit()
if __name__ == '__main__':
main()
実行結果
$ python redis_spreadsheet_queuer.py &
[1] 13784
Redis stands by...
$ python redis_spreadsheet_commander.py
0 items awaiting.
val read is called.
2 items awaiting.
val read is called.
1 items awaiting.
val read is called.
0 items awaiting.
Refresh
val write 0 times writing! is called.
4 items awaiting.
val write 1 times writing! is called.
3 items awaiting.
val write 2 times writing! is called.
2 items awaiting.
val write 3 times writing! is called.
And exit
2 items awaiting.
val write 4 times writing! is called.
1 items awaiting.
Exit!
Done with gspread.
[1] + done python redis_spreadsheet_queuer.py
Redisメソッドメモ
llen
該当キーの値の個数を取得できるます。
今回のケースだとキューの長さを取得できます。
blpop
blpop(key[,timeout])
の文法で、
- リストにキーの値が無いか、リストの頭(左)から探し、あればリストからpop(削除して取得)する
- ない場合、timeoutに設定された秒数待つ(=処理をblockする)
- timeoutが0または何も設定されていない場合は、ずっと待つ
block left pop のことだと考えるとすんなり理解できました。
ちなみに、この専用プロセスは基本的に常駐させて動かす想定のため、タイムアウトは設定していません。
まとめ
本当はエラー時のリトライなども組み込みたいのですが、まずは最小構成で。
次からgspreadの実装をやっていきます。
ディスカッション
コメント一覧
まだ、コメントがありません