Plagger::Plugin::Aggregator::Async
Plaggerネタ
YAPC::Asiaのikebeさんのプレゼンにも名前がでてきたHTTP::Asyncを使ったAggregator。
パラレルでFeedの取得ができるので、Aggregator::Simpleを使うより速い。Xangoより使うのも簡単。
まだ、Cache周りが不完全。
confに以下のように書けば使える。
plugins:
  - module: Aggregator::Async
  - module: Subscription::Config
    config:
      feed:
        - https://blog.nomadscafe.jp/
ソースは↓追記に貼り付けた。
package Plagger::Plugin::Aggregator::Async;
use strict;
use warnings;
use base qw( Plagger::Plugin::Aggregator::Simple );

# Load every module this file calls into directly, rather than depending on
# some other module having loaded it first. The empty import lists request
# no exported symbols, so nothing extra lands in this package's namespace.
use HTTP::Async 0.07;
use HTTP::Request ();
use HTTP::Status ();
use Plagger::FeedParser ();
use URI::Fetch ();

# Accessor for the HTTP::Async object that register() creates.
__PACKAGE__->mk_accessors( qw/async/ );
# Plugin entry point: creates the HTTP::Async fetcher and registers the hooks.
#
#   $context - object whose register_hook() receives the hook table
#
# The optional "async_args" hash from the plugin config is passed to the
# HTTP::Async constructor as a flat key/value list.
sub register {
    my ($self, $context) = @_;

    my $async_args = $self->conf->{async_args} || {};
    $self->async( HTTP::Async->new( %{$async_args} ) );

    # Maps each HTTP::Async request id to the feed object it was queued for.
    $self->{_id2feed} = {};

    $context->register_hook(
        $self,
        'customfeed.handle'   => \&aggregate,
        'aggregator.finalize' => \&finalize,
    );
}
# 'customfeed.handle' hook: queues an asynchronous GET for one feed.
#
#   $args->{feed} - feed object; its url() is the address to fetch
#
# The request is only queued here; responses are collected in finalize().
# The value of the final assignment (the feed object) is the return value.
sub aggregate {
    my ($self, $context, $args) = @_;

    my $feed       = $args->{feed};
    my $request_id = $self->async->add( $self->prep_req( $context, $feed->url ) );

    # Remember which feed this request id belongs to.
    $self->{_id2feed}{$request_id} = $feed;
}
# Builds the GET request for $url, adding conditional-GET headers when the
# plugin cache holds validators for that URL.
#
#   $context - unused here; kept so the call signature stays the same
#   $url     - address to fetch; also the cache lookup key
#
# Returns the HTTP::Request object.
sub prep_req {
    my ($self, $context, $url) = @_;

    my $req = HTTP::Request->new( GET => $url );
    $req->user_agent("Plagger/$Plagger::VERSION (http://plagger.org/)");

    my $cached = $self->cache->get($url);
    if ($cached) {
        if ( my $last_modified = $cached->{LastModified} ) {
            if ( $last_modified =~ /\A\d+\z/ ) {
                # Epoch seconds: if_modified_since() formats them as an
                # HTTP date.
                $req->if_modified_since($last_modified);
            }
            else {
                # handle_response() caches the raw Last-Modified header
                # string. if_modified_since() expects epoch seconds, so a
                # date string would be treated as 0 and sent as a 1970
                # date. Send the stored string back verbatim instead.
                $req->header( 'If-Modified-Since' => $last_modified );
            }
        }

        $req->header( 'If-None-Match' => $cached->{ETag} )
            if $cached->{ETag};
    }

    return $req;
}
# 'aggregator.finalize' hook: collects every queued response and passes it to
# handle_response().
#
# handle_response() may queue further requests; these are picked up by the
# same loop, which ends once wait_for_next_response() returns an empty list.
sub finalize {
    my ($self, $context, $args) = @_;

    while ( my ($response, $id) = $self->async->wait_for_next_response ) {
        # Each id is looked up exactly once, so remove the entry as it is
        # read; otherwise the id => feed map only ever grows.
        my $feed = delete $self->{_id2feed}{$id};

        unless ($feed) {
            # Without this guard the $feed->url call below would die on undef.
            $context->log(error => "No feed is registered for request id $id");
            next;
        }

        $context->log(info => "Fetch " . $feed->url);
        $self->handle_response( $context, $response, $feed );
    }
}
# Processes one finished HTTP response for $feed.
#
#   $response - HTTP::Response-style object (code, is_success, request,
#               header and content are called on it)
#   $feed     - feed object the request was queued for
#
# Returns 1 when the response was a feed that was passed to handle_feed(), or
# when a feed URL was discovered in the page and queued for fetching.
# Returns nothing on 304, on a failed fetch, or when no feed URL is found.
sub handle_response {
    my ($self, $context, $response, $feed) = @_;

    my $url = $response->request->uri;

    if ( $response->code == 304 ) {
        # Not Modified is an expected outcome of a conditional GET, not a
        # failure, so it is logged at info level.
        $context->log(info => "Not Modified: $url");
        return;
    }
    elsif ( !$response->is_success ) {
        $context->log(error => "Fetch for $url failed: " . $response->code);
        return;
    }

    my $ufr      = TO_URI_FETCH_RESPONSE($response);
    my $feed_url = Plagger::FeedParser->discover($ufr);

    # Neither a feed nor a page that points to one.
    return unless $feed_url;

    if ( $url ne $feed_url ) {
        # The response is a page that refers to the feed: queue the feed
        # itself under the same feed object.
        my $new_id = $self->async->add( $self->prep_req( $context, $feed_url ) );
        $self->{_id2feed}{$new_id} = $feed;

        # Do not cache validators for this page. A 304 for it on the next
        # run would return above before discovery, so the feed would never
        # be fetched again.
        return 1;
    }

    my $content = $response->content;
    $self->handle_feed( $url, \$content, $feed );

    # Store the validators that prep_req() sends back on the next fetch.
    $self->cache->set(
        $url,
        {
            ETag         => $response->header('ETag')          || '',
            LastModified => $response->header('Last-Modified') || '',
        }
    );

    return 1;
}
## XXX copy from Xango
# Converts an HTTP::Response-style object into a URI::Fetch::Response.
#
#   $r - response object; code, previous, header, request, content and
#        content_type are called on it
#
# The URI::Fetch status is chosen in this order: a previous response with a
# "moved permanently" code, then a "gone" code, then a "not modified" code,
# otherwise URI_OK.
#
# Returns the populated URI::Fetch::Response object.
sub TO_URI_FETCH_RESPONSE {
    my ($r) = @_;

    my $fetch_response = URI::Fetch::Response->new();
    $fetch_response->http_status( $r->code );
    $fetch_response->http_response($r);

    my $previous = $r->previous;
    my $status;
    if ( $previous && $previous->code == HTTP::Status::RC_MOVED_PERMANENTLY() ) {
        $status = URI::Fetch::URI_MOVED_PERMANENTLY();
    }
    elsif ( $r->code == HTTP::Status::RC_GONE() ) {
        $status = URI::Fetch::URI_GONE();
    }
    elsif ( $r->code == HTTP::Status::RC_NOT_MODIFIED() ) {
        $status = URI::Fetch::URI_NOT_MODIFIED();
    }
    else {
        $status = URI::Fetch::URI_OK();
    }
    $fetch_response->status($status);

    $fetch_response->etag( $r->header('ETag') );
    $fetch_response->last_modified( $r->header('Last-Modified') );
    $fetch_response->uri( $r->request->uri );
    $fetch_response->content( $r->content );
    $fetch_response->content_type( $r->content_type );

    return $fetch_response;
}
1;