現在のところ(Ruby3.0)、Rubyはネイティブスレッドを1つしか利用することはできない。それゆえ、CPUのマルチコア性能を活かす目的でThreadは使えない。そちらの目的ではProcessを使う必要がある。Rubyでマルチスレッドプログラミングをする主な目的は非同期のイベント処理やI/O待ち時間の緩和だろう。このとき、最低限抑えておかないといけないのがThreadとQueueだ。Threadクラスがスレッドの生成・分岐と合流を担うのは理解しやすいが、Queueはやや分かりにくい。これはスレッド間で共通の処理対象を保管するスタックのように使う。Queueに積みあがった処理対象をThreadが順番に取っていきながら処理を進めるイメージである(図参照)。
では、さっそく使用例を見てもらおう。0から1のランダムな少数の配列を3つのスレッドで順番に処理していくという単純なものだ。ただし、数字の分だけ処理に時間がかかるものとする。
#encoding: utf-8 if __FILE__ == $PROGRAM_NAME require 'thread' def main ary = [0.4, 0.2, 0.3, 0.5, 0.2, 0.4, 0.5, 0.1, 0.2, 0.4, 0.5, 0.9] p ary q = Queue.new ary.each{ |a| q.push(a) } threads = Array.new(3) do |i| Thread.new{ until q.empty? x = q.pop puts("thread #{i} takes #{x}") sleep(x) end } end threads.each(&:join) end main end
結果の一例は以下のようになる
[0.4, 0.2, 0.3, 0.5, 0.2, 0.4, 0.5, 0.1, 0.2, 0.4, 0.5, 0.9] thread 0 takes 0.4 thread 1 takes 0.2 thread 2 takes 0.3 thread 1 takes 0.5 thread 2 takes 0.2 thread 0 takes 0.4 thread 2 takes 0.5 thread 1 takes 0.1 thread 0 takes 0.2 thread 1 takes 0.4 thread 0 takes 0.5 thread 2 takes 0.9
ここで「なぜ配列からQueueに一旦内容をコピーするの?」と疑問に思われる方もいるかもしれない。これは主にスレッドセーフを考慮しての事である。配列(または、それに類似するクラス)がもしスレッドセーフでなければ、empty?からpopするまでに他のスレッドに割り込まれてしまい、「既にカラになってしまった配列からさらに取ろうとしてエラー」などの問題が起こる可能性があるからだ。Queue.popはそのような心配はなく、カラの場合何か書き込まれるまでThreadが停止する仕組みになっている。
よりすっきりとしたスタイルでの記述を可能にするParallelというライブラリも存在する。これはArrayのmapやcollectメソッドを拡張して並列化してくれるものだ。これを使う前に自前で似たようなものを作ってみよう。前回記事のサンプルを並列化で改良するとすれば、欲しいのはpartitionの並列バージョンである。続きは次回。

コメント
コメント一覧 (1)