diff --git a/NEWS b/NEWS index d84eb62..fe8c6f5 100644 --- a/NEWS +++ b/NEWS @@ -10,6 +10,8 @@ Support for SASL (flexible authentication) has been added. Support for Windows file systems has been added. +Support for compressed data transfer has been added. + [1.1.0] Support for hierarchical mailboxes in Patterns. diff --git a/configure.ac b/configure.ac index ac696ee..7b4086d 100644 --- a/configure.ac +++ b/configure.ac @@ -150,6 +150,15 @@ if test "x$ac_cv_berkdb4" = xno; then AC_MSG_ERROR([Berkley DB >= 4.2 not found.]) fi +have_zlib= +AC_CHECK_LIB([z], [deflate], + [AC_CHECK_HEADER(zlib.h, + [have_zlib=1 + AC_SUBST([Z_LIBS], ["-lz"]) + AC_DEFINE([HAVE_LIBZ], 1, [if you have the zlib library])] + )] +) + AC_ARG_ENABLE(compat, AC_HELP_STRING([--disable-compat], [don't include isync compatibility wrapper [no]]), [ob_cv_enable_compat=$enableval]) @@ -172,4 +181,9 @@ if test -n "$have_sasl_paths"; then else AC_MSG_RESULT([Not using SASL]) fi +if test -n "$have_zlib"; then + AC_MSG_RESULT([Using zlib]) +else + AC_MSG_RESULT([Not using zlib]) +fi AC_MSG_RESULT() diff --git a/src/Makefile.am b/src/Makefile.am index 8ae3b53..0a49f21 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -6,7 +6,7 @@ SUBDIRS = $(compat_dir) bin_PROGRAMS = mbsync mdconvert mbsync_SOURCES = main.c sync.c config.c util.c socket.c driver.c drv_imap.c drv_maildir.c -mbsync_LDADD = -ldb $(SSL_LIBS) $(SOCK_LIBS) $(SASL_LIBS) +mbsync_LDADD = -ldb $(SSL_LIBS) $(SOCK_LIBS) $(SASL_LIBS) $(Z_LIBS) noinst_HEADERS = common.h config.h driver.h sync.h socket.h mdconvert_SOURCES = mdconvert.c diff --git a/src/drv_imap.c b/src/drv_imap.c index 141dc40..a931ce9 100644 --- a/src/drv_imap.c +++ b/src/drv_imap.c @@ -196,7 +196,8 @@ enum CAPABILITY { UIDPLUS, LITERALPLUS, MOVE, - NAMESPACE + NAMESPACE, + COMPRESS_DEFLATE }; static const char *cap_list[] = { @@ -210,7 +211,8 @@ static const char *cap_list[] = { "UIDPLUS", "LITERAL+", "MOVE", - "NAMESPACE" + "NAMESPACE", + "COMPRESS=DEFLATE" }; #define RESP_OK 0 @@ -1486,6 +1488,9 @@ static void imap_open_store_authenticate2_p2( imap_store_t *, struct imap_cmd *, static void imap_open_store_namespace( imap_store_t * ); static void imap_open_store_namespace_p2( imap_store_t *, struct imap_cmd *, int ); static void imap_open_store_namespace2( imap_store_t * ); +#ifdef HAVE_LIBZ +static void imap_open_store_compress_p2( imap_store_t *, struct imap_cmd *, int ); +#endif static void imap_open_store_finalize( imap_store_t * ); #ifdef HAVE_LIBSSL static void imap_open_store_ssl_bail( imap_store_t * ); @@ -2041,12 +2046,32 @@ imap_open_store_namespace2( imap_store_t *ctx ) ctx->prefix = nsp_1st_ns->val; if (!ctx->delimiter) ctx->delimiter = nfstrdup( nsp_1st_dl->val ); +#ifdef HAVE_LIBZ + if (CAP(COMPRESS_DEFLATE)) { /* XXX make that configurable */ + imap_exec( ctx, 0, imap_open_store_compress_p2, "COMPRESS DEFLATE" ); + return; + } +#endif imap_open_store_finalize( ctx ); } else { imap_open_store_bail( ctx ); } } +#ifdef HAVE_LIBZ +static void +imap_open_store_compress_p2( imap_store_t *ctx, struct imap_cmd *cmd ATTR_UNUSED, int response ) +{ + if (response == RESP_NO) { + /* We already reported an error, but it's not fatal to us. */ + imap_open_store_finalize( ctx ); + } else if (response == RESP_OK) { + socket_start_deflate( &ctx->conn ); + imap_open_store_finalize( ctx ); + } +} +#endif + static void imap_open_store_finalize( imap_store_t *ctx ) { diff --git a/src/socket.c b/src/socket.c index 586c98c..b300910 100644 --- a/src/socket.c +++ b/src/socket.c @@ -280,6 +280,43 @@ static void start_tls_p3( conn_t *conn, int ok ) #endif /* HAVE_LIBSSL */ +#ifdef HAVE_LIBZ + +static void z_fake_cb( void * ); + +void +socket_start_deflate( conn_t *conn ) +{ + int result; + + conn->in_z = nfcalloc( sizeof(*conn->in_z) ); + result = inflateInit2( + conn->in_z, + -15 /* Use raw deflate */ + ); + if (result != Z_OK) { + error( "Fatal: Cannot initialize decompression: %s\n", conn->in_z->msg ); + abort(); + } + + conn->out_z = nfcalloc( sizeof(*conn->out_z) ); + result = deflateInit2( + conn->out_z, + Z_DEFAULT_COMPRESSION, /* Compression level */ + Z_DEFLATED, /* Only valid value */ + -15, /* Use raw deflate */ + 8, /* Default memory usage */ + Z_DEFAULT_STRATEGY /* Don't try to do anything fancy */ + ); + if (result != Z_OK) { + error( "Fatal: Cannot initialize compression: %s\n", conn->out_z->msg ); + abort(); + } + + init_wakeup( &conn->z_fake, z_fake_cb, conn ); +} +#endif /* HAVE_LIBZ */ + static void socket_fd_cb( int, void * ); static void socket_fake_cb( void * ); @@ -500,6 +537,17 @@ socket_close( conn_t *sock ) sock->ssl = 0; wipe_wakeup( &sock->ssl_fake ); } +#endif +#ifdef HAVE_LIBZ + if (sock->in_z) { + inflateEnd( sock->in_z ); + free( sock->in_z ); + sock->in_z = 0; + deflateEnd( sock->out_z ); + free( sock->out_z ); + sock->out_z = 0; + wipe_wakeup( &sock->z_fake ); + } #endif while (sock->write_buf) dispose_chunk( sock ); @@ -507,23 +555,30 @@ socket_close( conn_t *sock ) sock->append_buf = 0; } -static void -socket_fill( conn_t *sock ) +static int +prepare_read( conn_t *sock, char **buf, int *len ) { - char *buf; int n = sock->offset + sock->bytes; - int len = sizeof(sock->buf) - n; - if (!len) { + if (!(*len = sizeof(sock->buf) - n)) { error( "Socket error: receive buffer full. Probably protocol error.\n" ); socket_fail( sock ); - return; + return -1; } + *buf = sock->buf + n; + return 0; +} + +static int +do_read( conn_t *sock, char *buf, int len ) +{ + int n; + assert( sock->fd >= 0 ); - buf = sock->buf + n; #ifdef HAVE_LIBSSL if (sock->ssl) { if ((n = ssl_return( "read from", sock, SSL_read( sock->ssl, buf, len ) )) <= 0) - return; + return n; + if (n == len && SSL_pending( sock->ssl )) conf_wakeup( &sock->ssl_fake, 0 ); } else @@ -532,15 +587,71 @@ socket_fill( conn_t *sock ) if ((n = read( sock->fd, buf, len )) < 0) { sys_error( "Socket error: read from %s", sock->name ); socket_fail( sock ); - return; } else if (!n) { error( "Socket error: read from %s: unexpected EOF\n", sock->name ); socket_fail( sock ); - return; + return -1; } } - sock->bytes += n; - sock->read_callback( sock->callback_aux ); + + return n; +} + +#ifdef HAVE_LIBZ +static void +socket_fill_z( conn_t *sock ) +{ + char *buf; + int len; + + if (prepare_read( sock, &buf, &len ) < 0) + return; + + sock->in_z->avail_out = len; + sock->in_z->next_out = (unsigned char *)buf; + + if (inflate( sock->in_z, Z_SYNC_FLUSH ) != Z_OK) { + error( "Error decompressing data from %s: %s\n", sock->name, sock->in_z->msg ); + socket_fail( sock ); + return; + } + + if (!sock->in_z->avail_out) + conf_wakeup( &sock->z_fake, 0 ); + + if ((len = (char *)sock->in_z->next_out - buf)) { + sock->bytes += len; + sock->read_callback( sock->callback_aux ); + } +} +#endif + +static void +socket_fill( conn_t *sock ) +{ +#ifdef HAVE_LIBZ + if (sock->in_z) { + /* The timer will preempt reads until the buffer is empty. */ + assert( !sock->in_z->avail_in ); + sock->in_z->next_in = (uchar *)sock->z_buf; + if ((sock->in_z->avail_in = do_read( sock, sock->z_buf, sizeof(sock->z_buf) )) <= 0) + return; + socket_fill_z( sock ); + } else +#endif + { + char *buf; + int len; + + if (prepare_read( sock, &buf, &len ) < 0) + return; + + if ((len = do_read( sock, buf, len )) <= 0) + return; + + sock->bytes += len; + sock->read_callback( sock->callback_aux ); + } } int @@ -655,6 +766,49 @@ do_append( conn_t *conn, buff_chunk_t *bc ) * sufficiently small to keep SSL latency low with a slow uplink. */ #define WRITE_CHUNK_SIZE 1024 +static void +do_flush( conn_t *conn ) +{ + buff_chunk_t *bc = conn->append_buf; +#ifdef HAVE_LIBZ + if (conn->out_z) { + int buf_avail = conn->append_avail; + do { + if (!bc) { + buf_avail = WRITE_CHUNK_SIZE; + bc = nfmalloc( offsetof(buff_chunk_t, data) + buf_avail ); + bc->len = 0; + } + conn->out_z->next_in = Z_NULL; + conn->out_z->avail_in = 0; + conn->out_z->next_out = (uchar *)bc->data + bc->len; + conn->out_z->avail_out = buf_avail; + if (deflate( conn->out_z, Z_PARTIAL_FLUSH ) != Z_OK) { + error( "Fatal: Compression error: %s\n", conn->out_z->msg ); + abort(); + } + bc->len = (char *)conn->out_z->next_out - bc->data; + if (bc->len) { + do_append( conn, bc ); + bc = 0; + buf_avail = 0; + } else { + buf_avail = conn->out_z->avail_out; + } + } while (!conn->out_z->avail_out); + conn->append_buf = bc; + conn->append_avail = buf_avail; + } else +#endif + if (bc) { + do_append( conn, bc ); + conn->append_buf = 0; +#ifdef HAVE_LIBZ + conn->append_avail = 0; +#endif + } +} + int socket_write( conn_t *conn, conn_iovec_t *iov, int iovcnt ) { @@ -663,29 +817,54 @@ socket_write( conn_t *conn, conn_iovec_t *iov, int iovcnt ) for (i = 0; i < iovcnt; i++) total += iov[i].len; - bc = conn->append_buf; - if (bc && total >= WRITE_CHUNK_SIZE) { + if (total >= WRITE_CHUNK_SIZE) { /* If the new data is too big, queue the pending buffer to avoid latency. */ - do_append( conn, bc ); - bc = 0; + do_flush( conn ); } + bc = conn->append_buf; +#ifdef HAVE_LIBZ + buf_avail = conn->append_avail; +#endif while (total) { if (!bc) { + /* We don't do anything special when compressing, as there is no way to + * predict a reasonable output buffer size anyway - deflatePending() does + * not account for consumed but not yet compressed input, and adding up + * the deflateBound()s would be a tad *too* pessimistic. */ buf_avail = total > WRITE_CHUNK_SIZE ? total : WRITE_CHUNK_SIZE; bc = nfmalloc( offsetof(buff_chunk_t, data) + buf_avail ); bc->len = 0; +#ifndef HAVE_LIBZ } else { /* A pending buffer will always be of standard size - over-sized * buffers are immediately filled and queued. */ buf_avail = WRITE_CHUNK_SIZE - bc->len; +#endif } while (total) { len = iov->len - offset; - if (len > buf_avail) - len = buf_avail; - memcpy( bc->data + bc->len, iov->buf + offset, len ); - bc->len += len; - buf_avail -= len; +#ifdef HAVE_LIBZ + if (conn->out_z) { + conn->out_z->next_in = (uchar *)iov->buf + offset; + conn->out_z->avail_in = len; + conn->out_z->next_out = (uchar *)bc->data + bc->len; + conn->out_z->avail_out = buf_avail; + if (deflate( conn->out_z, Z_NO_FLUSH ) != Z_OK) { + error( "Fatal: Compression error: %s\n", conn->out_z->msg ); + abort(); + } + bc->len = (char *)conn->out_z->next_out - bc->data; + buf_avail = conn->out_z->avail_out; + len -= conn->out_z->avail_in; + } else +#endif + { + if (len > buf_avail) + len = buf_avail; + memcpy( bc->data + bc->len, iov->buf + offset, len ); + bc->len += len; + buf_avail -= len; + } offset += len; total -= len; if (offset == iov->len) { @@ -702,8 +881,16 @@ socket_write( conn_t *conn, conn_iovec_t *iov, int iovcnt ) } } conn->append_buf = bc; +#ifdef HAVE_LIBZ + conn->append_avail = buf_avail; +#endif /* Queue the pending write once the main loop goes idle. */ - conf_wakeup( &conn->fd_fake, bc ? 0 : -1 ); + conf_wakeup( &conn->fd_fake, +#ifdef HAVE_LIBZ + /* Always give zlib a chance to flush its internal buffer. */ + conn->out_z || +#endif + bc ? 0 : -1 ); /* If no writes were queued before, ensure that flushing commences. */ if (!exwb) return do_queued_write( conn ); @@ -763,13 +950,22 @@ socket_fake_cb( void *aux ) conn_t *conn = (conn_t *)aux; buff_chunk_t *exwb = conn->write_buf; - do_append( conn, conn->append_buf ); - conn->append_buf = 0; + do_flush( conn ); /* If no writes were queued before, ensure that flushing commences. */ if (!exwb) do_queued_write( conn ); } +#ifdef HAVE_LIBZ +static void +z_fake_cb( void *aux ) +{ + conn_t *conn = (conn_t *)aux; + + socket_fill_z( conn ); +} +#endif + #ifdef HAVE_LIBSSL static void ssl_fake_cb( void *aux ) diff --git a/src/socket.h b/src/socket.h index efecce6..a420e49 100644 --- a/src/socket.h +++ b/src/socket.h @@ -25,6 +25,10 @@ #include "common.h" +#ifdef HAVE_LIBZ +#include +#endif + #ifdef HAVE_LIBSSL typedef struct ssl_st SSL; typedef struct ssl_ctx_st SSL_CTX; @@ -76,6 +80,10 @@ typedef struct { SSL *ssl; wakeup_t ssl_fake; #endif +#ifdef HAVE_LIBZ + z_streamp in_z, out_z; + wakeup_t z_fake; +#endif void (*bad_callback)( void *aux ); /* async fail while sending or listening */ void (*read_callback)( void *aux ); /* data available for reading */ @@ -92,6 +100,9 @@ typedef struct { /* writing */ buff_chunk_t *append_buf; /* accumulating buffer */ buff_chunk_t *write_buf, **write_buf_append; /* buffer head & tail */ +#ifdef HAVE_LIBZ + int append_avail; /* space left in accumulating buffer */ +#endif int write_offset; /* offset into buffer head */ /* reading */ @@ -99,6 +110,9 @@ typedef struct { int bytes; /* number of filled bytes in buffer */ int scanoff; /* offset to continue scanning for newline at, relative to 'offset' */ char buf[100000]; +#ifdef HAVE_LIBZ + char z_buf[100000]; +#endif } conn_t; /* call this before doing anything with the socket */ @@ -120,6 +134,7 @@ static INLINE void socket_init( conn_t *conn, } void socket_connect( conn_t *conn, void (*cb)( int ok, void *aux ) ); void socket_start_tls(conn_t *conn, void (*cb)( int ok, void *aux ) ); +void socket_start_deflate( conn_t *conn ); void socket_close( conn_t *sock ); int socket_read( conn_t *sock, char *buf, int len ); /* never waits */ char *socket_read_line( conn_t *sock ); /* don't free return value; never waits */