#!/usr/bin/perl
use strict;
use Time::HiRes qw/usleep/;
use threads;
use Thread::Queue;
use IO::Socket;
use IO::Select;
# コマンドパラメータにしようと思ってまだ出来てないとこ
my $threadnum=10;
my $cps=50;
my $scenario="hoge";
# 変数とか定義
my $sndcnt=0; # 起動時から送ったシナリオの数
my $endflg; # 終了フラグ(値を定義すると終了する。現状SIGINTを受けたときに定義される)
my $starttime=time(); # 起動時刻
my $freeQ = new Thread::Queue; # 空いてるシナリオ
my $workQ = new Thread::Queue; # 送信中シナリオ
# 標準出力をバッファしない(autoflush)
$| = 1;
# SIGINT(Ctrl+C)を受けたときに終了する。(一応、通信の完了 or T.O.を待つ)
$SIG{INT}=sub {
print "Ctrl + C ====> EndFlag ON!!!!\n";
$endflg=1;
};
# 送信データ読み込み($scenarioで指定したフォルダ配下のファイルすべてをハッシュに突っ込む)
$scenario=$scenario . "/*";
my @files = grep -f, glob "$scenario"; # $scenario/*に一致するファイルを探す。
my %senddatas;
foreach ( sort @files ){
# ファイルをバイナリモードで開いて
open( FH , "< $_" );
binmode( FH );
# ハッシュに突っ込む
$senddatas{$_}=;
# 空きシナリオキューに登録(メモリ削減のため名前だけ)
$freeQ->enqueue($_);
}
# (送信用)スレッド作成&起動
my @threads;
print "Create threads : ";
foreach (1 .. $threadnum){
my $thread = threads->new(\&my_thread, $_);
push(@threads, $thread);
}
while( ! defined $endflg ){
usleep(10000);
my $nowtime=time();
my $difftime=$nowtime - $starttime;
my $addquenum=$cps * $difftime - $sndcnt;
foreach( 1 .. $addquenum ){
my $q = $freeQ->dequeue_nb;
if( defined $q ){
$workQ->enqueue( $q );
$sndcnt++;
if( $sndcnt % 1000 == 0 ){
printf "%d信号送信 => %d秒(%0.1f)cps\n", $sndcnt, $difftime, $sndcnt/$difftime;
}
}else{
print "000 : 空きシナリオなし(シナリオ足りてない?)\n";
print "ちょっと待ってから再開。\n";
sleep(1);
}
}
}
$workQ->enqueue( undef );
print "Join threads \n";
foreach(@threads){
my ($return) = $_->join;
}
print "finish\n";
# スレッドの処理
# 各スレッドはよきタイミングでデータをqueueから抜き出します
sub my_thread {
my $i=shift;
while (my $q = $workQ->dequeue){
#printf( "%03d : sendfile(%s)\n", $i, $q );
my $sock = new IO::Socket::INET(PeerAddr=>'127.0.0.1',
PeerPort=>22,
TimeOut=>0.1,
Blocking=>0,
Proto=>'tcp');
die "IO::Socket : $!" unless $sock;
print $sock $senddatas{$q};
my $s = IO::Select->new();
$s->add($sock);
foreach ( $s->can_read(0.1) ){
my $msg=<$sock>;
chomp( $msg );
#printf( "%03d : recv(%s)\n",$i, $msg );
}
close($sock);
$freeQ->enqueue($q);
threads->yield();
}
$workQ->enqueue( undef );
return;
}