package Mojo::IOLoop::Stream;
use Mojo::Base 'Mojo::EventEmitter';

use Errno qw(EAGAIN ECONNRESET EINTR EWOULDBLOCK);
use Mojo::IOLoop;
use Mojo::Util;
use Scalar::Util qw(weaken);

# Size of the write buffer (in bytes) at which "can_write" starts returning false
has high_water_mark => 1048576;

# Reactor driving this stream, stored as a weak reference (so it may be gone by the time we need it)
has reactor => sub { Mojo::IOLoop->singleton->reactor }, weak => 1;

# Close the stream when the object goes away, but not during global destruction
sub DESTROY { shift->close unless ${^GLOBAL_PHASE} eq 'DESTRUCT' }

# Total number of bytes received so far
sub bytes_read { shift->{read} || 0 }

# Number of bytes still sitting in the write buffer
sub bytes_waiting { length(shift->{buffer} // '') }

# Total number of bytes written so far
sub bytes_written { shift->{written} || 0 }

# True while there is a handle and the write buffer is below the high water mark
sub can_write { $_[0]{handle} && $_[0]->bytes_waiting < $_[0]->high_water_mark }

# Close the stream immediately: cancel the inactivity timer, stop watching the handle and emit "close".
# Does nothing if the reactor is gone or the stream has no handle anymore.
sub close {
  my $self = shift;
  return unless my $reactor = $self->reactor;

  # "timeout(0)" removes a running timer before the handle is detached
  return unless my $handle = delete $self->timeout(0)->{handle};
  $reactor->remove($handle);
  $self->emit('close');
}

# Close right away if nothing is being written, otherwise mark the stream so "_write" closes it once drained
sub close_gracefully { $_[0]->is_writing ? $_[0]{graceful}++ : $_[0]->close }

# Handle wrapped by this stream (undef once closed or stolen)
sub handle { shift->{handle} }

# Non-blocking check if the handle is readable; also restarts the inactivity timer
sub is_readable {
  my $self = shift;
  $self->_again;

  # NOTE(review): Mojo::Util::_readable is a private helper, presumably a poll with the given timeout (0) - confirm
  return $self->{handle} && Mojo::Util::_readable(0, fileno $self->{handle});
}

# True if there is buffered data or a pending "drain" subscriber, undef without a handle
sub is_writing {
  my $self = shift;

  # Explicit undef on purpose: "start" and "stop" call this method in an argument list
  return undef unless $self->{handle};
  return !!length($self->{buffer}) || $self->has_subscribers('drain');
}

# Construct a stream for the given handle with an inactivity timeout of 15 seconds
sub new { shift->SUPER::new(handle => shift, timeout => 15) }

# Start watching the handle, or resume watching after "stop"
sub start {
  my $self = shift;

  # Nothing to watch without a handle
  return unless $self->{handle};

  # Resume
  my $reactor = $self->reactor;
  return $reactor->watch($self->{handle}, 1, $self->is_writing) if delete $self->{paused};

  # First start: arm the inactivity timer and register the I/O callback, which dispatches on its last argument
  # (true means write, false means read). The stream is weakened so the callback does not keep it alive.
  weaken $self;
  my $cb = sub { pop() ? $self->_write : $self->_read };
  $reactor->io($self->timeout($self->{timeout})->{handle} => $cb);
}

# Detach the handle from the stream and the reactor and return it, so it does not get closed with the stream.
# Returns undef if the stream has no handle (anymore).
sub steal_handle {
  my $self = shift;
  return undef unless my $handle = delete $self->{handle};

  # The reactor attribute is weak and may already be gone, same precaution as in "close"
  if (my $reactor = $self->reactor) { $reactor->remove($handle) }

  return $handle;
}

# Stop watching for new data (keeps watching for writability while there is data to write); no-op if already paused
sub stop { $_[0]->reactor->watch($_[0]{handle}, 0, $_[0]->is_writing) if $_[0]{handle} && !$_[0]{paused}++ }

# Get or set the inactivity timeout in seconds, a value of 0 disables the timer. The setter returns the stream.
sub timeout {
  my ($self, $timeout) = @_;

  return $self->{timeout} unless defined $timeout;
  $self->{timeout} = $timeout;

  my $reactor = $self->reactor;
  if ($self->{timer}) {

    # A timer is already running: drop it for a timeout of 0, otherwise restart it with the new value
    if   (!$self->{timeout}) { $reactor->remove(delete $self->{timer}) }
    else                     { $reactor->again($self->{timer}, $self->{timeout}) }
  }
  elsif ($self->{timeout}) {

    # No timer yet: create one that emits "timeout" and closes the stream, unless the stream is already gone
    weaken $self;
    $self->{timer}
      = $reactor->timer($timeout => sub { $self and delete($self->{timer}) and $self->emit('timeout')->close });
  }

  return $self;
}

# Append a chunk to the write buffer and start watching the handle for writability. The optional callback is
# subscribed to the next "drain" event. Returns the stream.
sub write {
  my ($self, $chunk, $cb) = @_;

  # IO::Socket::SSL will corrupt data with the wrong internal representation
  utf8::downgrade $chunk;
  $self->{buffer} .= $chunk;

  # Without a callback and with nothing buffered there is no reason to watch for writability
  if    ($cb)                     { $self->once(drain => $cb) }
  elsif (!length $self->{buffer}) { return $self }
  $self->reactor->watch($self->{handle}, !$self->{paused}, 1) if $self->{handle};

  return $self;
}

# Restart the inactivity timer, if there is one
sub _again { $_[0]->reactor->again($_[0]{timer}) if $_[0]{timer} }

# Read up to 131072 bytes from the handle and emit "read"; a read of 0 bytes closes the stream
sub _read {
  my $self = shift;

  if (defined(my $read = $self->{handle}->sysread(my $buffer, 131072, 0))) {
    $self->{read} += $read;
    return $read == 0 ? $self->close : $self->emit(read => $buffer)->_again;
  }

  # Retry
  return undef if $! == EAGAIN || $! == EINTR || $! == EWOULDBLOCK;

  # Closed (maybe real error)
  $! == ECONNRESET ? $self->close : $self->emit(error => $!)->close;
}

# Flush as much of the write buffer as the handle accepts, emit "write" and "drain", and stop watching for
# writability (or close, if requested with "close_gracefully") once there is nothing left to write
sub _write {
  my $self = shift;

  # Handle errors only when reading (to avoid timing problems)
  my $handle = $self->{handle};
  if (length $self->{buffer}) {
    return undef unless defined(my $written = $handle->syswrite($self->{buffer}));
    $self->{written} += $written;

    # Four-argument substr removes the written part from the buffer and passes it to the "write" event
    $self->emit(write => substr($self->{buffer}, 0, $written, ''))->_again;
  }

  # Clear the buffer to free the underlying SV* memory
  unless (length $self->{buffer}) {
    undef $self->{buffer};
    $self->emit('drain');
  }

  # A "drain" subscriber may have written more data
  return undef        if $self->is_writing;
  return $self->close if $self->{graceful};
  $self->reactor->watch($handle, !$self->{paused}, 0) if $self->{handle};
}

1;

=encoding utf8

=head1 NAME

Mojo::IOLoop::Stream - Non-blocking I/O stream

=head1 SYNOPSIS

  use Mojo::IOLoop::Stream;

  # Create stream
  my $stream = Mojo::IOLoop::Stream->new($handle);
  $stream->on(read => sub ($stream, $bytes) {...});
  $stream->on(close => sub ($stream) {...});
  $stream->on(error => sub ($stream, $err) {...});

  # Start and stop watching for new data
  $stream->start;
  $stream->stop;

  # Start reactor if necessary
  $stream->reactor->start unless $stream->reactor->is_running;

=head1 DESCRIPTION

L<Mojo::IOLoop::Stream> is a container for I/O streams used by L<Mojo::IOLoop>.

=head1 EVENTS

L<Mojo::IOLoop::Stream> inherits all events from L<Mojo::EventEmitter> and can emit the following new ones.

=head2 close

  $stream->on(close => sub ($stream) {...});

Emitted if the stream gets closed.

=head2 drain

  $stream->on(drain => sub ($stream) {...});

Emitted once all data has been written.

=head2 error

  $stream->on(error => sub ($stream, $err) {...});

Emitted if an error occurs on the stream, fatal if unhandled.

=head2 read

  $stream->on(read => sub ($stream, $bytes) {...});

Emitted if new data arrives on the stream.

=head2 timeout

  $stream->on(timeout => sub ($stream) {...});

Emitted if the stream has been inactive for too long and will get closed automatically.

=head2 write

  $stream->on(write => sub ($stream, $bytes) {...});

Emitted if new data has been written to the stream.

=head1 ATTRIBUTES

L<Mojo::IOLoop::Stream> implements the following attributes.

=head2 high_water_mark

  my $size = $stream->high_water_mark;
  $stream  = $stream->high_water_mark(1024);

Maximum size of L</"write"> buffer in bytes before L</"can_write"> returns false, defaults to C<1048576> (1MiB).

=head2 reactor

  my $reactor = $stream->reactor;
  $stream     = $stream->reactor(Mojo::Reactor::Poll->new);

Low-level event reactor, defaults to the C<reactor> attribute value of the global L<Mojo::IOLoop> singleton. Note that
this attribute is weakened.

=head1 METHODS

L<Mojo::IOLoop::Stream> inherits all methods from L<Mojo::EventEmitter> and implements the following new ones.

=head2 bytes_read

  my $num = $stream->bytes_read;

Number of bytes received.

=head2 bytes_waiting

  my $num = $stream->bytes_waiting;

Number of bytes that have been enqueued with L</"write"> and are waiting to be written.

=head2 bytes_written

  my $num = $stream->bytes_written;

Number of bytes written.

=head2 can_write

  my $bool = $stream->can_write;

Returns true if calling L</"write"> is safe.

=head2 close

  $stream->close;

Close stream immediately.

=head2 close_gracefully

  $stream->close_gracefully;

Close stream gracefully.

=head2 handle

  my $handle = $stream->handle;

Get handle for stream, usually an L<IO::Socket::IP> or L<IO::Socket::SSL> object.

=head2 is_readable

  my $bool = $stream->is_readable;

Quick non-blocking check if stream is readable, useful for identifying tainted sockets.

=head2 is_writing

  my $bool = $stream->is_writing;

Check if stream is writing.

=head2 new

  my $stream = Mojo::IOLoop::Stream->new($handle);

Construct a new L<Mojo::IOLoop::Stream> object.

=head2 start

  $stream->start;

Start or resume watching for new data on the stream.

=head2 steal_handle

  my $handle = $stream->steal_handle;

Steal L</"handle"> and prevent it from getting closed automatically.

=head2 stop

  $stream->stop;

Stop watching for new data on the stream.

=head2 timeout

  my $timeout = $stream->timeout;
  $stream     = $stream->timeout(45);

Maximum amount of time in seconds stream can be inactive before getting closed automatically, defaults to C<15>.
Setting the value to C<0> will allow this stream to be inactive indefinitely.

=head2 write

  $stream = $stream->write($bytes);
  $stream = $stream->write($bytes => sub {...});

Enqueue data to be written to the stream as soon as possible, the optional drain callback will be executed once all
data has been written.

=head1 SEE ALSO

L<Mojolicious>, L<Mojolicious::Guides>, L<https://mojolicious.org>.

=cut