長い時間かかるバッチを並行実行し何倍も早くする

何倍も早くなるかどうかは処理している内容によりますが、並行に実行することにより長い時間かかっていたバッチがはやくなることがあります。

並行に実行できそうな例

  • ゲームの設定などが入っているマスターデータを順番にBigQueryにインポートしているが遅い
  • MySQLのテーブルをRedshiftに順番にコピーしているが遅い
  • 複数SQLMySQLに投げ、すべて取得し終わってから処理をしている

まだまだたくさんありますが、ひとまずこれくらいに。 上記を実現できそうなモジュールとして、geventがあったので、試してみました。 (ちなみにgeventは1プロセスのイベントループで並行に処理するものなので、CPUバウンドな並列化の場合は、他にも組み合わせる必要があります)

geventを使わず直列で実行した場合

serial.png

通常、何もやらずに処理を書くと以下のように書くことになります。

import time
from datetime import datetime


def task(n):
    print('task {} start at {}'.format(n, datetime.now()))
    time.sleep(1)
    print('task {} end at {}'.format(n, datetime.now()))

map(task, xrange(3))

結果    
task 0 start at 2015-12-09 23:22:34.381534
task 0 end at 2015-12-09 23:22:35.381949
task 1 start at 2015-12-09 23:22:35.381989
task 1 end at 2015-12-09 23:22:36.384658
task 2 start at 2015-12-09 23:22:36.384715
task 2 end at 2015-12-09 23:22:37.385449

1秒かかるタスクが3つあるので3秒ほどかかってます。 CPUを専有しているタスクだったらいいのですが(1 coreなら)、I/Oで待たされているだけの場合は無駄に時間がかかってます。

geventで並行処理を書く

concurrent.png

以下のようにgeventを使って並行処理を書くことができます。 関数をそのまま、使って並行処理が書けるので楽です。

from datetime import datetime
import gevent


def task(n):
    print('task {} start at {}'.format(n, datetime.now()))
    gevent.sleep(1)
    print('task {} end at {}'.format(n, datetime.now()))

gevent.joinall([
    gevent.spawn(task, n) for n in xrange(3)
])

結果
task 0 start at 2015-12-09 23:22:59.650820
task 1 start at 2015-12-09 23:22:59.651468
task 2 start at 2015-12-09 23:22:59.651491
task 0 end at 2015-12-09 23:23:00.655431
task 1 end at 2015-12-09 23:23:00.655526
task 2 end at 2015-12-09 23:23:00.655598

処理が並列で実行できるので、約1秒で終わってます。

他のタスクが終わるのをまって実行する

event.png

1つのタスクを複数のタスクが待つこともできます。

from datetime import datetime
import gevent
from gevent.event import Event


first_finish_event = Event()
second_finish_event = Event()


def task_first():
    print('task_first start at {}'.format(datetime.now()))
    gevent.sleep(2)
    first_finish_event.set()
    print('task_first end at {}'.format(datetime.now()))


def task_second():
    print('task_second start at {}'.format(datetime.now()))
    first_finish_event.wait()
    gevent.sleep(1)
    second_finish_event.set()
    print('task_second end at {}'.format(datetime.now()))


def task_last():
    print('task_last start at {}'.format(datetime.now()))
    second_finish_event.wait()
    gevent.sleep(1)
    print('task_last end at {}'.format(datetime.now()))


gevent.joinall([
    gevent.spawn(task_first),
    gevent.spawn(task_second),
    gevent.spawn(task_last),
])

結果
task_first start at 2015-12-09 23:25:40.381855
task_second start at 2015-12-09 23:25:40.382446
task_last start at 2015-12-09 23:25:40.382479
task_first end at 2015-12-09 23:25:42.383815
task_second end at 2015-12-09 23:25:43.388675
task_last end at 2015-12-09 23:25:44.391731

並行で動作するタスクを制限する

pool.png

バッチを実行しているサーバのスペックが足りなかったりして、同時実行するタスクを制限したいときはPoolを使います。

from datetime import datetime
import gevent
from gevent.pool import Pool


pool = Pool(2)


def task(n):
    print('task {} start at {}'.format(n, datetime.now()))
    gevent.sleep(1)
    print('task {} end at {}'.format(n, datetime.now()))


pool.map(task, xrange(3))

結果
task 0 start at 2015-12-09 23:27:06.978898
task 1 start at 2015-12-09 23:27:06.978962
task 0 end at 2015-12-09 23:27:07.979837
task 1 end at 2015-12-09 23:27:07.979910
task 2 start at 2015-12-09 23:27:07.980069
task 2 end at 2015-12-09 23:27:08.984811

まとまったタスクごとに処理する

group.png

from datetime import datetime
import gevent
from gevent.pool import Group


first_group = Group()
second_group = Group()


def task(n):
    print('task {} start at {}'.format(n, datetime.now()))
    gevent.sleep(1)
    print('task {} end at {}'.format(n, datetime.now()))
    return n * 10

for i in first_group.imap(task, xrange(2)):
    print(i)

for i in second_group.imap(task, xrange(2, 4)):
    print(i)

結果
task 0 start at 2015-12-09 23:27:57.212093
task 1 start at 2015-12-09 23:27:57.212149
task 0 end at 2015-12-09 23:27:58.214046
task 1 end at 2015-12-09 23:27:58.214113
0
10
task 2 start at 2015-12-09 23:27:58.214336
task 3 start at 2015-12-09 23:27:58.214371
task 2 end at 2015-12-09 23:27:59.216035
task 3 end at 2015-12-09 23:27:59.216134
20
30

実際に多数のファイルをマスターデータ => BigQueryに入れるスクリプトに導入してみて

Poolを使ったのですが、プールの数の分だけ平行化して高速に処理ができました。 ただ手元のMacで動かした感じだと時々、エラーがでるのでそこらへん今度さぐってみたいと思います。