@ -281,6 +281,7 @@ static void start_tls_p3( conn_t *conn, int ok )
# endif /* HAVE_LIBSSL */
# endif /* HAVE_LIBSSL */
static void socket_fd_cb ( int , void * ) ;
static void socket_fd_cb ( int , void * ) ;
static void socket_fake_cb ( void * ) ;
static void socket_connect_one ( conn_t * ) ;
static void socket_connect_one ( conn_t * ) ;
static void socket_connect_failed ( conn_t * ) ;
static void socket_connect_failed ( conn_t * ) ;
@ -293,12 +294,14 @@ socket_open_internal( conn_t *sock, int fd )
sock - > fd = fd ;
sock - > fd = fd ;
fcntl ( fd , F_SETFL , O_NONBLOCK ) ;
fcntl ( fd , F_SETFL , O_NONBLOCK ) ;
init_notifier ( & sock - > notify , fd , socket_fd_cb , sock ) ;
init_notifier ( & sock - > notify , fd , socket_fd_cb , sock ) ;
init_wakeup ( & sock - > fd_fake , socket_fake_cb , sock ) ;
}
}
static void
static void
socket_close_internal ( conn_t * sock )
socket_close_internal ( conn_t * sock )
{
{
wipe_notifier ( & sock - > notify ) ;
wipe_notifier ( & sock - > notify ) ;
wipe_wakeup ( & sock - > fd_fake ) ;
close ( sock - > fd ) ;
close ( sock - > fd ) ;
sock - > fd = - 1 ;
sock - > fd = - 1 ;
}
}
@ -500,6 +503,8 @@ socket_close( conn_t *sock )
# endif
# endif
while ( sock - > write_buf )
while ( sock - > write_buf )
dispose_chunk ( sock ) ;
dispose_chunk ( sock ) ;
free ( sock - > append_buf ) ;
sock - > append_buf = 0 ;
}
}
static void
static void
@ -609,8 +614,6 @@ dispose_chunk( conn_t *conn )
buff_chunk_t * bc = conn - > write_buf ;
buff_chunk_t * bc = conn - > write_buf ;
if ( ! ( conn - > write_buf = bc - > next ) )
if ( ! ( conn - > write_buf = bc - > next ) )
conn - > write_buf_append = & conn - > write_buf ;
conn - > write_buf_append = & conn - > write_buf ;
if ( bc - > data ! = bc - > buf )
free ( bc - > data ) ;
free ( bc ) ;
free ( bc ) ;
}
}
@ -641,48 +644,69 @@ do_queued_write( conn_t *conn )
}
}
static void
static void
do_append ( conn_t * conn , char * buf , int len , ownership_t takeOwn )
do_append ( conn_t * conn , buff_chunk_t * bc )
{
{
buff_chunk_t * bc ;
if ( takeOwn = = GiveOwn ) {
bc = nfmalloc ( offsetof ( buff_chunk_t , buf ) ) ;
bc - > data = buf ;
} else {
bc = nfmalloc ( offsetof ( buff_chunk_t , buf ) + len ) ;
bc - > data = bc - > buf ;
memcpy ( bc - > data , buf , len ) ;
}
bc - > len = len ;
bc - > next = 0 ;
bc - > next = 0 ;
* conn - > write_buf_append = bc ;
* conn - > write_buf_append = bc ;
conn - > write_buf_append = & bc - > next ;
conn - > write_buf_append = & bc - > next ;
}
}
/* This is big enough to avoid excessive chunking, but is
* sufficiently small to keep SSL latency low with a slow uplink . */
# define WRITE_CHUNK_SIZE 1024
int
int
socket_write ( conn_t * conn , conn_iovec_t * iov , int iovcnt )
socket_write ( conn_t * conn , conn_iovec_t * iov , int iovcnt )
{
{
for ( ; iovcnt ; iovcnt - - , iov + + ) {
int i , buf_avail , len , offset = 0 , total = 0 ;
if ( conn - > write_buf ) {
buff_chunk_t * bc , * exwb = conn - > write_buf ;
do_append ( conn , iov - > buf , iov - > len , iov - > takeOwn ) ;
for ( i = 0 ; i < iovcnt ; i + + )
total + = iov [ i ] . len ;
bc = conn - > append_buf ;
if ( bc & & total > = WRITE_CHUNK_SIZE ) {
/* If the new data is too big, queue the pending buffer to avoid latency. */
do_append ( conn , bc ) ;
bc = 0 ;
}
while ( total ) {
if ( ! bc ) {
buf_avail = total > WRITE_CHUNK_SIZE ? total : WRITE_CHUNK_SIZE ;
bc = nfmalloc ( offsetof ( buff_chunk_t , data ) + buf_avail ) ;
bc - > len = 0 ;
} else {
} else {
int n = do_write ( conn , iov - > buf , iov - > len ) ;
/* A pending buffer will always be of standard size - over-sized
if ( n < 0 ) {
* buffers are immediately filled and queued . */
do {
buf_avail = WRITE_CHUNK_SIZE - bc - > len ;
if ( iov - > takeOwn = = GiveOwn )
}
free ( iov - > buf ) ;
while ( total ) {
iovcnt - - , iov + + ;
len = iov - > len - offset ;
} while ( iovcnt ) ;
if ( len > buf_avail )
return - 1 ;
len = buf_avail ;
memcpy ( bc - > data + bc - > len , iov - > buf + offset , len ) ;
bc - > len + = len ;
buf_avail - = len ;
offset + = len ;
total - = len ;
if ( offset = = iov - > len ) {
if ( iov - > takeOwn = = GiveOwn )
free ( iov - > buf ) ;
iov + + ;
offset = 0 ;
}
}
if ( n ! = iov - > len ) {
if ( ! buf_avail ) {
conn - > write_offset = n ;
do_append ( conn , bc ) ;
do_append ( conn , iov - > buf , iov - > len , iov - > takeOwn ) ;
bc = 0 ;
} else if ( iov - > takeOwn = = GiveOwn ) {
break ;
free ( iov - > buf ) ;
}
}
}
}
}
}
conn - > append_buf = bc ;
/* Queue the pending write once the main loop goes idle. */
conf_wakeup ( & conn - > fd_fake , bc ? 0 : - 1 ) ;
/* If no writes were queued before, ensure that flushing commences. */
if ( ! exwb )
return do_queued_write ( conn ) ;
return 0 ;
return 0 ;
}
}
@ -733,6 +757,19 @@ socket_fd_cb( int events, void *aux )
socket_fill ( conn ) ;
socket_fill ( conn ) ;
}
}
static void
socket_fake_cb ( void * aux )
{
conn_t * conn = ( conn_t * ) aux ;
buff_chunk_t * exwb = conn - > write_buf ;
do_append ( conn , conn - > append_buf ) ;
conn - > append_buf = 0 ;
/* If no writes were queued before, ensure that flushing commences. */
if ( ! exwb )
do_queued_write ( conn ) ;
}
# ifdef HAVE_LIBSSL
# ifdef HAVE_LIBSSL
static void
static void
ssl_fake_cb ( void * aux )
ssl_fake_cb ( void * aux )