2011年9月3日土曜日

perl のマルチスレッド

ども。ある案件で perl の マルチスレッドを使うことになりました。

use threads;
my @ts;
for (1..5) {
   my $t = threads->create(sub {
       # some code
       …
       # 1つのスレッド処理完了 *1

   });
   push(@ts, $t);
}
eval { $_->join } for @ts;


# 終了処理 *2

このようなコードを書いたのですが、どれかのスレッドの「# 1つのスレッド処理完了 *1」のタイミング「Scalars leaked: 1」が発生します。
このエラーが発生するとまだ「# 終了処理 *2」は実行されず、メインプロセスごと落ちてしまいます。
まだ実行中のスレッドもメインプロセスとともに落ちます。

再現率は 100%ではないのですが、スレッドでやる仕事時間が長ければ長いほど「Scalars leaked: 1」は 100%に近い状態で発生していました。

スレッドの呼び出し方法が悪いのかと思い 事前に関数定義したり
my $runnable = sub {
   # some coe
};

my @ts;
for (1..5) {
   my $t = threads->create($runnable);
   push(@ts, $t);
}

eval { $_->join } for @ts;


こんなことしたり
my @ts;
for (1..5) {
   my $t = threads->create(\&runnable);
   push(@ts, $t);
}
eval { $_->join } for @ts;

sub runnable {
   # some coe
};


こんなことしたり、色々試しましたが症状は改善しません。
my @ts;
for (1..5) {
   my $t = threads->create('runnable');
   push(@ts, $t);
}
eval { $_->join } for @ts;

sub runnable {
   # some coe
};


perl 5.8.8 を使っていたので perl 5.14.1 にもしてみましたが、「Scalars leaked: 1」の代わりに「Segmentation fault」が発生するようになりました。


最終的に以下の方法でひとまず解決しました。perl 5.10.x あたりから登場する threads::shared を使います。

use threads;
use threads::shared;

my %alive :shared;
my $keepalive :shared;

$keepalive = 1;
for my $i (1..5) {
   $alive{$i} = time();
   my $t = threads->create(sub {
       my $ii = shift;
       # some code
       $alive{$ii} = time(); # この時間にまだ生きいることを知らせる
       # some code
       $alive{$ii} = time(); # この時間にまだ生きいることを知らせる
       # some code
       delete $alive{$ii}; # このスレッドの処理が終わったので %alive を消す
       while (1) {
           last unless $keepalive; # 他のすべてのスレッドが終了し、メインスレッドの「# 終了処理 *2」が終わるまで待機
           sleep(2);
       }
   }, $i);
   $t->detach; # join しない
}


# join の代わりに %alive が全部なくなるまで待つ

while (1) {
   my @keys = keys %alive;
   last if @keys == 0; # %alive が無くなったら終了
   my $now = time();
   for my $key (@keys) {
       my $a = $alive{$key} or next;
       if ($now - $a > 30) {
            # %alive 処理ができずに落ちたスレッドがいるかも知れないので、最後の生存通知から 30秒以上たってる場合は以後処理しない。
           warn "timeout $key";
           delete $alive{$key};
       }
   }
   sleep(5);
}

# 終了処理 *2
# some code

$keepalive = 0; # すべてのスレッドに関数を終了して良いことを知らせる。このあとどこかで Segmentation fault が発生するかも知れないが、構わないものとする。


たぶん、大丈夫だと思う。美しくはないですが。。
あやうく Java で書きなおそうかと思いました。

2011年8月23日火曜日

memcached でキーの列挙 (3)

前回のエントリ memcached でキーの列挙 (2)

こんなん作りました。memcached-tool ぽく。既にあったらすません~。
http://code.google.com/p/memcached-keys/

2011年6月7日火曜日

Google App Engine (GAE) の runserver 停止

ども!ご無沙汰してます。

近頃は受託案件に加え、モバゲー向けに自社製ゲームを出したりしてましたが、6月2日にサービスを終了しました。
時間ができたら開発に至った経緯や失敗談など書いていこうと思います。



さて、最近は Google App Engine Java で試験的に開発などを行っています。
ビルドは Windows にて ant を使っていますが、そこで遭遇した問題を 1つ解決したのでメモ書きです。



ant でのビルドと動作確認は runserver タスクを実行しますが、ドキュメント通りに Ctrl+C でプロセスを落としても Jetty の java.exe が残ったままになる問題があります。

残ったままでは次の変更が反映されないので、タスクマネージャで java.exe を手動で終了させます。
めんどくさいので以下のように runserver を定義することで解決しました。

変更前
<target name="runserver" depends="datanucleusenhance"
description="Starts the development server.">
<dev_appserver war="war" />
</target>


変更後
<target name="runserver" depends="datanucleusenhance"
description="Starts the development server.">
<java classname="com.google.appengine.tools.development.DevAppServerMain"
classpath="${appengine.tools.classpath}"
fork="true" failonerror="true">
<jvmarg value="-javaagent:${appengine.sdk.home}/lib/agent/appengine-agent.jar" />
<arg value="--port=8080"/>
<arg value="--address=localhost"/>
<arg value="war"/>
</java>
</target>


Google App Engine 標準の runserver は dev_appserver マクロを呼び出し、com.google.appengine.tools.KickStart を起動していますが、これをすっ飛ばして
com.google.appengine.tools.development.DevAppServerMain を直接起動します。

キモは -javaagent ですね。

これで少し快適に開発ができるようになるはず。。