Last edited by aef25u on 2017-08-18 23:03
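As is well known, Spark's ML Pipelines library is used to build machine-learning workflows. Each PipelineStage performs one task, such as transforming a dataset, training a model, setting parameters, or making predictions. A Spark ML Pipelines workflow looks roughly like this:

```scala
val pipeline = new Pipeline().setStages(Array(labelIndexer, vectorAssembler, rfClassifier, labelConverter))
val model = pipeline.fit(trainingData)
```

CPAN also has a Pipeline module, which its author wrote using Perl's old-style OOP. After studying its source and weighing my own needs, I rewrote my own Pipeline with Moose. It mainly implements counterparts of the ML Pipelines DataFrame (using Perl's Data::Table module instead) and of the Transformer. The rest of this post shows how to write the Pipeline module with Moose.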
1. UML Class Diagram Design
[Figure: UML class diagram of the Pipeline modules (0uml.png)]
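2. Relationships Between Modules
- The dispatcher attribute of the Pipeline class is an instance of Pipeline::Dispatch.
- Pipeline::Dispatch inherits from Pipeline::Base. Its segments attribute is an array reference, implemented with Moose attribute delegation, that holds the Pipeline::Segment instances.
- Also in Pipeline::Dispatch, the dfhash attribute is a hash reference, implemented with Moose attribute delegation, that holds the return value of each segment's dispatch(), keyed by the segment's class name: { ref($segment) => $df }, i.e. (YourSegmentClassName => $df).
- The store attribute of Pipeline::Segment is an instance of Pipeline::Store.
- Pipeline::Store's job is to let different segments put data in and take data out.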
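Both dfhash and Pipeline::Store key their entries by ref($obj), which for a blessed object is its class name. The dependency-free sketch below shows that keying scheme with plain blessed hashes (the Demo::SegA and Demo::SegB classes are hypothetical stand-ins, not part of the module). It also shows one consequence worth keeping in mind: two objects of the same class share one key, so the later one replaces the earlier one.

```perl
use strict;
use warnings;

# Hypothetical stand-in segment classes (plain blessed hashes, no Moose).
package Demo::SegA;
sub new {
    my ( $class, %args ) = @_;
    my $self = {%args};
    return bless $self, $class;
}

package Demo::SegB;
our @ISA = ('Demo::SegA');    # subclass: ref() still reports 'Demo::SegB'

package main;

# Key each result by the class name of the object that produced it,
# mirroring { ref($segment) => $df }.
my %dfhash;
for my $seg ( Demo::SegA->new( out => 'first' ),
              Demo::SegB->new( out => 'second' ),
              Demo::SegA->new( out => 'third' ) ) {
    $dfhash{ ref $seg } = $seg->{out};
}

print "$_ => $dfhash{$_}\n" for sort keys %dfhash;
# Demo::SegA => third    (the second SegA overwrote the first)
# Demo::SegB => second
```

In practice this means each segment class should appear only once in a pipeline if you want to read back its result by class name.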
dispatch() is the core method. The methods involved are:
- dispatch(): a method of the Pipeline class
- dispatch_loop(): a Pipeline method used internally by dispatch()
- next(): a method of the Pipeline::Dispatch class
The internal call chain is:

```
Pipeline->dispatch(
    Pipeline->dispatch_loop(
        Pipeline::Dispatch->next(
            Pipeline::Segment->prepare_dispatch(Pipeline);
            my $df = Pipeline::Segment->dispatch();
        );
    );
);
```
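To make that nesting concrete, here is a dependency-free Perl sketch that mimics the same call order with plain blessed hashes instead of Moose. The Trace::* package names and the @calls log are illustrative only. Each method records its name, so the log shows how a single dispatch() fans out into one next() per segment, each followed by that segment's prepare_dispatch() and dispatch().

```perl
use strict;
use warnings;

my @calls;    # records the order in which methods are entered

package Trace::Segment;
sub new { my ( $class, $name ) = @_; return bless { name => $name }, $class }
sub prepare_dispatch { my ( $self, $pipe ) = @_; push @calls, $self->{name} . '->prepare_dispatch' }
sub dispatch         { my $self = shift; push @calls, $self->{name} . '->dispatch'; return $self->{name} }

package Trace::Dispatch;
sub new { my $class = shift; return bless { segments => [], pos => 0 }, $class }
sub add { my $self = shift; push @{ $self->{segments} }, @_ }
sub segment_available { my $self = shift; return $self->{pos} < @{ $self->{segments} } }
sub next {
    my ( $self, $pipe ) = @_;
    my $seg = $self->{segments}[ $self->{pos}++ ];
    push @calls, 'Dispatch->next';
    $seg->prepare_dispatch($pipe);
    my $df = $seg->dispatch();    # result unused in this trace
}
sub reset { my $self = shift; $self->{pos} = 0 }

package Trace::Pipeline;
sub new { my $class = shift; return bless { dispatcher => Trace::Dispatch->new }, $class }
sub dispatch {
    my $self = shift;
    push @calls, 'Pipeline->dispatch';
    $self->dispatch_loop;
    $self->{dispatcher}->reset;
}
sub dispatch_loop {
    my $self = shift;
    push @calls, 'Pipeline->dispatch_loop';
    $self->{dispatcher}->next($self) while $self->{dispatcher}->segment_available;
}

package main;
my $pipe = Trace::Pipeline->new;
$pipe->{dispatcher}->add( Trace::Segment->new('a'), Trace::Segment->new('b') );
$pipe->dispatch;
print "$_\n" for @calls;
```

Running it prints Pipeline->dispatch and Pipeline->dispatch_loop once each, followed by Dispatch->next, prepare_dispatch and dispatch for segment a, then the same three calls for segment b.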
Note: each Segment class you write by subclassing Pipeline::Segment plays the role of one Transformer in Spark's ML Pipelines.

3. Module Source Code

3.1 Pipeline class

```perl
package Pipeline;
use Moose;
#use namespace::clean;
use Pipeline::Dispatch;
use Pipeline::Store;    # needed for the 'store' default below

has 'debug' => (
    is      => 'rw',
    isa     => 'Int',
    default => 0,
);

# Segments are managed by a Pipeline::Dispatch object; these methods are
# forwarded to it (e.g. add_segment(...) calls dispatcher->add(...)).
has 'dispatcher' => (
    is      => 'ro',
    isa     => 'Pipeline::Dispatch',
    default => sub { Pipeline::Dispatch->new() },
    handles => {
        get_segment => 'get',
        add_segment => 'add',
        del_segment => 'delete',
    },
);

# Shared store; each segment receives it in prepare_dispatch().
has 'store' => (
    is      => 'rw',
    isa     => 'Pipeline::Store',
    default => sub { Pipeline::Store->new() },
);

sub segments {
    my $self = shift;
    return $self->dispatcher->segments(@_);
}

# Run the dispatch loop, then call the dispatcher's reset().
sub dispatch {
    my $self = shift;
    $self->dispatch_loop();
    $self->dispatcher->reset();
}

sub dispatch_loop {
    my $self = shift;
    $self->dispatcher->debug( $self->debug );
    while ( $self->dispatcher->segment_available ) {
        $self->dispatcher->next($self);
    }
}

# Return the value recorded for the segment class named $segname.
sub getDf {
    my ( $self, $segname ) = @_;
    return $self->dispatcher->getDf($segname);
}

#__PACKAGE__->meta->make_immutable;
1;
```
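The handles hash on the dispatcher attribute is what lets callers write $pipeline->add_segment(...) without touching the dispatcher directly. Below is a minimal standalone Moose sketch of the same delegation pattern; Demo::Registry and Demo::Owner are hypothetical classes used only for illustration.

```perl
use strict;
use warnings;

package Demo::Registry;
use Moose;

# Plain list of items; add() appends, get() reads by index.
has 'items' => ( is => 'ro', isa => 'ArrayRef', default => sub { [] } );
sub add { my $self = shift; push @{ $self->items }, @_; return scalar @{ $self->items } }
sub get { my ( $self, $i ) = @_; return $self->items->[$i] }

package Demo::Owner;
use Moose;

# Each key in 'handles' becomes a method on Demo::Owner that calls the
# method named on the right-hand side on the registry object.
has 'registry' => (
    is      => 'ro',
    isa     => 'Demo::Registry',
    default => sub { Demo::Registry->new },
    handles => { add_item => 'add', get_item => 'get' },
);

package main;
my $owner = Demo::Owner->new;
$owner->add_item( 'x', 'y' );       # same as $owner->registry->add('x', 'y')
print $owner->get_item(1), "\n";    # prints "y"
```

The owner class stays small, and the storage details live in one place; Pipeline uses the same idea to hand all segment bookkeeping to Pipeline::Dispatch.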
3.2 Pipeline::Dispatch class
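Pipeline::Dispatch is described only indirectly: through the relationships in section 2 and the calls Pipeline makes on it (get, add, delete, segments, segment_available, next, reset, getDf, debug). A minimal sketch consistent with those calls might look like the following. Everything internal is an assumption: the _pos cursor, the Moose native-trait methods chosen, and the decision that reset() only rewinds the cursor. reset() must not clear dfhash, because the example in section 4 calls getDf() after dispatch() has returned. A trimmed stand-in for Pipeline::Base is included so the sketch runs on its own.

```perl
use strict;
use warnings;

# Stand-in for Pipeline::Base (section 3.3), trimmed to what this sketch uses.
package Pipeline::Base;
use Moose;
has 'debug' => ( is => 'rw', isa => 'Int', default => 0 );
sub emit {
    my ( $self, $mesg ) = @_;
    print STDERR '[' . ref($self) . "] $mesg\n" if $self->debug;
}

# Hypothetical sketch of Pipeline::Dispatch; internals are assumptions.
package Pipeline::Dispatch;
use Moose;
extends 'Pipeline::Base';

# Segments in run order; add, get and delete are the method names
# that Pipeline's handles => {...} expects.
has 'segments' => (
    traits  => ['Array'],
    is      => 'ro',
    isa     => 'ArrayRef[Object]',
    default => sub { [] },
    handles => {
        add    => 'push',
        get    => 'get',
        delete => 'delete',
        _count => 'count',
    },
);

# Assumed cursor: index of the next segment to run.
has '_pos' => ( is => 'rw', isa => 'Int', default => 0 );

# Each segment's dispatch() result, keyed by the segment's class name.
has 'dfhash' => (
    traits  => ['Hash'],
    is      => 'ro',
    isa     => 'HashRef',
    default => sub { {} },
    handles => {
        _set_df => 'set',
        getDf   => 'get',
    },
);

sub segment_available {
    my $self = shift;
    return $self->_pos < $self->_count;
}

# Run the next segment: hand it the pipeline, call dispatch(), record the result.
sub next {
    my ( $self, $pipe ) = @_;
    my $seg = $self->get( $self->_pos );
    $self->_pos( $self->_pos + 1 );
    $self->emit( 'dispatching ' . ref($seg) );
    $seg->prepare_dispatch($pipe);
    my $df = $seg->dispatch();
    $self->_set_df( ref($seg) => $df );
    return $df;
}

# Rewind the cursor only; dfhash is kept so getDf() still works afterwards.
sub reset {
    my $self = shift;
    $self->_pos(0);
    return $self;
}

1;
```

Using native traits keeps the class to a few declarations: the Array trait supplies add/get/delete for the segment list and the Hash trait supplies getDf, which matches the delegation style already used in Pipeline::Store.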
3.3 Pipeline::Base class

```perl
package Pipeline::Base;
use Moose;
#use namespace::clean;

has 'debug' => (
    is      => 'rw',
    isa     => 'Int',
    default => 0,
);

# Log a message, prefixed with the class name, only when debug is on.
sub emit {
    my ( $self, $mesg ) = @_;
    $self->log( $self->_format_message($mesg) ) if $self->debug;
}

# Write to STDERR; a subclass can override this to send output elsewhere.
sub log {
    my ( $self, $mesg ) = @_;
    print STDERR $mesg;
}

sub _format_message {
    my ( $self, $mesg ) = @_;
    my $class = ref($self);
    return "[$class] $mesg\n";
}

#__PACKAGE__->meta->make_immutable;
1;
```
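3.4 Pipeline::Segment class

```perl
package Pipeline::Segment;
use Moose;
use Pipeline::Store;    # needed for the 'store' default below

has 'store' => (
    is      => 'rw',
    isa     => 'Pipeline::Store',
    default => sub { Pipeline::Store->new() },
);

# Default no-op; override it in your own segment. Its return value is
# recorded by the dispatcher under this segment's class name.
sub dispatch {
    my $self = shift;
}

# Called before dispatch(): replace this segment's store with the
# pipeline's shared store, so all segments see the same data.
sub prepare_dispatch {
    my ( $self, $pipe ) = @_;
    $self->store( $pipe->store );
}

#__PACKAGE__->meta->make_immutable;
1;
```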
3.5 Pipeline::Store class

```perl
package Pipeline::Store;
use Moose;

# Objects shared between segments, keyed by class name.
has 'storehash' => (
    traits  => ['Hash'],
    is      => 'ro',
    isa     => 'HashRef[Object]',
    default => sub { {} },
    handles => {
        _set_opt => 'set',
        get      => 'get',
    },
);

# Store $obj under its class name (ref($obj)); a later object of the
# same class replaces the earlier one. Returns $self.
sub set {
    my ( $self, $obj ) = @_;
    if ( defined $obj ) {
        $self->_set_opt( ref($obj), $obj );
    }
    return $self;
}

#__PACKAGE__->meta->make_immutable;
1;
```
4. Example
4.1 example.pl

```perl
package MyDf;
use Moose;
extends 'Pipeline::Segment';

# Container for the shared Data::Table; stored in Pipeline::Store under 'MyDf'.
has 'df' => (
    is  => 'rw',
    isa => 'Data::Table',
);

package MyData;
use Moose;
extends 'Pipeline::Segment';

has 'df' => (
    is  => 'rw',
    isa => 'Data::Table',
);

# First segment: publish the input table to the shared store.
sub dispatch {
    my $self = shift;
    $self->store->set( MyDf->new( df => $self->df ) );
    return $self->df;
}

package MySeg1;
use Moose;
extends 'Pipeline::Segment';

sub dispatch {
    my $self = shift;
    my $mydf = $self->store->get('MyDf');
    # MySeg1 appends a "Total" row to MyDf (total count = 1 + 2 + 5 = 8)
    $mydf->df->addRow( [ 'Total', 8, undef ], 3 );
    return $mydf->df;
}

package MySeg2;
use Moose;
extends 'Pipeline::Segment';

sub dispatch {
    my $self = shift;
    my $mydf = $self->store->get('MyDf');
    # MySeg2 appends a "total" (count * price) column to MyDf
    $mydf->df->addCol( [ 100, 100, 100, 300 ], 'total', 3 );
    return $mydf->df;
}

package main;
use lib './lib';
use Pipeline;
use Data::Table;

my $headers = [ 'name', 'count', 'price' ];
my $rows    = [ [ 'A', '1', '100' ],
                [ 'B', '2', '50' ],
                [ 'C', '5', '20' ] ];
my $df = Data::Table->new( $rows, $headers, 0 );
#print $df->csv;

my $pipeline = Pipeline->new();
$pipeline->debug(1);
my $mydata = MyData->new( df => $df );
my $seg1   = MySeg1->new();
my $seg2   = MySeg2->new();
$pipeline->add_segment( $mydata, $seg1, $seg2 );
my $production = $pipeline->dispatch();

# All three segments return the same Data::Table object, modified in
# place, so these three calls print the same final table.
#print $pipeline->store->get('MyDf')->df->csv;
print $pipeline->getDf("MyData")->csv;
print $pipeline->getDf("MySeg1")->csv;
print $pipeline->getDf("MySeg2")->csv;
#Author blog:tianyv.github.io
```
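Because MyData, MySeg1 and MySeg2 all hand back the very same Data::Table reference, the three getDf() calls print one and the same final table. If you want behaviour closer to Spark, where each stage's output is a separate DataFrame, a segment has to copy the table before modifying it. The effect can be shown without Data::Table; in the sketch below a plain array of rows is a hypothetical stand-in for the table, and new_table, copy_table and run_stages are illustrative helpers, not part of the module.

```perl
use strict;
use warnings;

# Hypothetical stand-in for a Data::Table: an array of [name, count] rows.
sub new_table { return [ [ 'A', 1 ], [ 'B', 2 ], [ 'C', 5 ] ] }

# Copy the outer array and each row (enough for rows of plain scalars).
sub copy_table { my $t = shift; return [ map { [@$_] } @$t ] }

# Run three "stages"; $copy_first decides whether a stage copies before modifying.
sub run_stages {
    my ($copy_first) = @_;
    my %out;
    my $df = new_table();
    $out{MyData} = $df;

    $df = copy_table($df) if $copy_first;
    push @$df, [ 'Total', 8 ];    # stage 1: append a total row
    $out{MySeg1} = $df;

    $df = copy_table($df) if $copy_first;
    push @$_, 'x' for @$df;       # stage 2: append a column to every row
    $out{MySeg2} = $df;
    return \%out;
}

for my $mode ( 0, 1 ) {
    my $out = run_stages($mode);
    printf "%s: MyData has %d rows, %d columns\n",
        ( $mode ? 'copy' : 'in place' ),
        scalar @{ $out->{MyData} }, scalar @{ $out->{MyData}[0] };
}
# in place: MyData has 4 rows, 3 columns
# copy: MyData has 3 rows, 2 columns
```

Copying costs memory for large tables, so in-place modification through the shared store, as in example.pl, is a reasonable choice when only the final table is needed.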