mirror of https://github.com/nodejs/node.git
1588 lines
36 KiB
C
1588 lines
36 KiB
C
/* Copyright (c) 2008,2009 Ryan Dahl
|
|
*
|
|
* evcom_queue comes from ngx_queue.h
|
|
* Copyright (C) 2002-2009 Igor Sysoev
|
|
*
|
|
* Redistribution and use in source and binary forms, with or without
|
|
* modification, are permitted provided that the following conditions
|
|
* are met:
|
|
* 1. Redistributions of source code must retain the above copyright
|
|
* notice, this list of conditions and the following disclaimer.
|
|
* 2. Redistributions in binary form must reproduce the above copyright
|
|
* notice, this list of conditions and the following disclaimer in the
|
|
* documentation and/or other materials provided with the distribution.
|
|
*
|
|
* THIS SOFTWARE IS PROVIDED BY AUTHOR AND CONTRIBUTORS ``AS IS'' AND
|
|
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
|
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
|
* ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
|
|
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
|
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
|
|
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
|
|
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
|
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
|
|
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
|
|
* SUCH DAMAGE.
|
|
*/
|
|
#include <stdio.h>
|
|
#include <stdlib.h>
|
|
#include <assert.h>
|
|
#include <unistd.h>
|
|
#include <fcntl.h>
|
|
#include <errno.h>
|
|
#include <string.h> /* memset */
|
|
|
|
#include <signal.h>
|
|
|
|
#include <netinet/tcp.h> /* TCP_NODELAY */
|
|
#include <sys/types.h>
|
|
#include <sys/socket.h> /* shutdown */
|
|
#include <sys/un.h>
|
|
#include <netinet/in.h> /* sockaddr_in, sockaddr_in6 */
|
|
|
|
#include <ev.h>
|
|
#include <evcom.h>
|
|
|
|
#if EV_MULTIPLICITY
|
|
# define D_LOOP_(d) (d)->loop,
|
|
# define D_LOOP_SET(d, _loop) do { (d)->loop = (_loop); } while (0)
|
|
#else
|
|
# define D_LOOP_(d)
|
|
# define D_LOOP_SET(d, _loop)
|
|
#endif // EV_MULTIPLICITY
|
|
|
|
|
|
/* SEND STATES */
|
|
static int stream_send__wait_for_connection (evcom_stream*);
|
|
static int stream_send__data (evcom_stream*);
|
|
static int stream_send__drain (evcom_stream*);
|
|
static int stream_send__wait_for_eof (evcom_stream*);
|
|
static int stream_send__wait_for_buf (evcom_stream*);
|
|
static int stream_send__shutdown (evcom_stream*);
|
|
#if EVCOM_HAVE_GNUTLS
|
|
static int stream_send__gnutls_bye (evcom_stream*);
|
|
#endif
|
|
static int stream_send__close_one (evcom_stream*);
|
|
static int stream_send__close (evcom_stream*);
|
|
|
|
/* RECV STATES */
|
|
static int stream_recv__data (evcom_stream*);
|
|
static int stream_recv__wait_for_resume (evcom_stream*);
|
|
static int stream_recv__wait_for_close (evcom_stream*);
|
|
static int stream_recv__close_one (evcom_stream*);
|
|
static int stream_recv__close (evcom_stream*);
|
|
|
|
/* COMMON STATES */
|
|
#if EVCOM_HAVE_GNUTLS
|
|
static int stream__handshake (evcom_stream*);
|
|
#endif
|
|
static int stream__close_both (evcom_stream*);
|
|
|
|
#undef TRUE
|
|
#define TRUE 1
|
|
#undef FALSE
|
|
#define FALSE 0
|
|
#undef MIN
|
|
#define MIN(a,b) ((a) < (b) ? (a) : (b))
|
|
|
|
#define OKAY 0
|
|
#define AGAIN 1
|
|
|
|
#define ATTACHED(s) ((s)->flags & EVCOM_ATTACHED)
|
|
#define LISTENING(s) ((s)->flags & EVCOM_LISTENING)
|
|
#define CONNECTED(s) ((s)->flags & EVCOM_CONNECTED)
|
|
#define SECURE(s) ((s)->flags & EVCOM_SECURE)
|
|
#define DUPLEX(s) ((s)->flags & EVCOM_DUPLEX)
|
|
#define GOT_CLOSE(s) ((s)->flags & EVCOM_GOT_CLOSE)
|
|
#define PAUSED(s) ((s)->flags & EVCOM_PAUSED)
|
|
#define READABLE(s) ((s)->flags & EVCOM_READABLE)
|
|
#define WRITABLE(s) ((s)->flags & EVCOM_WRITABLE)
|
|
|
|
static int too_many_connections = 0;
|
|
|
|
EV_INLINE int
|
|
set_nonblock (int fd)
|
|
{
|
|
int flags = fcntl(fd, F_GETFL, 0);
|
|
if (flags == -1) return -1;
|
|
|
|
int r = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
|
|
if (r == -1) return -1;
|
|
|
|
return 0;
|
|
}
|
|
|
|
evcom_buf *
|
|
evcom_buf_new2 (size_t len)
|
|
{
|
|
void *data = malloc(sizeof(evcom_buf) + len);
|
|
if (!data) return NULL;
|
|
|
|
evcom_buf *buf = data;
|
|
buf->len = len;
|
|
buf->release = (void (*)(evcom_buf*))free;
|
|
buf->base = data + sizeof(evcom_buf);
|
|
|
|
return buf;
|
|
}
|
|
|
|
evcom_buf *
|
|
evcom_buf_new (const char *base, size_t len)
|
|
{
|
|
evcom_buf* buf = evcom_buf_new2(len);
|
|
if (!buf) return NULL;
|
|
|
|
memcpy(buf->base, base, len);
|
|
|
|
return buf;
|
|
}
|
|
|
|
void evcom_ignore_sigpipe (void)
|
|
{
|
|
struct sigaction sa;
|
|
bzero(&sa, sizeof(sa));
|
|
sa.sa_handler = SIG_IGN;
|
|
sigaction(SIGPIPE, &sa, NULL);
|
|
}
|
|
|
|
static int
|
|
close_asap (evcom_descriptor *d)
|
|
{
|
|
if (d->fd < 0) return OKAY;
|
|
|
|
int r = close(d->fd);
|
|
|
|
if (r < 0) {
|
|
if (errno == EINTR) {
|
|
d->action = close_asap;
|
|
return AGAIN;
|
|
} else {
|
|
d->errorno = errno;
|
|
}
|
|
}
|
|
|
|
d->action = NULL;
|
|
d->fd = -1;
|
|
|
|
return OKAY;
|
|
}
|
|
|
|
#define release_write_buffer(writer) \
|
|
do { \
|
|
while (!evcom_queue_empty(&(writer)->out)) { \
|
|
evcom_queue *q = evcom_queue_last(&(writer)->out); \
|
|
evcom_buf *buf = evcom_queue_data(q, evcom_buf, queue); \
|
|
evcom_queue_remove(q); \
|
|
if (buf->release) buf->release(buf); \
|
|
} \
|
|
} while (0)
|
|
|
|
static int
|
|
close_writer_asap (evcom_writer *writer)
|
|
{
|
|
release_write_buffer(writer);
|
|
ev_feed_event(D_LOOP_(writer) &writer->write_watcher, EV_WRITE);
|
|
return close_asap((evcom_descriptor*)writer);
|
|
}
|
|
|
|
static inline void
|
|
evcom_perror (const char *msg, int errorno)
|
|
{
|
|
fprintf(stderr, "(evcom) %s %s\n", msg, strerror(errorno));
|
|
}
|
|
|
|
static int
|
|
stream_send__wait_for_buf (evcom_stream *stream)
|
|
{
|
|
if (evcom_queue_empty(&stream->out)) {
|
|
if (GOT_CLOSE(stream)) {
|
|
stream->send_action = stream_send__drain;
|
|
return OKAY;
|
|
}
|
|
ev_io_stop(D_LOOP_(stream) &stream->write_watcher);
|
|
return AGAIN;
|
|
}
|
|
|
|
stream->send_action = stream_send__data;
|
|
return OKAY;
|
|
}
|
|
|
|
static inline void
|
|
stream__set_recv_closed (evcom_stream *stream)
|
|
{
|
|
stream->flags &= ~EVCOM_READABLE;
|
|
stream->recvfd = -1;
|
|
stream->recv_action = NULL;
|
|
ev_io_stop(D_LOOP_(stream) &stream->read_watcher);
|
|
}
|
|
|
|
static inline void
|
|
stream__set_send_closed (evcom_stream *stream)
|
|
{
|
|
release_write_buffer(stream);
|
|
stream->flags &= ~EVCOM_WRITABLE;
|
|
stream->sendfd = -1;
|
|
stream->send_action = NULL;
|
|
ev_io_stop(D_LOOP_(stream) &stream->write_watcher);
|
|
}
|
|
|
|
static int
|
|
stream_send__close_one (evcom_stream *stream)
|
|
{
|
|
assert(stream->sendfd >= 0);
|
|
|
|
close(stream->sendfd);
|
|
|
|
/* TODO recover from EINTR */
|
|
|
|
stream__set_send_closed(stream);
|
|
|
|
if (DUPLEX(stream) || stream->recv_action == stream_recv__wait_for_close) {
|
|
stream__set_recv_closed(stream);
|
|
}
|
|
|
|
return OKAY;
|
|
}
|
|
|
|
static int
|
|
stream__close_both (evcom_stream *stream)
|
|
{
|
|
assert(stream->sendfd != stream->recvfd);
|
|
|
|
assert(stream->sendfd >= 0);
|
|
assert(stream->recvfd >= 0);
|
|
|
|
close(stream->recvfd);
|
|
close(stream->sendfd);
|
|
|
|
/* TODO recover from EINTR */
|
|
|
|
stream__set_send_closed(stream);
|
|
stream__set_recv_closed(stream);
|
|
|
|
return OKAY;
|
|
}
|
|
|
|
static int
|
|
stream_send__close (evcom_stream *stream)
|
|
{
|
|
if (DUPLEX(stream) || stream->recvfd < 0) {
|
|
stream->send_action = stream_send__close_one;
|
|
} else {
|
|
stream->send_action = stream__close_both;
|
|
}
|
|
return OKAY;
|
|
}
|
|
|
|
static int
|
|
stream_recv__close_one (evcom_stream *stream)
|
|
{
|
|
assert(stream->recvfd >= 0);
|
|
|
|
close(stream->recvfd);
|
|
|
|
/* TODO recover from EINTR */
|
|
|
|
stream__set_recv_closed(stream);
|
|
|
|
if (DUPLEX(stream)) {
|
|
stream__set_send_closed(stream);
|
|
}
|
|
|
|
return OKAY;
|
|
}
|
|
|
|
static int
|
|
stream_recv__close (evcom_stream *stream)
|
|
{
|
|
if (DUPLEX(stream) || stream->sendfd < 0) {
|
|
stream->recv_action = stream_recv__close_one;
|
|
} else {
|
|
stream->recv_action = stream__close_both;
|
|
}
|
|
return OKAY;
|
|
}
|
|
|
|
static int
|
|
stream_send__drain (evcom_stream *stream)
|
|
{
|
|
if (stream->on_drain) {
|
|
stream->on_drain(stream);
|
|
}
|
|
|
|
if (!GOT_CLOSE(stream)) {
|
|
stream->send_action = stream_send__wait_for_buf;
|
|
return OKAY;
|
|
}
|
|
|
|
#if EVCOM_HAVE_GNUTLS
|
|
if (SECURE(stream)) {
|
|
stream->send_action = stream_send__gnutls_bye;
|
|
return OKAY;
|
|
}
|
|
#endif
|
|
|
|
if (DUPLEX(stream)) {
|
|
stream->send_action = stream_send__shutdown;
|
|
return OKAY;
|
|
}
|
|
|
|
stream->send_action = stream_send__close_one;
|
|
return OKAY;
|
|
}
|
|
|
|
static int
|
|
stream_send__wait_for_eof (evcom_stream *stream)
|
|
{
|
|
if (READABLE(stream)) {
|
|
ev_io_stop(D_LOOP_(stream) &stream->write_watcher);
|
|
assert(stream->send_action == stream_send__wait_for_eof);
|
|
return AGAIN;
|
|
}
|
|
|
|
stream->send_action = stream_send__close_one;
|
|
return OKAY;
|
|
}
|
|
|
|
static inline ssize_t
|
|
nosigpipe_send (int fd, const void *buf, size_t len)
|
|
{
|
|
int flags = 0;
|
|
#ifdef MSG_NOSIGNAL
|
|
flags |= MSG_NOSIGNAL;
|
|
#endif
|
|
#ifdef MSG_DONTWAIT
|
|
flags |= MSG_DONTWAIT;
|
|
#endif
|
|
return send(fd, buf, len, flags);
|
|
}
|
|
|
|
static inline ssize_t
|
|
nosigpipe_stream_send (evcom_stream *stream, const void *buf, size_t len)
|
|
{
|
|
return write(stream->sendfd, buf, len);
|
|
}
|
|
|
|
#if EVCOM_HAVE_GNUTLS
|
|
static ssize_t
|
|
nosigpipe_push (gnutls_transport_ptr_t data, const void *buf, size_t len)
|
|
{
|
|
evcom_stream *stream = (evcom_stream*)data;
|
|
assert(SECURE(stream));
|
|
|
|
return nosigpipe_stream_send(stream, buf, len);
|
|
}
|
|
|
|
static ssize_t
|
|
pull (gnutls_transport_ptr_t data, void* buf, size_t len)
|
|
{
|
|
evcom_stream *stream = (evcom_stream*)data;
|
|
assert(SECURE(stream));
|
|
|
|
return read(stream->recvfd, buf, len);
|
|
}
|
|
|
|
static int
|
|
stream__handshake (evcom_stream *stream)
|
|
{
|
|
assert(SECURE(stream));
|
|
|
|
int r = gnutls_handshake(stream->session);
|
|
|
|
if (gnutls_error_is_fatal(r)) {
|
|
stream->gnutls_errorno = r;
|
|
stream->send_action = stream_send__close;
|
|
stream->recv_action = stream_recv__close;
|
|
return OKAY;
|
|
}
|
|
|
|
ev_timer_again(D_LOOP_(stream) &stream->timeout_watcher);
|
|
|
|
if (r == GNUTLS_E_INTERRUPTED || r == GNUTLS_E_AGAIN) {
|
|
if (0 == gnutls_record_get_direction((stream)->session)) {
|
|
ev_io_start(D_LOOP_(stream) &(stream)->read_watcher);
|
|
ev_io_stop(D_LOOP_(stream) &(stream)->write_watcher);
|
|
} else {
|
|
ev_io_stop(D_LOOP_(stream) &(stream)->read_watcher);
|
|
ev_io_start(D_LOOP_(stream) &(stream)->write_watcher);
|
|
}
|
|
assert(stream->recv_action == stream__handshake);
|
|
assert(stream->send_action == stream__handshake);
|
|
return AGAIN;
|
|
}
|
|
|
|
assert(!CONNECTED(stream));
|
|
stream->flags |= EVCOM_CONNECTED;
|
|
if (stream->on_connect) stream->on_connect(stream);
|
|
|
|
/* evcom_stream_force_close might have been called. */
|
|
if (stream->recvfd >= 0 && stream->sendfd >= 0) {
|
|
ev_io_start(D_LOOP_(stream) &stream->read_watcher);
|
|
ev_io_start(D_LOOP_(stream) &stream->write_watcher);
|
|
|
|
stream->send_action = stream_send__data;
|
|
stream->recv_action = stream_recv__data;
|
|
}
|
|
|
|
return OKAY;
|
|
}
|
|
|
|
static int
|
|
stream_send__gnutls_bye (evcom_stream *stream)
|
|
{
|
|
assert(SECURE(stream));
|
|
|
|
int r = gnutls_bye(stream->session, GNUTLS_SHUT_WR);
|
|
|
|
if (r == GNUTLS_E_INTERRUPTED || r == GNUTLS_E_AGAIN) {
|
|
assert(1 == gnutls_record_get_direction((stream)->session));
|
|
assert(stream->send_action == stream_send__gnutls_bye);
|
|
return AGAIN;
|
|
}
|
|
|
|
if (gnutls_error_is_fatal(r)) {
|
|
stream->gnutls_errorno = r;
|
|
stream->send_action = stream_send__close;
|
|
return OKAY;
|
|
}
|
|
|
|
stream->flags &= ~EVCOM_WRITABLE;
|
|
|
|
stream->send_action = stream_send__wait_for_eof;
|
|
return OKAY;
|
|
}
|
|
|
|
void
|
|
evcom_stream_set_secure_session (evcom_stream *stream, gnutls_session_t session)
|
|
{
|
|
stream->session = session;
|
|
stream->flags |= EVCOM_SECURE;
|
|
}
|
|
#endif /* HAVE GNUTLS */
|
|
|
|
static int
|
|
stream_recv__wait_for_close (evcom_stream *stream)
|
|
{
|
|
assert(!READABLE(stream));
|
|
|
|
if (!WRITABLE(stream)) {
|
|
stream->recv_action = stream_recv__close;
|
|
return OKAY;
|
|
}
|
|
|
|
ev_io_stop(D_LOOP_(stream) &stream->read_watcher);
|
|
return AGAIN;
|
|
}
|
|
|
|
static int
|
|
stream_recv__wait_for_resume (evcom_stream *stream)
|
|
{
|
|
stream->flags |= EVCOM_PAUSED;
|
|
ev_io_stop(D_LOOP_(stream) &stream->read_watcher);
|
|
assert(stream->recv_action == stream_recv__wait_for_resume);
|
|
return AGAIN;
|
|
}
|
|
|
|
static int
|
|
stream_recv__data (evcom_stream *stream)
|
|
{
|
|
char buf[EVCOM_CHUNKSIZE];
|
|
size_t buf_size = EVCOM_CHUNKSIZE;
|
|
ssize_t recved;
|
|
|
|
while (READABLE(stream)) {
|
|
assert(CONNECTED(stream));
|
|
|
|
if (PAUSED(stream)) {
|
|
stream->recv_action = stream_recv__wait_for_resume;
|
|
return OKAY;
|
|
}
|
|
|
|
|
|
#if EVCOM_HAVE_GNUTLS
|
|
if (SECURE(stream)) {
|
|
recved = gnutls_record_recv(stream->session, buf, buf_size);
|
|
|
|
if (gnutls_error_is_fatal(recved)) {
|
|
stream->gnutls_errorno = recved;
|
|
stream->recv_action = stream_recv__close;
|
|
return OKAY;
|
|
}
|
|
|
|
if (recved == GNUTLS_E_INTERRUPTED || recved == GNUTLS_E_AGAIN) {
|
|
if (1 == gnutls_record_get_direction((stream)->session)) {
|
|
fprintf(stderr, "(evcom) gnutls recv: unexpected switch direction!\n");
|
|
ev_io_stop(D_LOOP_(stream) &(stream)->read_watcher);
|
|
ev_io_start(D_LOOP_(stream) &(stream)->write_watcher);
|
|
}
|
|
return AGAIN;
|
|
}
|
|
|
|
/* A server may also receive GNUTLS_E_REHANDSHAKE when a client has
|
|
* initiated a andshake. In that case the server can only initiate a
|
|
* handshake or terminate the connection. */
|
|
if (recved == GNUTLS_E_REHANDSHAKE) {
|
|
assert(WRITABLE(stream));
|
|
stream->recv_action = stream__handshake;
|
|
stream->send_action = stream__handshake;
|
|
return OKAY;
|
|
}
|
|
} else
|
|
#endif /* EVCOM_HAVE_GNUTLS */
|
|
{
|
|
recved = read(stream->recvfd, buf, buf_size);
|
|
}
|
|
|
|
if (recved < 0) {
|
|
if (errno == EAGAIN || errno == EINTR) {
|
|
assert(stream->recv_action == stream_recv__data);
|
|
return AGAIN;
|
|
}
|
|
|
|
if (errno != ECONNRESET) {
|
|
evcom_perror("recv()", stream->errorno);
|
|
}
|
|
|
|
stream->errorno = errno;
|
|
stream->recv_action = stream_recv__close;
|
|
return OKAY;
|
|
}
|
|
|
|
ev_timer_again(D_LOOP_(stream) &stream->timeout_watcher);
|
|
|
|
assert(recved >= 0);
|
|
|
|
if (recved == 0) {
|
|
stream->flags &= ~EVCOM_READABLE;
|
|
ev_io_stop(D_LOOP_(stream) &stream->read_watcher);
|
|
stream->recv_action = stream_recv__wait_for_close;
|
|
}
|
|
|
|
/* NOTE: EOF is signaled with recved == 0 on callback */
|
|
if (stream->on_read) stream->on_read(stream, buf, recved);
|
|
|
|
if (recved == 0) {
|
|
return OKAY;
|
|
}
|
|
}
|
|
return AGAIN;
|
|
}
|
|
|
|
static int
|
|
stream_send__data (evcom_stream *stream)
|
|
{
|
|
ssize_t sent;
|
|
|
|
while (!evcom_queue_empty(&stream->out)) {
|
|
assert(WRITABLE(stream));
|
|
|
|
evcom_queue *q = evcom_queue_last(&stream->out);
|
|
evcom_buf *buf = evcom_queue_data(q, evcom_buf, queue);
|
|
|
|
#if EVCOM_HAVE_GNUTLS
|
|
if (SECURE(stream)) {
|
|
sent = gnutls_record_send(stream->session,
|
|
buf->base + buf->written,
|
|
buf->len - buf->written);
|
|
|
|
if (sent == GNUTLS_E_INTERRUPTED || sent == GNUTLS_E_AGAIN) {
|
|
if (0 == gnutls_record_get_direction((stream)->session)) {
|
|
fprintf(stderr, "(evcom) gnutls send: unexpected switch direction!\n");
|
|
ev_io_start(D_LOOP_(stream) &(stream)->read_watcher);
|
|
ev_io_stop(D_LOOP_(stream) &(stream)->write_watcher);
|
|
}
|
|
return AGAIN;
|
|
}
|
|
|
|
if (gnutls_error_is_fatal(sent)) {
|
|
stream->gnutls_errorno = sent;
|
|
stream->send_action = stream_send__close;
|
|
return OKAY;
|
|
}
|
|
} else
|
|
#endif // EVCOM_HAVE_GNUTLS
|
|
{
|
|
/* TODO use writev() here? */
|
|
sent = nosigpipe_stream_send(stream,
|
|
buf->base + buf->written,
|
|
buf->len - buf->written);
|
|
}
|
|
|
|
if (sent <= 0) {
|
|
switch (errno) {
|
|
case EAGAIN:
|
|
case EINTR:
|
|
assert(stream->send_action == stream_send__data);
|
|
return AGAIN;
|
|
|
|
default:
|
|
stream->errorno = errno;
|
|
evcom_perror("send()", errno);
|
|
/* pass through */
|
|
case EPIPE:
|
|
stream->send_action = stream_send__close;
|
|
return OKAY;
|
|
}
|
|
}
|
|
|
|
ev_timer_again(D_LOOP_(stream) &stream->timeout_watcher);
|
|
|
|
assert(sent >= 0);
|
|
|
|
buf->written += sent;
|
|
|
|
if (buf->written == buf->len) {
|
|
evcom_queue_remove(q);
|
|
if (buf->release) buf->release(buf);
|
|
}
|
|
}
|
|
|
|
assert(evcom_queue_empty(&stream->out));
|
|
|
|
stream->send_action = stream_send__drain;
|
|
return OKAY;
|
|
}
|
|
|
|
static int
|
|
stream_send__shutdown (evcom_stream *stream)
|
|
{
|
|
int r = shutdown(stream->sendfd, SHUT_WR);
|
|
|
|
if (r < 0) {
|
|
switch (errno) {
|
|
case EINTR:
|
|
assert(stream->send_action == stream_send__shutdown);
|
|
return OKAY;
|
|
|
|
case ENOTCONN:
|
|
break;
|
|
|
|
default:
|
|
stream->errorno = errno;
|
|
evcom_perror("shutdown()", errno);
|
|
break;
|
|
}
|
|
stream->send_action = stream_send__close;
|
|
return OKAY;
|
|
}
|
|
|
|
stream->flags &= ~EVCOM_WRITABLE;
|
|
stream->send_action = stream_send__wait_for_eof;
|
|
return OKAY;
|
|
}
|
|
|
|
static int
|
|
stream__connection_established (evcom_stream *stream)
|
|
{
|
|
assert(!CONNECTED(stream));
|
|
|
|
#if EVCOM_HAVE_GNUTLS
|
|
if (SECURE(stream)) {
|
|
stream->send_action = stream__handshake;
|
|
stream->recv_action = stream__handshake;
|
|
} else
|
|
#endif
|
|
{
|
|
stream->flags |= EVCOM_CONNECTED;
|
|
if (stream->on_connect) stream->on_connect(stream);
|
|
|
|
stream->send_action = stream_send__data;
|
|
stream->recv_action = stream_recv__data;
|
|
}
|
|
|
|
ev_io_start(D_LOOP_(stream) &stream->write_watcher);
|
|
ev_io_start(D_LOOP_(stream) &stream->read_watcher);
|
|
|
|
return OKAY;
|
|
}
|
|
|
|
static int
|
|
stream_send__wait_for_connection (evcom_stream *stream)
|
|
{
|
|
assert(DUPLEX(stream));
|
|
|
|
int connect_error;
|
|
socklen_t len = sizeof(int);
|
|
int r = getsockopt(stream->sendfd, SOL_SOCKET, SO_ERROR, &connect_error, &len);
|
|
|
|
if (r < 0) {
|
|
stream->errorno = r;
|
|
stream->send_action = stream_send__close;
|
|
return OKAY;
|
|
}
|
|
|
|
if (connect_error == 0) {
|
|
stream->send_action = stream__connection_established;
|
|
return OKAY;
|
|
|
|
} else if (connect_error == EINPROGRESS || connect_error == EINTR) {
|
|
assert(stream->send_action == stream_send__wait_for_connection);
|
|
return AGAIN;
|
|
}
|
|
|
|
stream->errorno = connect_error;
|
|
stream->send_action = stream_send__close;
|
|
return OKAY;
|
|
}
|
|
|
|
void
|
|
evcom_stream_assign_fds (evcom_stream *stream, int recvfd, int sendfd)
|
|
{
|
|
assert(recvfd >= 0);
|
|
assert(sendfd >= 0);
|
|
|
|
if (recvfd == sendfd) stream->flags |= EVCOM_DUPLEX;
|
|
|
|
if (set_nonblock(recvfd) != 0) {
|
|
evcom_perror("set_nonblock(recvfd)", errno);
|
|
}
|
|
|
|
if (set_nonblock(sendfd) != 0) {
|
|
evcom_perror("set_nonblock(sendfd)", errno);
|
|
}
|
|
|
|
#ifdef SO_NOSIGPIPE
|
|
if (DUPLEX(stream)) {
|
|
int flags = 1;
|
|
int r = setsockopt(sendfd, SOL_SOCKET, SO_NOSIGPIPE, &flags, sizeof(flags));
|
|
if (r < 0) {
|
|
evcom_perror("setsockopt(SO_NOSIGPIPE)", errno);
|
|
}
|
|
}
|
|
#endif
|
|
|
|
ev_io_set(&stream->read_watcher, recvfd, EV_READ);
|
|
ev_io_set(&stream->write_watcher, sendfd, EV_WRITE);
|
|
|
|
stream->recvfd = recvfd;
|
|
stream->sendfd = sendfd;
|
|
|
|
stream->send_action = stream__connection_established;
|
|
stream->recv_action = stream__connection_established;
|
|
|
|
stream->flags |= EVCOM_READABLE;
|
|
stream->flags |= EVCOM_WRITABLE;
|
|
|
|
#if EVCOM_HAVE_GNUTLS
|
|
if (SECURE(stream)) {
|
|
gnutls_transport_set_lowat(stream->session, 0);
|
|
gnutls_transport_set_push_function(stream->session, nosigpipe_push);
|
|
gnutls_transport_set_pull_function(stream->session, pull);
|
|
gnutls_transport_set_ptr2(stream->session, stream, stream);
|
|
}
|
|
#endif
|
|
}
|
|
|
|
/* Retruns evcom_stream if a connection could be accepted.
|
|
* The returned stream is not yet attached to the event loop.
|
|
* Otherwise NULL
|
|
*/
|
|
static evcom_stream*
|
|
accept_connection (evcom_server *server)
|
|
{
|
|
struct sockaddr address; /* connector's address information */
|
|
socklen_t addr_len = sizeof(address);
|
|
|
|
int fd = accept(server->fd, &address, &addr_len);
|
|
if (fd < 0) {
|
|
switch (errno) {
|
|
case EMFILE:
|
|
case ENFILE:
|
|
too_many_connections = 1;
|
|
server->flags |= EVCOM_TOO_MANY_CONN;
|
|
evcom_server_detach(server);
|
|
return NULL;
|
|
|
|
case EINTR:
|
|
case EAGAIN:
|
|
return NULL;
|
|
|
|
default:
|
|
evcom_perror("accept()", errno);
|
|
return NULL;
|
|
}
|
|
assert(0 && "no reach");
|
|
}
|
|
|
|
evcom_stream *stream = NULL;
|
|
|
|
if (server->on_connection) {
|
|
stream = server->on_connection(server, &address);
|
|
}
|
|
|
|
if (stream == NULL) {
|
|
close(fd);
|
|
return NULL;
|
|
}
|
|
|
|
if (set_nonblock(fd) != 0) {
|
|
evcom_perror("set_nonblock()", errno);
|
|
return NULL;
|
|
}
|
|
|
|
stream->server = server;
|
|
evcom_stream_assign_fds(stream, fd, fd);
|
|
|
|
return stream;
|
|
}
|
|
|
|
/* Internal callback
|
|
* Called by server->watcher.
|
|
*/
|
|
static int
|
|
accept_connections (evcom_descriptor *d)
|
|
{
|
|
evcom_server *server = (evcom_server *)d;
|
|
|
|
assert(LISTENING(server));
|
|
|
|
/* accept as many connections as possible */
|
|
evcom_stream *stream;
|
|
while (server->fd >= 0 && (stream = accept_connection(server))) {
|
|
evcom_stream_attach(D_LOOP_(server) stream);
|
|
}
|
|
|
|
return AGAIN;
|
|
}
|
|
|
|
static inline socklen_t
|
|
address_length (struct sockaddr *address)
|
|
{
|
|
struct sockaddr_un* unix_address = (struct sockaddr_un*)address;
|
|
int length = 0;
|
|
|
|
switch (address->sa_family) {
|
|
case AF_INET:
|
|
length = sizeof(struct sockaddr_in);
|
|
break;
|
|
case AF_INET6:
|
|
length = sizeof(struct sockaddr_in6);
|
|
break;
|
|
case AF_UNIX:
|
|
#ifdef SUN_LEN
|
|
length = SUN_LEN(unix_address);
|
|
#else
|
|
length = strlen(unix_address->sun_path) + sizeof(unix_address->sun_family);
|
|
#endif
|
|
break;
|
|
|
|
default:
|
|
assert(0 && "Unsupported socket family");
|
|
}
|
|
return length;
|
|
}
|
|
|
|
int
|
|
evcom_server_listen (evcom_server *server, struct sockaddr *address, int backlog)
|
|
{
|
|
assert(!LISTENING(server));
|
|
|
|
int fd = socket(address->sa_family, SOCK_STREAM, 0);
|
|
if (fd < 0) {
|
|
server->errorno = errno;
|
|
evcom_perror("socket()", errno);
|
|
return -1;
|
|
}
|
|
|
|
server->fd = fd;
|
|
ev_io_set(&server->watcher, server->fd, EV_READ);
|
|
|
|
if (set_nonblock(fd) != 0) {
|
|
server->errorno = errno;
|
|
evcom_perror("set_nonblock()", errno);
|
|
close(fd);
|
|
return -1;
|
|
}
|
|
|
|
int flags = 1;
|
|
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
|
|
setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
|
|
|
|
/* XXX: Sending single byte chunks in a response body? Perhaps there is a
|
|
* need to enable the Nagel algorithm dynamically. For now disabling.
|
|
*/
|
|
//setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
|
|
|
|
if (bind(fd, address, address_length(address)) < 0) {
|
|
server->errorno = errno;
|
|
evcom_perror("bind()", errno);
|
|
close(fd);
|
|
return -1;
|
|
}
|
|
|
|
if (listen(fd, backlog) < 0) {
|
|
server->errorno = errno;
|
|
evcom_perror("listen()", errno);
|
|
close(fd);
|
|
return -1;
|
|
}
|
|
|
|
server->flags |= EVCOM_LISTENING;
|
|
server->action = accept_connections;
|
|
|
|
return 0;
|
|
}
|
|
|
|
/**
|
|
* Stops the server. Will not accept new connections. Does not drop
|
|
* existing connections.
|
|
*/
|
|
void
|
|
evcom_server_close (evcom_server *server)
|
|
{
|
|
ev_io_start(D_LOOP_(server) &server->watcher);
|
|
ev_feed_event(D_LOOP_(server) &server->watcher, EV_READ);
|
|
|
|
close_asap((evcom_descriptor*)server);
|
|
}
|
|
|
|
void
|
|
evcom_server_attach (EV_P_ evcom_server *server)
|
|
{
|
|
ev_io_start (EV_A_ &server->watcher);
|
|
D_LOOP_SET(server, EV_A);
|
|
server->flags |= EVCOM_ATTACHED;
|
|
}
|
|
|
|
void
|
|
evcom_server_detach (evcom_server *server)
|
|
{
|
|
ev_io_stop (D_LOOP_(server) &server->watcher);
|
|
D_LOOP_SET(server, NULL);
|
|
server->flags &= ~EVCOM_ATTACHED;
|
|
}
|
|
|
|
static void
|
|
io_event(EV_P_ ev_io *watcher, int revents)
|
|
{
|
|
evcom_descriptor *d = watcher->data;
|
|
|
|
#if EV_MULTIPLICITY
|
|
assert(d->loop == loop);
|
|
#endif
|
|
|
|
int r = OKAY;
|
|
|
|
if (revents & EV_ERROR) {
|
|
d->errorno = 1;
|
|
r = close_asap(d);
|
|
}
|
|
|
|
while (r == OKAY && d->action && d->fd >= 0) {
|
|
r = d->action(d);
|
|
}
|
|
|
|
if (d->fd < 0) {
|
|
assert(d->action == NULL);
|
|
ev_clear_pending(EV_A_ watcher);
|
|
ev_io_stop(EV_A_ watcher);
|
|
if (d->on_close) d->on_close(d);
|
|
}
|
|
}
|
|
|
|
static void
|
|
evcom_descriptor_init (evcom_descriptor *d)
|
|
{
|
|
d->fd = -1;
|
|
D_LOOP_SET(d, NULL);
|
|
d->flags = 0;
|
|
d->errorno = 0;
|
|
d->action = NULL;
|
|
}
|
|
|
|
void
|
|
evcom_server_init (evcom_server *server)
|
|
{
|
|
evcom_descriptor_init((evcom_descriptor*)server);
|
|
ev_init (&server->watcher, io_event);
|
|
server->watcher.data = server;
|
|
server->on_connection = NULL;
|
|
}
|
|
|
|
/* Internal callback. called by stream->timeout_watcher */
|
|
static void
|
|
on_timeout (EV_P_ ev_timer *watcher, int revents)
|
|
{
|
|
evcom_stream *stream = watcher->data;
|
|
|
|
#if EV_MULTIPLICITY
|
|
assert(stream->loop == loop);
|
|
#endif
|
|
assert(revents == EV_TIMEOUT);
|
|
assert(watcher == &stream->timeout_watcher);
|
|
|
|
if (PAUSED(stream)) {
|
|
ev_timer_again(D_LOOP_(stream) &stream->timeout_watcher);
|
|
return;
|
|
}
|
|
|
|
if (stream->on_timeout) stream->on_timeout(stream);
|
|
|
|
// Hack to get error in Node on 'close' event.
|
|
// should probably be made into a proper error code.
|
|
stream->errorno = 1;
|
|
|
|
ev_timer_stop(EV_A_ watcher);
|
|
evcom_stream_force_close(stream);
|
|
|
|
if (stream->on_close) stream->on_close(stream);
|
|
}
|
|
|
|
static void
|
|
stream_event (EV_P_ ev_io *w, int revents)
|
|
{
|
|
evcom_stream *stream = w->data;
|
|
|
|
if (revents & EV_READ) {
|
|
while (stream->recv_action) {
|
|
int r = stream->recv_action(stream);
|
|
if (r == AGAIN) break;
|
|
}
|
|
}
|
|
|
|
if (revents & EV_WRITE) {
|
|
while (stream->send_action) {
|
|
int r = stream->send_action(stream);
|
|
if (r == AGAIN) break;
|
|
}
|
|
}
|
|
|
|
if (stream->send_action == NULL) {
|
|
ev_io_stop(EV_A_ &stream->write_watcher);
|
|
}
|
|
|
|
if (stream->recv_action == NULL) {
|
|
ev_io_stop(EV_A_ &stream->read_watcher);
|
|
}
|
|
|
|
if (stream->sendfd < 0 && stream->recvfd < 0) {
|
|
ev_timer_stop(EV_A_ &stream->timeout_watcher);
|
|
|
|
if (stream->server && (stream->server->flags & EVCOM_TOO_MANY_CONN)) {
|
|
#if EV_MULTIPLICITY
|
|
struct ev_loop *loop = stream->server->loop;
|
|
#endif
|
|
stream->server->flags &= ~EVCOM_TOO_MANY_CONN;
|
|
evcom_server_attach(EV_A_ stream->server);
|
|
}
|
|
too_many_connections = 0;
|
|
|
|
if (stream->on_close) stream->on_close(stream);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* If using SSL do consider setting
|
|
* gnutls_db_set_retrieve_function (stream->session, _);
|
|
* gnutls_db_set_remove_function (stream->session, _);
|
|
* gnutls_db_set_store_function (stream->session, _);
|
|
* gnutls_db_set_ptr (stream->session, _);
|
|
*/
|
|
void
|
|
evcom_stream_init (evcom_stream *stream)
|
|
{
|
|
stream->flags = 0;
|
|
stream->errorno = 0;
|
|
stream->recvfd = -1;
|
|
stream->sendfd = -1;
|
|
|
|
// reader things
|
|
ev_init(&stream->read_watcher, stream_event);
|
|
stream->read_watcher.data = stream;
|
|
stream->recv_action = NULL;
|
|
|
|
// writer things
|
|
ev_init(&stream->write_watcher, stream_event);
|
|
stream->write_watcher.data = stream;
|
|
evcom_queue_init(&stream->out);
|
|
stream->send_action = NULL;
|
|
|
|
// stream things
|
|
stream->server = NULL;
|
|
#if EVCOM_HAVE_GNUTLS
|
|
stream->gnutls_errorno = 0;
|
|
stream->session = NULL;
|
|
#endif
|
|
ev_timer_init(&stream->timeout_watcher, on_timeout, 0., 60.);
|
|
stream->timeout_watcher.data = stream;
|
|
|
|
stream->on_connect = NULL;
|
|
stream->on_timeout = NULL;
|
|
stream->on_read = NULL;
|
|
stream->on_drain = NULL;
|
|
stream->on_close = NULL;
|
|
}
|
|
|
|
void
|
|
evcom_stream_close (evcom_stream *stream)
|
|
{
|
|
stream->flags |= EVCOM_GOT_CLOSE;
|
|
if (ATTACHED(stream)) {
|
|
// start the watchers if attached.
|
|
evcom_stream_attach(D_LOOP_(stream) stream);
|
|
}
|
|
}
|
|
|
|
void evcom_stream_force_close (evcom_stream *stream)
|
|
{
|
|
if (stream->recvfd >= 0) {
|
|
close(stream->recvfd);
|
|
/* XXX What to do on EINTR? */
|
|
stream__set_recv_closed(stream);
|
|
}
|
|
|
|
if (!DUPLEX(stream) && stream->sendfd >= 0) {
|
|
close(stream->sendfd);
|
|
}
|
|
stream__set_send_closed(stream);
|
|
|
|
evcom_stream_detach(stream);
|
|
}
|
|
|
|
void
|
|
evcom_stream_write (evcom_stream *stream, const char *str, size_t len)
|
|
{
|
|
if (!WRITABLE(stream) || GOT_CLOSE(stream)) {
|
|
assert(0 && "Do not write to a closed stream");
|
|
return;
|
|
}
|
|
|
|
ssize_t sent = 0;
|
|
|
|
if ( stream->send_action == stream_send__wait_for_buf
|
|
&& evcom_queue_empty(&stream->out)
|
|
)
|
|
{
|
|
assert(CONNECTED(stream));
|
|
#if EVCOM_HAVE_GNUTLS
|
|
if (SECURE(stream)) {
|
|
sent = gnutls_record_send(stream->session, str, len);
|
|
|
|
if (gnutls_error_is_fatal(sent)) {
|
|
stream->gnutls_errorno = sent;
|
|
goto close;
|
|
}
|
|
} else
|
|
#endif // EVCOM_HAVE_GNUTLS
|
|
{
|
|
/* TODO use writev() here? */
|
|
sent = nosigpipe_stream_send(stream, str, len);
|
|
}
|
|
|
|
if (sent < 0) {
|
|
switch (errno) {
|
|
case EPIPE:
|
|
goto close;
|
|
|
|
case EINTR:
|
|
case EAGAIN:
|
|
sent = 0;
|
|
break;
|
|
|
|
default:
|
|
stream->errorno = errno;
|
|
evcom_perror("send()", stream->errorno);
|
|
goto close;
|
|
}
|
|
}
|
|
} /* TODO else { memcpy to last buffer on head } */
|
|
|
|
assert(sent >= 0);
|
|
if ((size_t)sent == len) return; /* sent the whole buffer */
|
|
|
|
len -= sent;
|
|
str += sent;
|
|
|
|
evcom_buf *b = evcom_buf_new(str, len);
|
|
evcom_queue_insert_head(&stream->out, &b->queue);
|
|
b->written = 0;
|
|
|
|
assert(stream->sendfd >= 0);
|
|
|
|
if (ATTACHED(stream)) {
|
|
ev_io_start(D_LOOP_(stream) &stream->write_watcher);
|
|
}
|
|
return;
|
|
|
|
close:
|
|
stream->send_action = stream_send__close;
|
|
stream->recv_action = stream_recv__close;
|
|
if (ATTACHED(stream)) {
|
|
ev_io_start(D_LOOP_(stream) &stream->write_watcher);
|
|
}
|
|
}
|
|
|
|
void
|
|
evcom_stream_reset_timeout (evcom_stream *stream, float timeout)
|
|
{
|
|
stream->timeout_watcher.repeat = timeout;
|
|
if (ATTACHED(stream)) {
|
|
ev_timer_again(D_LOOP_(stream) &stream->timeout_watcher);
|
|
}
|
|
}
|
|
|
|
void
|
|
evcom_stream_set_no_delay (evcom_stream *stream, int no_delay)
|
|
{
|
|
if (DUPLEX(stream)) {
|
|
int flags = no_delay ? 1 : 0;
|
|
setsockopt(stream->recvfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
|
|
}
|
|
}
|
|
|
|
void
|
|
evcom_stream_attach (EV_P_ evcom_stream *stream)
|
|
{
|
|
D_LOOP_SET(stream, EV_A);
|
|
stream->flags |= EVCOM_ATTACHED;
|
|
|
|
ev_timer_again(EV_A_ &stream->timeout_watcher);
|
|
|
|
if (READABLE(stream)) {
|
|
ev_io_start(EV_A_ &stream->read_watcher);
|
|
}
|
|
|
|
if (WRITABLE(stream)) {
|
|
ev_io_start(EV_A_ &stream->write_watcher);
|
|
}
|
|
}
|
|
|
|
void
|
|
evcom_stream_detach (evcom_stream *stream)
|
|
{
|
|
ev_io_stop(D_LOOP_(stream) &stream->write_watcher);
|
|
ev_io_stop(D_LOOP_(stream) &stream->read_watcher);
|
|
ev_timer_stop(D_LOOP_(stream) &stream->timeout_watcher);
|
|
D_LOOP_SET(stream, NULL);
|
|
stream->flags &= ~EVCOM_ATTACHED;
|
|
}
|
|
|
|
void
|
|
evcom_stream_read_pause (evcom_stream *stream)
|
|
{
|
|
stream->flags |= EVCOM_PAUSED;
|
|
ev_timer_stop(D_LOOP_(stream) &stream->timeout_watcher);
|
|
if (stream->recv_action == stream_recv__data) {
|
|
ev_io_stop(D_LOOP_(stream) &stream->read_watcher);
|
|
stream->recv_action = stream_recv__wait_for_resume;
|
|
}
|
|
}
|
|
|
|
void
|
|
evcom_stream_read_resume (evcom_stream *stream)
|
|
{
|
|
stream->flags &= ~EVCOM_PAUSED;
|
|
ev_timer_again(D_LOOP_(stream) &stream->timeout_watcher);
|
|
if (stream->recv_action == stream_recv__wait_for_resume) {
|
|
stream->recv_action = stream_recv__data;
|
|
}
|
|
if (ATTACHED(stream)) {
|
|
ev_timer_again(D_LOOP_(stream) &stream->timeout_watcher);
|
|
if (READABLE(stream)) ev_io_start(D_LOOP_(stream) &stream->read_watcher);
|
|
}
|
|
}
|
|
|
|
int
|
|
evcom_stream_connect (evcom_stream *stream, struct sockaddr *address)
|
|
{
|
|
int fd = socket(address->sa_family, SOCK_STREAM, 0);
|
|
|
|
if (fd < 0) {
|
|
evcom_perror("socket()", errno);
|
|
return -1;
|
|
}
|
|
|
|
int r = set_nonblock(fd);
|
|
if (r < 0) {
|
|
stream->errorno = errno;
|
|
evcom_perror("set_nonblock()", errno);
|
|
close(fd);
|
|
return -1;
|
|
}
|
|
|
|
r = connect(fd, address, address_length(address));
|
|
|
|
if (r < 0 && errno != EINPROGRESS) {
|
|
stream->errorno = errno;
|
|
evcom_perror("connect", errno);
|
|
close(fd);
|
|
return -1;
|
|
}
|
|
|
|
evcom_stream_assign_fds(stream, fd, fd);
|
|
|
|
stream->send_action = stream_send__wait_for_connection;
|
|
stream->recv_action = NULL;
|
|
|
|
return 0;
|
|
}
|
|
|
|
int evcom_stream_pair (evcom_stream *a, evcom_stream *b)
|
|
{
|
|
int sv[2];
|
|
int old_errno;
|
|
|
|
int r = socketpair(PF_UNIX, SOCK_STREAM, 0, sv);
|
|
if (r < 0) return -1;
|
|
|
|
r = set_nonblock(sv[0]);
|
|
if (r < 0) goto set_nonblock_error;
|
|
r = set_nonblock(sv[1]);
|
|
if (r < 0) goto set_nonblock_error;
|
|
|
|
evcom_stream_assign_fds(a, sv[0], sv[0]);
|
|
evcom_stream_assign_fds(b, sv[1], sv[1]);
|
|
|
|
return 0;
|
|
|
|
set_nonblock_error:
|
|
old_errno = errno;
|
|
evcom_perror("set_nonblock()", errno);
|
|
close(sv[0]);
|
|
close(sv[1]);
|
|
errno = old_errno;
|
|
return -1;
|
|
}
|
|
|
|
enum evcom_stream_state
|
|
evcom_stream_state (evcom_stream *stream)
|
|
{
|
|
if (stream->recvfd < 0 && stream->sendfd && stream->flags == 0) {
|
|
return EVCOM_INITIALIZED;
|
|
}
|
|
|
|
if (stream->recvfd < 0 && stream->sendfd < 0) return EVCOM_CLOSED;
|
|
|
|
if (!CONNECTED(stream)) return EVCOM_CONNECTING;
|
|
|
|
if (GOT_CLOSE(stream)) {
|
|
if (READABLE(stream)) {
|
|
return EVCOM_CONNECTED_RO;
|
|
} else {
|
|
return EVCOM_CLOSING;
|
|
}
|
|
}
|
|
|
|
if (READABLE(stream) && WRITABLE(stream)) return EVCOM_CONNECTED_RW;
|
|
|
|
if (WRITABLE(stream)) return EVCOM_CONNECTED_WO;
|
|
|
|
if (READABLE(stream)) return EVCOM_CONNECTED_RO;
|
|
|
|
return EVCOM_CLOSING;
|
|
}
|
|
|
|
static int
|
|
reader_recv (evcom_descriptor *d)
|
|
{
|
|
evcom_reader* reader = (evcom_reader*) d;
|
|
|
|
char buf[EVCOM_CHUNKSIZE];
|
|
size_t buf_size = EVCOM_CHUNKSIZE;
|
|
ssize_t recved;
|
|
|
|
while (reader->fd >= 0) {
|
|
recved = read(reader->fd, buf, buf_size);
|
|
|
|
if (recved < 0) {
|
|
if (errno == EAGAIN || errno == EINTR) return AGAIN;
|
|
reader->errorno = errno;
|
|
evcom_perror("read()", reader->errorno);
|
|
return close_asap(d);
|
|
}
|
|
|
|
/* NOTE: EOF is signaled with recved == 0 on callback */
|
|
if (reader->on_read) reader->on_read(reader, buf, recved);
|
|
|
|
if (recved == 0) return close_asap(d);
|
|
}
|
|
return AGAIN;
|
|
}
|
|
|
|
void
|
|
evcom_reader_init (evcom_reader *reader)
|
|
{
|
|
evcom_descriptor_init((evcom_descriptor*)reader);
|
|
|
|
reader->on_close = NULL;
|
|
reader->on_read = NULL;
|
|
|
|
ev_init(&reader->read_watcher, io_event);
|
|
reader->read_watcher.data = reader;
|
|
}
|
|
|
|
void
|
|
evcom_reader_set (evcom_reader *reader, int fd)
|
|
{
|
|
assert(fd >= 0);
|
|
reader->fd = fd;
|
|
|
|
ev_io_set(&reader->read_watcher, fd, EV_READ);
|
|
reader->action = reader_recv;
|
|
}
|
|
|
|
void
|
|
evcom_reader_attach (EV_P_ evcom_reader *reader)
|
|
{
|
|
ev_io_start(EV_A_ &reader->read_watcher);
|
|
D_LOOP_SET(reader, EV_A);
|
|
}
|
|
|
|
void
|
|
evcom_reader_detach (evcom_reader *reader)
|
|
{
|
|
ev_io_stop(D_LOOP_(reader) &reader->read_watcher);
|
|
D_LOOP_SET(reader, NULL);
|
|
}
|
|
|
|
void
|
|
evcom_reader_close (evcom_reader *reader)
|
|
{
|
|
ev_io_start(D_LOOP_(reader) &reader->read_watcher);
|
|
ev_feed_event(D_LOOP_(reader) &reader->read_watcher, EV_READ);
|
|
|
|
close_asap((evcom_descriptor*)reader);
|
|
}
|
|
|
|
static int
|
|
writer_send (evcom_descriptor *d)
|
|
{
|
|
evcom_writer* writer = (evcom_writer*) d;
|
|
assert(writer->fd >= 0);
|
|
|
|
while (!evcom_queue_empty(&writer->out)) {
|
|
evcom_queue *q = evcom_queue_last(&writer->out);
|
|
evcom_buf *buf = evcom_queue_data(q, evcom_buf, queue);
|
|
|
|
ssize_t sent = write(writer->fd, buf->base + buf->written,
|
|
buf->len - buf->written);
|
|
|
|
if (sent < 0) {
|
|
switch (errno) {
|
|
case ECONNRESET:
|
|
case EPIPE:
|
|
return close_writer_asap(writer);
|
|
|
|
case EINTR:
|
|
case EAGAIN:
|
|
sent = 0;
|
|
return AGAIN;
|
|
|
|
default:
|
|
writer->errorno = errno;
|
|
evcom_perror("send()", writer->errorno);
|
|
return close_writer_asap(writer);
|
|
}
|
|
}
|
|
assert(sent >= 0);
|
|
|
|
buf->written += sent;
|
|
|
|
if (buf->written == buf->len) {
|
|
evcom_queue_remove(q);
|
|
if (buf->release) buf->release(buf);
|
|
}
|
|
}
|
|
|
|
if (GOT_CLOSE(writer)) {
|
|
assert(evcom_queue_empty(&writer->out));
|
|
return close_writer_asap(writer);
|
|
} else {
|
|
ev_io_stop(D_LOOP_(writer) &writer->write_watcher);
|
|
return AGAIN;
|
|
}
|
|
}
|
|
|
|
void
|
|
evcom_writer_init (evcom_writer* writer)
|
|
{
|
|
evcom_descriptor_init((evcom_descriptor*)writer);
|
|
|
|
writer->on_close = NULL;
|
|
|
|
ev_init(&writer->write_watcher, io_event);
|
|
writer->write_watcher.data = writer;
|
|
|
|
evcom_queue_init(&writer->out);
|
|
}
|
|
|
|
void
|
|
evcom_writer_set (evcom_writer* writer, int fd)
|
|
{
|
|
assert(fd >= 0);
|
|
writer->fd = fd;
|
|
|
|
ev_io_set(&writer->write_watcher, fd, EV_WRITE);
|
|
writer->action = writer_send;
|
|
}
|
|
|
|
void
|
|
evcom_writer_attach (EV_P_ evcom_writer* writer)
|
|
{
|
|
if (!evcom_queue_empty(&writer->out)) {
|
|
ev_io_start (EV_A_ &writer->write_watcher);
|
|
}
|
|
D_LOOP_SET(writer, EV_A);
|
|
}
|
|
|
|
void
|
|
evcom_writer_detach (evcom_writer* writer)
|
|
{
|
|
ev_io_stop(D_LOOP_(writer) &writer->write_watcher);
|
|
D_LOOP_SET(writer, NULL);
|
|
}
|
|
|
|
void
|
|
evcom_writer_write (evcom_writer* writer, const char* buf, size_t len)
|
|
{
|
|
assert(writer->fd >= 0);
|
|
|
|
ssize_t sent = 0;
|
|
|
|
if (evcom_queue_empty(&writer->out)) {
|
|
sent = write(writer->fd, buf, len);
|
|
|
|
if (sent < 0) {
|
|
switch (errno) {
|
|
case ECONNRESET:
|
|
case EPIPE:
|
|
goto close;
|
|
|
|
case EINTR:
|
|
case EAGAIN:
|
|
sent = 0;
|
|
break;
|
|
|
|
default:
|
|
writer->errorno = errno;
|
|
evcom_perror("send()", writer->errorno);
|
|
goto close;
|
|
}
|
|
}
|
|
} /* TODO else { memcpy to last buffer on head } */
|
|
|
|
assert(sent >= 0);
|
|
if ((size_t)sent == len) return; /* sent the whole buffer */
|
|
|
|
len -= sent;
|
|
buf += sent;
|
|
|
|
evcom_buf *b = evcom_buf_new(buf, len);
|
|
evcom_queue_insert_head(&writer->out, &b->queue);
|
|
b->written = 0;
|
|
|
|
assert(writer->fd >= 0);
|
|
ev_io_start(D_LOOP_(writer) &writer->write_watcher);
|
|
return;
|
|
|
|
close:
|
|
close_writer_asap(writer);
|
|
}
|
|
|
|
void
|
|
evcom_writer_close (evcom_writer* writer)
|
|
{
|
|
writer->flags |= EVCOM_GOT_CLOSE;
|
|
if (evcom_queue_empty(&writer->out)) close_writer_asap(writer);
|
|
}
|