- 論壇徽章:
- 0
|
本帖最后由 aef25u 于 2020-05-30 13:53 編輯
簡介:
1、開啟各15個線程的1個生產(chǎn)者與2個消費(fèi)者A、B(AB同為耗時任務(wù))
2、生產(chǎn)者按條件通過2個Channel向A與B分別發(fā)送數(shù)據(jù)(數(shù)據(jù)不需按原始順序發(fā)送;按業(yè)務(wù)邏輯會發(fā)送不同數(shù)據(jù),暫時按發(fā)送同一數(shù)據(jù)模擬)
3、數(shù)組@share為共享數(shù)據(jù),消費(fèi)者A、B會向其讀取數(shù)據(jù)(不需寫入或改變元素)
4、消費(fèi)者A處理接收的數(shù)據(jù),不符合業(yè)務(wù)邏輯的數(shù)據(jù)會繼續(xù)傳入B進(jìn)行處理 if $v==999 {$supplierB.emit($v);}
問題:
1、這樣組織代碼有沒有不合理的地方?
2、共享的@share數(shù)組安不安全?
3、假如在消費(fèi)者A、B中使用以下cached 函數(shù),在各自不同線程下能起作用不?
use experimental :cached;
sub fun-name( $val1, $val2) is cached {...}
- my $TIME = now;
- my $supplierA = Supplier.new;
- my $channelA = $supplierA.Supply.Channel;
- my $supplierB = Supplier.new;
- my $channelB = $supplierB.Supply.Channel;
- my @share=("C","D");
- my $threads=15;
- my (@pA,@pB);
- for 1 .. $threads {
- @pA.push: start {
- react {
- whenever $channelA -> $v {
- if $v==999 {$supplierB.emit(1000);}#向消費(fèi)者B發(fā)送數(shù)據(jù),真實情況是發(fā)送$v
- say "channelA shareArr {@share[0]}:Thread {$*THREAD.id} got $v";
- }
- }
- }
- @pB.push: start {
- react {
- whenever $channelB -> $v {
- sleep 0.5;#真實代碼不需這一行,模擬單個B任務(wù)比A任務(wù)耗時
- say "channelB shareArr {@share[1]}:Thread {$*THREAD.id} got $v";
- }
- }
- }
- }
- my @promises;
- for ^1000 -> $r {
- push @promises, start {
- #sleep $r*0.001;
- sleep rand;
- $supplierA.emit($r);
- $supplierB.emit($r);
- };
- if @promises == 15 {
- await Promise.anyof(@promises);
- @promises .= grep({ !$_ });
- }
- }
- await @promises;
- $supplierA.done;
- await @pA;
- $supplierB.done;
- await @pB;
- $TIME = now - $TIME;
- say $TIME;
復(fù)制代碼
自已發(fā)現(xiàn)了個問題,要保證A處理完不符合業(yè)務(wù)邏輯的數(shù)據(jù)能傳到B不能這樣寫:
- $supplierA.done;
- $supplierB.done;
- await @pA,@pB;
復(fù)制代碼
|
|