2011年9月3日土曜日

perl のマルチスレッド

ども。ある案件で perl の マルチスレッドを使うことになりました。

use threads;
my @ts;
for (1..5) {
   my $t = threads->create(sub {
       # some code
       …
       # 1つのスレッド処理完了 *1

   });
   push(@ts, $t);
}
eval { $_->join } for @ts;


# 終了処理 *2

このようなコードを書いたのですが、どれかのスレッドの「# 1つのスレッド処理完了 *1」のタイミング「Scalars leaked: 1」が発生します。
このエラーが発生するとまだ「# 終了処理 *2」は実行されず、メインプロセスごと落ちてしまいます。
まだ実行中のスレッドもメインプロセスとともに落ちます。

再現率は 100%ではないのですが、スレッドでやる仕事時間が長ければ長いほど「Scalars leaked: 1」は 100%に近い状態で発生していました。

スレッドの呼び出し方法が悪いのかと思い 事前に関数定義したり
my $runnable = sub {
   # some coe
};

my @ts;
for (1..5) {
   my $t = threads->create($runnable);
   push(@ts, $t);
}

eval { $_->join } for @ts;


こんなことしたり
my @ts;
for (1..5) {
   my $t = threads->create(\&runnable);
   push(@ts, $t);
}
eval { $_->join } for @ts;

sub runnable {
   # some coe
};


こんなことしたり、色々試しましたが症状は改善しません。
my @ts;
for (1..5) {
   my $t = threads->create('runnable');
   push(@ts, $t);
}
eval { $_->join } for @ts;

sub runnable {
   # some coe
};


perl 5.8.8 を使っていたので perl 5.14.1 にもしてみましたが、「Scalars leaked: 1」の代わりに「Segmentation fault」が発生するようになりました。


最終的に以下の方法でひとまず解決しました。perl 5.10.x あたりから登場する threads::shared を使います。

use threads;
use threads::shared;

my %alive :shared;
my $keepalive :shared;

$keepalive = 1;
for my $i (1..5) {
   $alive{$i} = time();
   my $t = threads->create(sub {
       my $ii = shift;
       # some code
       $alive{$ii} = time(); # この時間にまだ生きいることを知らせる
       # some code
       $alive{$ii} = time(); # この時間にまだ生きいることを知らせる
       # some code
       delete $alive{$ii}; # このスレッドの処理が終わったので %alive を消す
       while (1) {
           last unless $keepalive; # 他のすべてのスレッドが終了し、メインスレッドの「# 終了処理 *2」が終わるまで待機
           sleep(2);
       }
   }, $i);
   $t->detach; # join しない
}


# join の代わりに %alive が全部なくなるまで待つ

while (1) {
   my @keys = keys %alive;
   last if @keys == 0; # %alive が無くなったら終了
   my $now = time();
   for my $key (@keys) {
       my $a = $alive{$key} or next;
       if ($now - $a > 30) {
            # %alive 処理ができずに落ちたスレッドがいるかも知れないので、最後の生存通知から 30秒以上たってる場合は以後処理しない。
           warn "timeout $key";
           delete $alive{$key};
       }
   }
   sleep(5);
}

# 終了処理 *2
# some code

$keepalive = 0; # すべてのスレッドに関数を終了して良いことを知らせる。このあとどこかで Segmentation fault が発生するかも知れないが、構わないものとする。


たぶん、大丈夫だと思う。美しくはないですが。。
あやうく Java で書きなおそうかと思いました。