From bd93d689dbd38b6dde76ed8f48e7dc73bf22eef1 Mon Sep 17 00:00:00 2001 From: Oswald Buddenhagen Date: Sat, 25 Aug 2012 18:26:23 +0200 Subject: [PATCH] fully asynchronous IMAP operation - asynchronous sockets using an event loop - connect & starttls have completion callback parameters - callbacks for notification about filled input buffer and emptied output buffer - unsent imap command queue - used when - socket output buffer is non-empty - number of commands in flight exceeds limit - last sent command requires round-trip - command has a dependency on completion of previous command - trashnc is tri-state so only a single "scout" trash APPEND/COPY is sent at first. a possibly resulting CREATE is injected in front of the remaining trash commands, so they can succeed (or be cancel()d if it fails). - queue's presence necessitates imap_cancel implementation --- configure.in | 2 +- src/drv_imap.c | 289 +++++++++++++++++++++--------------- src/isync.h | 34 ++++- src/mbsync.1 | 4 +- src/socket.c | 392 ++++++++++++++++++++++++++++++++++++++----------- 5 files changed, 506 insertions(+), 215 deletions(-) diff --git a/configure.in b/configure.in index f63081a..36b07d0 100644 --- a/configure.in +++ b/configure.in @@ -9,7 +9,7 @@ if test "$GCC" = yes; then CFLAGS="$CFLAGS -pipe -W -Wall -Wshadow -Wstrict-prototypes" fi -AC_CHECK_HEADERS(sys/filio.h sys/poll.h sys/select.h) +AC_CHECK_HEADERS(sys/poll.h sys/select.h) AC_CHECK_FUNCS(vasprintf) AC_CHECK_LIB(socket, socket, [SOCK_LIBS="-lsocket"]) diff --git a/src/drv_imap.c b/src/drv_imap.c index 64e0e44..2f9347f 100644 --- a/src/drv_imap.c +++ b/src/drv_imap.c @@ -81,7 +81,8 @@ typedef struct imap_store { const char *prefix; int ref_count; int uidnext; /* from SELECT responses */ - unsigned trashnc:1; /* trash folder's existence is not confirmed yet */ + /* trash folder's existence is not confirmed yet */ + enum { TrashUnknown, TrashChecking, TrashKnown } trashnc; unsigned got_namespace:1; list_t *ns_personal, *ns_other, *ns_shared; /* NAMESPACE info */ message_t **msgapp; /* FETCH results */ @@ -89,12 +90,15 @@ typedef struct imap_store { parse_list_state_t parse_list_sts; /* command queue */ int nexttag, num_in_progress, literal_pending; + struct imap_cmd *pending, **pending_append; struct imap_cmd *in_progress, **in_progress_append; /* Used during sequential operations like connect */ enum { GreetingPending = 0, GreetingBad, GreetingOk, GreetingPreauth } greeting; + int canceling; /* imap_cancel() is in progress */ union { void (*imap_open)( store_t *srv, void *aux ); + void (*imap_cancel)( void *aux ); } callbacks; void *callback_aux; @@ -115,6 +119,7 @@ struct imap_cmd { int data_len; int uid; /* to identify fetch responses */ unsigned + high_prio:1, /* if command is queued, put it at the front of the queue. */ to_trash:1, /* we are storing to trash, not current. */ create:1, /* create the mailbox if we get an error ... */ trycreate:1; /* ... but only if this is true or the server says so. */ @@ -179,8 +184,6 @@ static const char *cap_list[] = { #define RESP_NO 1 #define RESP_CANCEL 2 -static int get_cmd_result( imap_store_t *ctx, struct imap_cmd *tcmd ); - static INLINE void imap_ref( imap_store_t *ctx ) { ++ctx->ref_count; } static int imap_deref( imap_store_t *ctx ); @@ -221,30 +224,18 @@ done_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd, int response ) free( cmd ); } -static struct imap_cmd * -v_submit_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd, - const char *fmt, va_list ap ) +static int +send_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd ) { int bufl, litplus; const char *buffmt; char buf[1024]; - assert( ctx ); - assert( ctx->gen.bad_callback ); - assert( cmd ); - assert( cmd->param.done ); - - while (ctx->literal_pending) - if (get_cmd_result( ctx, 0 ) == RESP_CANCEL) - goto bail; - cmd->tag = ++ctx->nexttag; - if (fmt) - nfvasprintf( &cmd->cmd, fmt, ap ); if (!cmd->param.data) { buffmt = "%d %s\r\n"; litplus = 0; - } else if ((cmd->param.to_trash && ctx->trashnc) || !CAP(LITERALPLUS)) { + } else if ((cmd->param.to_trash && ctx->trashnc == TrashUnknown) || !CAP(LITERALPLUS)) { buffmt = "%d %s{%d}\r\n"; litplus = 0; } else { @@ -272,27 +263,52 @@ v_submit_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd, } else if (cmd->param.cont || cmd->param.data) { ctx->literal_pending = 1; } + if (cmd->param.to_trash && ctx->trashnc == TrashUnknown) + ctx->trashnc = TrashChecking; cmd->next = 0; *ctx->in_progress_append = cmd; ctx->in_progress_append = &cmd->next; ctx->num_in_progress++; - return cmd; + return 0; bail: done_imap_cmd( ctx, cmd, RESP_CANCEL ); - return NULL; + return -1; } -static struct imap_cmd * -submit_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd, const char *fmt, ... ) +static int +cmd_submittable( imap_store_t *ctx, struct imap_cmd *cmd ) { - struct imap_cmd *ret; - va_list ap; + return !ctx->conn.write_buf && + !ctx->literal_pending && + !(cmd->param.to_trash && ctx->trashnc == TrashChecking) && + ctx->num_in_progress < ((imap_store_conf_t *)ctx->gen.conf)->server->max_in_progress; +} - va_start( ap, fmt ); - ret = v_submit_imap_cmd( ctx, cmd, fmt, ap ); - va_end( ap ); - return ret; +static int +flush_imap_cmds( imap_store_t *ctx ) +{ + struct imap_cmd *cmd; + + while ((cmd = ctx->pending) && cmd_submittable( ctx, cmd )) { + if (!(ctx->pending = cmd->next)) + ctx->pending_append = &ctx->pending; + if (send_imap_cmd( ctx, cmd ) < 0) + return -1; + } + return 0; +} + +static void +cancel_pending_imap_cmds( imap_store_t *ctx ) +{ + struct imap_cmd *cmd; + + while ((cmd = ctx->pending)) { + if (!(ctx->pending = cmd->next)) + ctx->pending_append = &ctx->pending; + done_imap_cmd( ctx, cmd, RESP_CANCEL ); + } } static void @@ -307,6 +323,29 @@ cancel_submitted_imap_cmds( imap_store_t *ctx ) } } +static int +submit_imap_cmd( imap_store_t *ctx, struct imap_cmd *cmd ) +{ + assert( ctx ); + assert( ctx->gen.bad_callback ); + assert( cmd ); + assert( cmd->param.done ); + + if ((ctx->pending && !cmd->param.high_prio) || !cmd_submittable( ctx, cmd )) { + if (ctx->pending && cmd->param.high_prio) { + cmd->next = ctx->pending; + ctx->pending = cmd; + } else { + cmd->next = 0; + *ctx->pending_append = cmd; + ctx->pending_append = &cmd->next; + } + return 0; + } + + return send_imap_cmd( ctx, cmd ); +} + static int imap_exec( imap_store_t *ctx, struct imap_cmd *cmdp, void (*done)( imap_store_t *ctx, struct imap_cmd *cmd, int response ), @@ -318,12 +357,9 @@ imap_exec( imap_store_t *ctx, struct imap_cmd *cmdp, cmdp = new_imap_cmd( sizeof(*cmdp) ); cmdp->param.done = done; va_start( ap, fmt ); - cmdp = v_submit_imap_cmd( ctx, cmdp, fmt, ap ); + nfvasprintf( &cmdp->cmd, fmt, ap ); va_end( ap ); - if (!cmdp) - return RESP_CANCEL; - - return get_cmd_result( ctx, cmdp ); + return submit_imap_cmd( ctx, cmdp ); } static void @@ -393,25 +429,6 @@ imap_refcounted_done( struct imap_cmd_refcounted_state *sts ) free( sts ); } -/* -static void -drain_imap_replies( imap_store_t *ctx ) -{ - while (ctx->num_in_progress) - get_cmd_result( ctx, 0 ); -} -*/ - -static int -process_imap_replies( imap_store_t *ctx ) -{ - while (ctx->num_in_progress > ((imap_store_conf_t *)ctx->gen.conf)->server->max_in_progress || - socket_pending( &ctx->conn )) - if (get_cmd_result( ctx, 0 ) == RESP_CANCEL) - return RESP_CANCEL; - return RESP_OK; -} - static int is_atom( list_t *list ) { @@ -798,20 +815,22 @@ struct imap_cmd_trycreate { static void imap_open_store_greeted( imap_store_t * ); static void get_cmd_result_p2( imap_store_t *, struct imap_cmd *, int ); -static int -get_cmd_result( imap_store_t *ctx, struct imap_cmd *tcmd ) +static void +imap_socket_read( void *aux ) { + imap_store_t *ctx = (imap_store_t *)aux; struct imap_cmd *cmdp, **pcmdp; char *cmd, *arg, *arg1, *p; int resp, resp2, tag, greeted; greeted = ctx->greeting; + if (ctx->parse_list_sts.level) { + cmd = 0; + goto do_fetch; + } for (;;) { - if (!(cmd = socket_read_line( &ctx->conn ))) { - if (socket_fill( &ctx->conn ) < 0) - return RESP_CANCEL; - continue; - } + if (!(cmd = socket_read_line( &ctx->conn ))) + return; arg = next_arg( &cmd ); if (*arg == '*') { @@ -850,11 +869,8 @@ get_cmd_result( imap_store_t *ctx, struct imap_cmd *tcmd ) do_fetch: if ((resp = parse_imap_list( ctx, &cmd, &ctx->parse_list_sts )) == LIST_BAD) break; /* stream is likely to be useless now */ - if (resp == LIST_PARTIAL) { - if (socket_fill( &ctx->conn ) < 0) - return RESP_CANCEL; - goto do_fetch; - } + if (resp == LIST_PARTIAL) + return; if (parse_fetch( ctx, ctx->parse_list_sts.head ) < 0) break; /* this may mean anything, so prefer not to spam the log */ } @@ -865,8 +881,10 @@ get_cmd_result( imap_store_t *ctx, struct imap_cmd *tcmd ) if (greeted == GreetingPending) { imap_ref( ctx ); imap_open_store_greeted( ctx ); - return imap_deref( ctx ) ? RESP_CANCEL : RESP_OK; + if (imap_deref( ctx )) + return; } + continue; } else if (!ctx->in_progress) { error( "IMAP error: unexpected reply: %s %s\n", arg, cmd ? cmd : "" ); break; /* this may mean anything, so prefer not to spam the log */ @@ -876,24 +894,22 @@ get_cmd_result( imap_store_t *ctx, struct imap_cmd *tcmd ) cmdp = ctx->in_progress; if (cmdp->param.data) { if (cmdp->param.to_trash) - ctx->trashnc = 0; /* Can't get NO [TRYCREATE] any more. */ + ctx->trashnc = TrashKnown; /* Can't get NO [TRYCREATE] any more. */ p = cmdp->param.data; cmdp->param.data = 0; if (socket_write( &ctx->conn, p, cmdp->param.data_len, GiveOwn ) < 0) - return RESP_CANCEL; + return; } else if (cmdp->param.cont) { if (cmdp->param.cont( ctx, cmdp, cmd )) - return RESP_CANCEL; + return; } else { error( "IMAP error: unexpected command continuation request\n" ); break; } if (socket_write( &ctx->conn, "\r\n", 2, KeepOwn ) < 0) - return RESP_CANCEL; + return; if (!cmdp->param.cont) ctx->literal_pending = 0; - if (!tcmd) - return RESP_OK; } else { tag = atoi( arg ); for (pcmdp = &ctx->in_progress; (cmdp = *pcmdp); pcmdp = &cmdp->next) @@ -910,7 +926,7 @@ get_cmd_result( imap_store_t *ctx, struct imap_cmd *tcmd ) arg = next_arg( &cmd ); if (!strcmp( "OK", arg )) { if (cmdp->param.to_trash) - ctx->trashnc = 0; /* Can't get NO [TRYCREATE] any more. */ + ctx->trashnc = TrashKnown; /* Can't get NO [TRYCREATE] any more. */ resp = RESP_OK; } else { if (!strcmp( "NO", arg )) { @@ -921,10 +937,11 @@ get_cmd_result( imap_store_t *ctx, struct imap_cmd *tcmd ) struct imap_cmd_trycreate *cmd2 = (struct imap_cmd_trycreate *)new_imap_cmd( sizeof(*cmd2) ); cmd2->orig_cmd = cmdp; - cmd2->gen.param.done = get_cmd_result_p2; + cmd2->gen.param.high_prio = 1; p = strchr( cmdp->cmd, '"' ); - if (!submit_imap_cmd( ctx, &cmd2->gen, "CREATE %.*s", strchr( p + 1, '"' ) - p + 1, p )) - return RESP_CANCEL; + if (imap_exec( ctx, &cmd2->gen, get_cmd_result_p2, + "CREATE %.*s", strchr( p + 1, '"' ) - p + 1, p ) < 0) + return; continue; } resp = RESP_NO; @@ -941,13 +958,17 @@ get_cmd_result( imap_store_t *ctx, struct imap_cmd *tcmd ) imap_invoke_bad_callback( ctx ); done_imap_cmd( ctx, cmdp, resp ); if (imap_deref( ctx )) - resp = RESP_CANCEL; - if (resp == RESP_CANCEL || !tcmd || tcmd == cmdp) - return resp; + return; + if (ctx->canceling && !ctx->in_progress) { + ctx->canceling = 0; + ctx->callbacks.imap_cancel( ctx->callback_aux ); + return; + } } + if (flush_imap_cmds( ctx ) < 0) + return; } imap_invoke_bad_callback( ctx ); - return RESP_CANCEL; } static void @@ -960,8 +981,11 @@ get_cmd_result_p2( imap_store_t *ctx, struct imap_cmd *cmd, int response ) done_imap_cmd( ctx, ocmd, response ); } else { ctx->uidnext = 0; + if (ocmd->param.to_trash) + ctx->trashnc = TrashKnown; ocmd->param.create = 0; - submit_imap_cmd( ctx, ocmd, 0 ); + ocmd->param.high_prio = 1; + submit_imap_cmd( ctx, ocmd ); } } @@ -974,6 +998,7 @@ imap_cancel_store( store_t *gctx ) socket_close( &ctx->conn ); cancel_submitted_imap_cmds( ctx ); + cancel_pending_imap_cmds( ctx ); free_generic_messages( ctx->gen.msgs ); free_string_list( ctx->gen.boxes ); free_list( ctx->ns_personal ); @@ -1082,10 +1107,15 @@ do_cram_auth( imap_store_t *ctx, struct imap_cmd *cmdp, const char *prompt ) } #endif +static void imap_open_store_connected( int, void * ); +#ifdef HAVE_LIBSSL +static void imap_open_store_tlsstarted1( int, void * ); +#endif static void imap_open_store_p2( imap_store_t *, struct imap_cmd *, int ); static void imap_open_store_authenticate( imap_store_t * ); #ifdef HAVE_LIBSSL static void imap_open_store_authenticate_p2( imap_store_t *, struct imap_cmd *, int ); +static void imap_open_store_tlsstarted2( int, void * ); static void imap_open_store_authenticate_p3( imap_store_t *, struct imap_cmd *, int ); #endif static void imap_open_store_authenticate2( imap_store_t * ); @@ -1131,26 +1161,41 @@ imap_open_store( store_conf_t *conf, ctx->callback_aux = aux; set_bad_callback( &ctx->gen, (void (*)(void *))imap_open_store_bail, ctx ); ctx->in_progress_append = &ctx->in_progress; + ctx->pending_append = &ctx->pending; - socket_init( &ctx->conn, (void (*)( void * ))imap_invoke_bad_callback, ctx ); + socket_init( &ctx->conn, &srvc->sconf, + (void (*)( void * ))imap_invoke_bad_callback, + imap_socket_read, (int (*)(void *))flush_imap_cmds, ctx ); + socket_connect( &ctx->conn, imap_open_store_connected ); +} - if (!socket_connect( &srvc->sconf, &ctx->conn )) - goto bail; +static void +imap_open_store_connected( int ok, void *aux ) +{ + imap_store_t *ctx = (imap_store_t *)aux; +#ifdef HAVE_LIBSSL + imap_store_conf_t *cfg = (imap_store_conf_t *)ctx->gen.conf; + imap_server_conf_t *srvc = cfg->server; +#endif + if (!ok) + imap_open_store_bail( ctx ); #ifdef HAVE_LIBSSL - if (srvc->sconf.use_imaps) { - if (socket_start_tls( &srvc->sconf, &ctx->conn )) { - imap_open_store_ssl_bail( ctx ); - return; - } - } + else if (srvc->sconf.use_imaps) + socket_start_tls( &ctx->conn, imap_open_store_tlsstarted1 ); #endif - get_cmd_result( ctx, 0 ); - return; +} - bail: - imap_open_store_bail( ctx ); +#ifdef HAVE_LIBSSL +static void +imap_open_store_tlsstarted1( int ok, void *aux ) +{ + imap_store_t *ctx = (imap_store_t *)aux; + + if (!ok) + imap_open_store_ssl_bail( ctx ); } +#endif static void imap_open_store_greeted( imap_store_t *ctx ) @@ -1213,7 +1258,16 @@ imap_open_store_authenticate_p2( imap_store_t *ctx, struct imap_cmd *cmd ATTR_UN { if (response != RESP_OK) imap_open_store_bail( ctx ); - else if (socket_start_tls( &((imap_server_conf_t *)ctx->gen.conf)->sconf, &ctx->conn )) + else + socket_start_tls( &ctx->conn, imap_open_store_tlsstarted2 ); +} + +static void +imap_open_store_tlsstarted2( int ok, void *aux ) +{ + imap_store_t *ctx = (imap_store_t *)aux; + + if (!ok) imap_open_store_ssl_bail( ctx ); else imap_exec( ctx, 0, imap_open_store_authenticate_p3, "CAPABILITY" ); @@ -1343,7 +1397,7 @@ static void imap_open_store_finalize( imap_store_t *ctx ) { set_bad_callback( &ctx->gen, 0, 0 ); - ctx->trashnc = 1; + ctx->trashnc = TrashUnknown; ctx->callbacks.imap_open( &ctx->gen, ctx->callback_aux ); } @@ -1404,8 +1458,7 @@ imap_select( store_t *gctx, int create, /******************* imap_load *******************/ -static int imap_submit_load( imap_store_t *, const char *, struct imap_cmd_refcounted_state *, - struct imap_cmd ** ); +static int imap_submit_load( imap_store_t *, const char *, struct imap_cmd_refcounted_state * ); static void imap_load_p2( imap_store_t *, struct imap_cmd *, int ); static void @@ -1420,7 +1473,6 @@ imap_load( store_t *gctx, int minuid, int maxuid, int *excs, int nexcs, free( excs ); cb( DRV_OK, aux ); } else { - struct imap_cmd *cmd2 = 0; struct imap_cmd_refcounted_state *sts = imap_refcounted_new_state( cb, aux ); ctx->msgapp = &ctx->gen.msgs; @@ -1435,35 +1487,29 @@ imap_load( store_t *gctx, int minuid, int maxuid, int *excs, int nexcs, if (i != j) bl += sprintf( buf + bl, ":%d", excs[i] ); } - if (imap_submit_load( ctx, buf, sts, &cmd2 ) < 0) + if (imap_submit_load( ctx, buf, sts ) < 0) goto done; } if (maxuid == INT_MAX) maxuid = ctx->uidnext >= 0 ? ctx->uidnext - 1 : 1000000000; if (maxuid >= minuid) { sprintf( buf, "%d:%d", minuid, maxuid ); - imap_submit_load( ctx, buf, sts, &cmd2 ); + imap_submit_load( ctx, buf, sts ); } done: free( excs ); if (!--sts->ref_count) imap_refcounted_done( sts ); - else - get_cmd_result( ctx, cmd2 ); } } static int -imap_submit_load( imap_store_t *ctx, const char *buf, struct imap_cmd_refcounted_state *sts, - struct imap_cmd **cmdp ) +imap_submit_load( imap_store_t *ctx, const char *buf, struct imap_cmd_refcounted_state *sts ) { - struct imap_cmd *cmd = imap_refcounted_new_cmd( sts ); - cmd->param.done = imap_load_p2; - *cmdp = cmd; - return submit_imap_cmd( ctx, cmd, - "UID FETCH %s (UID%s%s)", buf, - (ctx->gen.opts & OPEN_FLAGS) ? " FLAGS" : "", - (ctx->gen.opts & OPEN_SIZE) ? " RFC822.SIZE" : "" ) ? 0 : -1; + return imap_exec( ctx, imap_refcounted_new_cmd( sts ), imap_load_p2, + "UID FETCH %s (UID%s%s)", buf, + (ctx->gen.opts & OPEN_FLAGS) ? " FLAGS" : "", + (ctx->gen.opts & OPEN_SIZE) ? " RFC822.SIZE" : "" ); } static void @@ -1528,12 +1574,9 @@ imap_flags_helper( imap_store_t *ctx, int uid, char what, int flags, { char buf[256]; - struct imap_cmd *cmd = imap_refcounted_new_cmd( sts ); - cmd->param.done = imap_set_flags_p2; buf[imap_make_flags( flags, buf )] = 0; - if (!submit_imap_cmd( ctx, cmd, "UID STORE %d %cFLAGS.SILENT %s", uid, what, buf )) - return -1; - return process_imap_replies( ctx ) == RESP_CANCEL ? -1 : 0; + return imap_exec( ctx, imap_refcounted_new_cmd( sts ), imap_set_flags_p2, + "UID STORE %d %cFLAGS.SILENT %s", uid, what, buf ); } static void @@ -1705,8 +1748,16 @@ static void imap_cancel( store_t *gctx, void (*cb)( void *aux ), void *aux ) { - (void)gctx; - cb( aux ); + imap_store_t *ctx = (imap_store_t *)gctx; + + cancel_pending_imap_cmds( ctx ); + if (ctx->in_progress) { + ctx->canceling = 1; + ctx->callbacks.imap_cancel = cb; + ctx->callback_aux = aux; + } else { + cb( aux ); + } } /******************* imap_commit *******************/ @@ -1753,7 +1804,7 @@ imap_parse_store( conffile_t *cfg, store_conf_t **storep, int *err ) server->require_ssl = 1; server->sconf.use_tlsv1 = 1; #endif - server->max_in_progress = 50; + server->max_in_progress = INT_MAX; while (getcline( cfg ) && cfg->cmd) { if (!strcasecmp( "Host", cfg->cmd )) { diff --git a/src/isync.h b/src/isync.h index 1c8c038..0e1e9c3 100644 --- a/src/isync.h +++ b/src/isync.h @@ -73,15 +73,36 @@ typedef struct server_conf { #endif } server_conf_t; +typedef struct buff_chunk { + struct buff_chunk *next; + char *data; + int len; + char buf[1]; +} buff_chunk_t; + typedef struct { + /* connection */ int fd; + int state; + const server_conf_t *conf; /* needed during connect */ #ifdef HAVE_LIBSSL SSL *ssl; #endif void (*bad_callback)( void *aux ); /* async fail while sending or listening */ + void (*read_callback)( void *aux ); /* data available for reading */ + int (*write_callback)( void *aux ); /* all *queued* data was sent */ + union { + void (*connect)( int ok, void *aux ); + void (*starttls)( int ok, void *aux ); + } callbacks; void *callback_aux; + /* writing */ + buff_chunk_t *write_buf, **write_buf_append; /* buffer head & tail */ + int write_offset; /* offset into buffer head */ + + /* reading */ int offset; /* start of filled bytes in buffer */ int bytes; /* number of filled bytes in buffer */ int scanoff; /* offset to continue scanning for newline at, relative to 'offset' */ @@ -335,22 +356,27 @@ extern const char *Home; /* call this before doing anything with the socket */ static INLINE void socket_init( conn_t *conn, + const server_conf_t *conf, void (*bad_callback)( void *aux ), + void (*read_callback)( void *aux ), + int (*write_callback)( void *aux ), void *aux ) { + conn->conf = conf; conn->bad_callback = bad_callback; + conn->read_callback = read_callback; + conn->write_callback = write_callback; conn->callback_aux = aux; conn->fd = -1; + conn->write_buf_append = &conn->write_buf; } -int socket_connect( const server_conf_t *conf, conn_t *sock ); -int socket_start_tls( const server_conf_t *conf, conn_t *sock ); +void socket_connect( conn_t *conn, void (*cb)( int ok, void *aux ) ); +void socket_start_tls(conn_t *conn, void (*cb)( int ok, void *aux ) ); void socket_close( conn_t *sock ); -int socket_fill( conn_t *sock ); int socket_read( conn_t *sock, char *buf, int len ); /* never waits */ char *socket_read_line( conn_t *sock ); /* don't free return value; never waits */ typedef enum { KeepOwn = 0, GiveOwn } ownership_t; int socket_write( conn_t *sock, char *buf, int len, ownership_t takeOwn ); -int socket_pending( conn_t *sock ); void cram( const char *challenge, const char *user, const char *pass, char **_final, int *_finallen ); diff --git a/src/mbsync.1 b/src/mbsync.1 index 4f47668..e8cbabb 100644 --- a/src/mbsync.1 +++ b/src/mbsync.1 @@ -281,10 +281,8 @@ Use TLSv1 for communication with the IMAP server over SSL? \fBPipelineDepth\fR \fIdepth\fR Maximum number of IMAP commands which can be simultaneously in flight. Setting this to \fI1\fR disables pipelining. -Setting it to a too big value may deadlock isync. -Currently, this affects only a few commands. This is mostly a debugging only option. -(Default: \fI50\fR) +(Default: \fIunlimited\fR) .. .SS IMAP Stores The reference point for relative \fBPath\fRs is whatever the server likes it diff --git a/src/socket.c b/src/socket.c index c8d1a57..bf8a837 100644 --- a/src/socket.c +++ b/src/socket.c @@ -36,56 +36,67 @@ #include #include #include +#include #include #include +#include #include #include -#ifdef HAVE_SYS_FILIO_H -# include -#endif #include #include #include #include +enum { + SCK_CONNECTING, +#ifdef HAVE_LIBSSL + SCK_STARTTLS, +#endif + SCK_READY +}; + static void socket_fail( conn_t *conn ) { conn->bad_callback( conn->callback_aux ); } -static void -socket_perror( const char *func, conn_t *sock, int ret ) -{ #ifdef HAVE_LIBSSL +static int +ssl_return( const char *func, conn_t *conn, int ret ) +{ int err; - if (sock->ssl) { - switch ((err = SSL_get_error( sock->ssl, ret ))) { - case SSL_ERROR_SYSCALL: - case SSL_ERROR_SSL: - if ((err = ERR_get_error()) == 0) { - if (ret == 0) - error( "SSL_%s: got EOF\n", func ); - else - error( "SSL_%s: %s\n", func, strerror(errno) ); - } else - error( "SSL_%s: %s\n", func, ERR_error_string( err, 0 ) ); - break; - default: - error( "SSL_%s: unhandled SSL error %d\n", func, err ); - break; + switch ((err = SSL_get_error( conn->ssl, ret ))) { + case SSL_ERROR_NONE: + return ret; + case SSL_ERROR_WANT_WRITE: + conf_fd( conn->fd, POLLIN, POLLOUT ); + /* fallthrough */ + case SSL_ERROR_WANT_READ: + return 0; + case SSL_ERROR_SYSCALL: + case SSL_ERROR_SSL: + if (!(err = ERR_get_error())) { + if (ret == 0) + error( "SSL_%s: unexpected EOF\n", func ); + else + error( "SSL_%s: %s\n", func, strerror( errno ) ); + } else { + error( "SSL_%s: %s\n", func, ERR_error_string( err, 0 ) ); } - } else -#endif - if (ret < 0) - perror( func ); + break; + default: + error( "SSL_%s: unhandled SSL error %d\n", func, err ); + break; + } + if (conn->state == SCK_STARTTLS) + conn->callbacks.starttls( 0, conn->callback_aux ); else - error( "%s: unexpected EOF\n", func ); - socket_fail( sock ); + socket_fail( conn ); + return -1; } -#ifdef HAVE_LIBSSL /* Some of this code is inspired by / lifted from mutt. */ static int @@ -245,45 +256,85 @@ init_ssl_ctx( const server_conf_t *conf ) return 0; } -int -socket_start_tls( const server_conf_t *conf, conn_t *sock ) +static void start_tls_p2( conn_t * ); +static void start_tls_p3( conn_t *, int ); + +void +socket_start_tls( conn_t *conn, void (*cb)( int ok, void *aux ) ) { - int ret; static int ssl_inited; + conn->callbacks.starttls = cb; + if (!ssl_inited) { SSL_library_init(); SSL_load_error_strings(); ssl_inited = 1; } - if (!conf->SSLContext && init_ssl_ctx( conf )) - return 1; - - sock->ssl = SSL_new( ((server_conf_t *)conf)->SSLContext ); - SSL_set_fd( sock->ssl, sock->fd ); - if ((ret = SSL_connect( sock->ssl )) <= 0) { - socket_perror( "connect", sock, ret ); - return 1; + if (!conn->conf->SSLContext && init_ssl_ctx( conn->conf )) { + start_tls_p3( conn, 0 ); + return; } - /* verify the server certificate */ - if (verify_cert( conf, sock )) - return 1; + conn->ssl = SSL_new( ((server_conf_t *)conn->conf)->SSLContext ); + SSL_set_fd( conn->ssl, conn->fd ); + SSL_set_mode( conn->ssl, SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER ); + start_tls_p2( conn ); +} - info( "Connection is now encrypted\n" ); - return 0; +static void +start_tls_p2( conn_t *conn ) +{ + switch (ssl_return( "connect", conn, SSL_connect( conn->ssl ) )) { + case -1: + start_tls_p3( conn, 0 ); + break; + case 0: + break; + default: + /* verify the server certificate */ + if (verify_cert( conn->conf, conn )) { + start_tls_p3( conn, 0 ); + } else { + info( "Connection is now encrypted\n" ); + start_tls_p3( conn, 1 ); + } + break; + } +} + +static void start_tls_p3( conn_t *conn, int ok ) +{ + conn->state = SCK_READY; + conn->callbacks.starttls( ok, conn->callback_aux ); } #endif /* HAVE_LIBSSL */ -int -socket_connect( const server_conf_t *conf, conn_t *sock ) +static void socket_fd_cb( int, void * ); + +static void socket_connected2( conn_t * ); +static void socket_connect_bail( conn_t * ); + +static void +socket_close_internal( conn_t *sock ) +{ + del_fd( sock->fd ); + close( sock->fd ); + sock->fd = -1; +} + +void +socket_connect( conn_t *sock, void (*cb)( int ok, void *aux ) ) { + const server_conf_t *conf = sock->conf; struct hostent *he; struct sockaddr_in addr; int s, a[2]; + sock->callbacks.connect = cb; + /* open connection to IMAP server */ if (conf->tunnel) { infon( "Starting tunnel '%s'... ", conf->tunnel ); @@ -304,6 +355,10 @@ socket_connect( const server_conf_t *conf, conn_t *sock ) close( a[0] ); sock->fd = a[1]; + + fcntl( a[1], F_SETFL, O_NONBLOCK ); + add_fd( a[1], socket_fd_cb, sock ); + } else { memset( &addr, 0, sizeof(addr) ); addr.sin_port = conf->port ? htons( conf->port ) : @@ -317,7 +372,7 @@ socket_connect( const server_conf_t *conf, conn_t *sock ) he = gethostbyname( conf->host ); if (!he) { error( "IMAP error: Cannot resolve server '%s'\n", conf->host ); - return -1; + goto bail; } info( "ok\n" ); @@ -328,36 +383,87 @@ socket_connect( const server_conf_t *conf, conn_t *sock ) perror( "socket" ); exit( 1 ); } + sock->fd = s; + fcntl( s, F_SETFL, O_NONBLOCK ); + add_fd( s, socket_fd_cb, sock ); - infon( "Connecting to %s:%hu... ", inet_ntoa( addr.sin_addr ), ntohs( addr.sin_port ) ); + infon( "Connecting to %s (%s:%hu) ... ", + conf->host, inet_ntoa( addr.sin_addr ), ntohs( addr.sin_port ) ); if (connect( s, (struct sockaddr *)&addr, sizeof(addr) )) { - close( s ); - perror( "connect" ); - return -1; + if (errno != EINPROGRESS) { + perror( "connect" ); + socket_close_internal( sock ); + goto bail; + } + conf_fd( s, 0, POLLOUT ); + sock->state = SCK_CONNECTING; + info( "\n" ); + return; } - sock->fd = s; } info( "ok\n" ); - return 0; + socket_connected2( sock ); + return; + + bail: + socket_connect_bail( sock ); +} + +static void +socket_connected( conn_t *conn ) +{ + int soerr; + socklen_t selen = sizeof(soerr); + + infon( "Connecting to %s: ", conn->conf->host ); + if (getsockopt( conn->fd, SOL_SOCKET, SO_ERROR, &soerr, &selen )) { + perror( "getsockopt" ); + exit( 1 ); + } + if (soerr) { + errno = soerr; + perror( "connect" ); + socket_close_internal( conn ); + socket_connect_bail( conn ); + return; + } + info( "ok\n" ); + socket_connected2( conn ); +} + +static void +socket_connected2( conn_t *conn ) +{ + conf_fd( conn->fd, 0, POLLIN ); + conn->state = SCK_READY; + conn->callbacks.connect( 1, conn->callback_aux ); } +static void +socket_connect_bail( conn_t *conn ) +{ + conn->callbacks.connect( 0, conn->callback_aux ); +} + +static void dispose_chunk( conn_t *conn ); + void socket_close( conn_t *sock ) { - if (sock->fd >= 0) { - close( sock->fd ); - sock->fd = -1; - } + if (sock->fd >= 0) + socket_close_internal( sock ); #ifdef HAVE_LIBSSL if (sock->ssl) { SSL_free( sock->ssl ); sock->ssl = 0; } #endif + while (sock->write_buf) + dispose_chunk( sock ); } -int +static void socket_fill( conn_t *sock ) { char *buf; @@ -366,22 +472,31 @@ socket_fill( conn_t *sock ) if (!len) { error( "Socket error: receive buffer full. Probably protocol error.\n" ); socket_fail( sock ); - return -1; + return; } assert( sock->fd >= 0 ); buf = sock->buf + n; - n = #ifdef HAVE_LIBSSL - sock->ssl ? SSL_read( sock->ssl, buf, len ) : + if (sock->ssl) { + if ((n = ssl_return( "read", sock, SSL_read( sock->ssl, buf, len ) )) <= 0) + return; + if (n == len && SSL_pending( sock->ssl )) + fake_fd( sock->fd, POLLIN ); + } else #endif - read( sock->fd, buf, len ); - if (n <= 0) { - socket_perror( "read", sock, n ); - return -1; - } else { - sock->bytes += n; - return 0; + { + if ((n = read( sock->fd, buf, len )) < 0) { + perror( "read" ); + socket_fail( sock ); + return; + } else if (!n) { + error( "read: unexpected EOF\n" ); + socket_fail( sock ); + return; + } } + sock->bytes += n; + sock->read_callback( sock->callback_aux ); } int @@ -426,40 +541,141 @@ socket_read_line( conn_t *b ) return s; } -int -socket_write( conn_t *sock, char *buf, int len, ownership_t takeOwn ) +static int +do_write( conn_t *sock, char *buf, int len ) { int n; assert( sock->fd >= 0 ); - n = #ifdef HAVE_LIBSSL - sock->ssl ? SSL_write( sock->ssl, buf, len ) : + if (sock->ssl) + return ssl_return( "write", sock, SSL_write( sock->ssl, buf, len ) ); #endif - write( sock->fd, buf, len ); - if (takeOwn == GiveOwn) - free( buf ); - if (n != len) { - socket_perror( "write", sock, n ); - return -1; + n = write( sock->fd, buf, len ); + if (n < 0) { + if (errno != EAGAIN && errno != EWOULDBLOCK) { + perror( "write" ); + socket_fail( sock ); + } else { + n = 0; + conf_fd( sock->fd, POLLIN, POLLOUT ); + } + } else if (n != len) { + conf_fd( sock->fd, POLLIN, POLLOUT ); } - return 0; + return n; +} + +static void +dispose_chunk( conn_t *conn ) +{ + buff_chunk_t *bc = conn->write_buf; + if (!(conn->write_buf = bc->next)) + conn->write_buf_append = &conn->write_buf; + if (bc->data != bc->buf) + free( bc->data ); + free( bc ); +} + +static int +do_queued_write( conn_t *conn ) +{ + buff_chunk_t *bc; + + if (!conn->write_buf) + return 0; + + while ((bc = conn->write_buf)) { + int n, len = bc->len - conn->write_offset; + if ((n = do_write( conn, bc->data + conn->write_offset, len )) < 0) + return -1; + if (n != len) { + conn->write_offset += n; + return 0; + } + conn->write_offset = 0; + dispose_chunk( conn ); + } +#ifdef HAVE_LIBSSL + if (conn->ssl && SSL_pending( conn->ssl )) + fake_fd( conn->fd, POLLIN ); +#endif + return conn->write_callback( conn->callback_aux ); +} + +static void +do_append( conn_t *conn, char *buf, int len, ownership_t takeOwn ) +{ + buff_chunk_t *bc; + + if (takeOwn == GiveOwn) { + bc = nfmalloc( offsetof(buff_chunk_t, buf) ); + bc->data = buf; + } else { + bc = nfmalloc( offsetof(buff_chunk_t, buf) + len ); + bc->data = bc->buf; + memcpy( bc->data, buf, len ); + } + bc->len = len; + bc->next = 0; + *conn->write_buf_append = bc; + conn->write_buf_append = &bc->next; } int -socket_pending( conn_t *sock ) +socket_write( conn_t *conn, char *buf, int len, ownership_t takeOwn ) +{ + if (conn->write_buf) { + do_append( conn, buf, len, takeOwn ); + return len; + } else { + int n = do_write( conn, buf, len ); + if (n != len && n >= 0) { + conn->write_offset = n; + do_append( conn, buf, len, takeOwn ); + } else if (takeOwn) { + free( buf ); + } + return n; + } +} + +static void +socket_fd_cb( int events, void *aux ) { - int num = -1; + conn_t *conn = (conn_t *)aux; + + if (events & POLLERR) { + error( "Unidentified socket error.\n" ); + socket_fail( conn ); + return; + } + + if (conn->state == SCK_CONNECTING) { + socket_connected( conn ); + return; + } + + if (events & POLLOUT) + conf_fd( conn->fd, POLLIN, 0 ); - if (ioctl( sock->fd, FIONREAD, &num ) < 0) - return -1; - if (num > 0) - return num; #ifdef HAVE_LIBSSL - if (sock->ssl) - return SSL_pending( sock->ssl ); + if (conn->state == SCK_STARTTLS) { + start_tls_p2( conn ); + return; + } + if (conn->ssl) { + if (do_queued_write( conn ) < 0) + return; + socket_fill( conn ); + return; + } #endif - return 0; + + if ((events & POLLOUT) && do_queued_write( conn ) < 0) + return; + if (events & POLLIN) + socket_fill( conn ); } #ifdef HAVE_LIBSSL