« YAPC::Asia 2007でLTしてきました | メイン | keepalived-1.1.13がkernel 2.6.20でコンパイルできない »

Plagger::Plugin::Aggregator::Async

Plaggerネタ

YAPC::Asiaのikebeさんのプレゼンにも名前がでてきたHTTP::Asyncを使ったAggregator。
パラレルでFeedの取得ができるので、Simple使うより早い。Xangoより使うのが簡単。
まだ、Cache周りが不完全。

confに以下で使える

plugins:
  - module: Aggregator::Async
  - module: Subscription::Config
    config:
      feed:
        - https://blog.nomadscafe.jp/


ソースは↓追記に張り付けた。

package Plagger::Plugin::Aggregator::Async;
use strict;
use base qw( Plagger::Plugin::Aggregator::Simple );
use HTTP::Async 0.07;

__PACKAGE__->mk_accessors( qw/async/ );

sub register {
    my($self, $context) = @_;

    $self->async(
        HTTP::Async->new( %{$self->conf->{async_args} || {}} )
    );
    $self->{_id2feed} = {};

    $context->register_hook(
        $self,
        'customfeed.handle'   => \&aggregate,
        'aggregator.finalize' => \&finalize,
    );
}

sub aggregate {
    my($self, $context, $args) = @_;
    my $url = $args->{feed}->url;

    my $id = $self->async->add( $self->prep_req( $context, $url) );
    $self->{_id2feed}->{ $id } = $args->{feed};
}

sub prep_req {
    my ( $self, $context, $url ) = @_;
    my $req = HTTP::Request->new(
        GET => $url
    );
    $req->user_agent( "Plagger/$Plagger::VERSION (http://plagger.org/)" );
    
    my $ref = $self->cache->get($url);
    if ( $ref ) {
        $req->if_modified_since( $ref->{LastModified} ) 
            if $ref->{LastModified};
        $req->header('If-None-Match', $ref->{ETag} )
            if $ref->{ETag};
    }

    $req;
}

sub finalize {
    my($self, $context, $args) = @_;
    while ( my ( $response, $id ) = $self->async->wait_for_next_response ) {
        my $feed = $self->{_id2feed}->{$id};
        $context->log(info => "Fetch " . $feed->url);
        $self->handle_response( $context, $response, $feed );
    }
}

sub handle_response {
    my ( $self, $context, $response, $feed )  = @_;
    my $url = $response->request->uri;

    if ( $response->code == 304) {
        $context->log(error => "Not Modified: $url");
        return;
    }
    elsif (! $response->is_success) {
        $context->log(error => "Fetch for $url failed: " . $response->code);
        return;
    }

    my $ufr = TO_URI_FETCH_RESPONSE( $response );
    my $feed_url = Plagger::FeedParser->discover($ufr);
    if ($url eq $feed_url) {
        $self->handle_feed($url, \$response->content, $feed);
    } elsif ($feed_url) {
        my $new_id = $self->async->add( $self->prep_req($context, $feed_url ) );
        $self->{_id2feed}->{$new_id} = $feed;
    } else {
        return;
    }

    $self->cache->set(
        $response->request->uri,
        {
            ETag => $response->header('ETag') || '',
            LastModified => $response->header('Last-Modified') || ''
        }
    );

    return 1;
}

## XXX copy from Xango
sub TO_URI_FETCH_RESPONSE
{
    my ($r) = @_;

    my $ufr = URI::Fetch::Response->new();
    $ufr->http_status($r->code);
    $ufr->http_response($r);
    $ufr->status(
        $r->previous && $r->previous->code == &HTTP::Status::RC_MOVED_PERMANENTLY ? &URI::Fetch::URI_MOVED_PERMANENTLY :
        $r->code == &HTTP::Status::RC_GONE ? &URI::Fetch::URI_GONE :
        $r->code == &HTTP::Status::RC_NOT_MODIFIED ? &URI::Fetch::URI_NOT_MODIFIED :
        &URI::Fetch::URI_OK
    );
    $ufr->etag($r->header('ETag'));
    $ufr->last_modified($r->header('Last-Modified'));
    $ufr->uri($r->request->uri);
    $ufr->content($r->content);
    $ufr->content_type($r->content_type);

    return $ufr;
}

1;