From 1ba0cd7b96de08b843c77e4b6385bf6d34bd2afa Mon Sep 17 00:00:00 2001 From: Oswald Buddenhagen Date: Tue, 31 May 2022 09:58:41 +0200 Subject: [PATCH] factor out sync_state.c & sync_p.h from sync.c while moving the code, localize some variables, and use C99 comments. --- src/Makefile.am | 4 +- src/sync.c | 644 +---------------------------------------------- src/sync_p.h | 93 +++++++ src/sync_state.c | 569 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 665 insertions(+), 645 deletions(-) create mode 100644 src/sync_p.h create mode 100644 src/sync_state.c diff --git a/src/Makefile.am b/src/Makefile.am index 5e3d5aa..e06005d 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -7,12 +7,12 @@ mbsync_SOURCES = \ driver.c drv_proxy.c \ drv_imap.c \ drv_maildir.c \ - sync.c \ + sync.c sync_state.c \ main.c noinst_HEADERS = \ common.h config.h socket.h \ driver.h \ - sync.h + sync.h sync_p.h mbsync_LDADD = $(DB_LIBS) $(SSL_LIBS) $(SOCK_LIBS) $(SASL_LIBS) $(Z_LIBS) $(KEYCHAIN_LIBS) drv_proxy.$(OBJEXT): drv_proxy.inc diff --git a/src/sync.c b/src/sync.c index b4f74af..2e69b88 100644 --- a/src/sync.c +++ b/src/sync.c @@ -5,16 +5,7 @@ * mbsync - mailbox synchronizer */ -#define DEBUG_FLAG DEBUG_SYNC - -#include "sync.h" - -#include -#include -#include -#include - -#define JOURNAL_VERSION "4" +#include "sync_p.h" channel_conf_t global_conf; channel_conf_t *channels; @@ -26,75 +17,6 @@ int new_total[2], new_done[2]; int flags_total[2], flags_done[2]; int trash_total[2], trash_done[2]; -const char *str_fn[] = { "far side", "near side" }, *str_hl[] = { "push", "pull" }; - - -static uchar -parse_flags( const char *buf ) -{ - uint i, d; - uchar flags; - - for (flags = i = d = 0; i < as(MsgFlags); i++) { - if (buf[d] == MsgFlags[i]) { - flags |= (1 << i); - d++; - } - } - return flags; -} - -// This is the (mostly) persistent status of the sync record. -// Most of these bits are actually mutually exclusive. It is a -// bitfield to allow for easy testing for multiple states. -#define S_EXPIRE (1<<0) // the entry is being expired (near side message removal scheduled) -#define S_EXPIRED (1<<1) // the entry is expired (near side message removal confirmed) -#define S_PENDING (1<<2) // the entry is new and awaits propagation (possibly a retry) -#define S_DUMMY(fn) (1<<(3+(fn))) // f/n message is only a placeholder -#define S_SKIPPED (1<<5) // pre-1.4 legacy: the entry was not propagated (message is too big) -#define S_DEAD (1<<7) // ephemeral: the entry was killed and should be ignored - -// Ephemeral working set. -#define W_NEXPIRE (1<<0) // temporary: new expiration state -#define W_DELETE (1<<1) // ephemeral: flags propagation is a deletion -#define W_DEL(fn) (1<<(2+(fn))) // ephemeral: f/n message would be subject to expunge -#define W_UPGRADE (1<<4) // ephemeral: upgrading placeholder, do not apply MaxSize -#define W_PURGE (1<<5) // ephemeral: placeholder is being nuked - -typedef struct sync_rec { - struct sync_rec *next; - /* string_list_t *keywords; */ - uint uid[2]; - message_t *msg[2]; - uchar status, wstate, flags, pflags, aflags[2], dflags[2]; - char tuid[TUIDL]; -} sync_rec_t; - -typedef struct { - int t[2]; - void (*cb)( int sts, void *aux ), *aux; - char *dname, *jname, *nname, *lname, *box_name[2]; - FILE *jfp, *nfp; - sync_rec_t *srecs, **srecadd; - channel_conf_t *chan; - store_t *ctx[2]; - driver_t *drv[2]; - const char *orig_name[2]; - message_t *msgs[2], *new_msgs[2]; - uint_array_alloc_t trashed_msgs[2]; - int state[2], lfd, ret, existing, replayed; - uint ref_count, nsrecs, opts[2]; - uint new_pending[2], flags_pending[2], trash_pending[2]; - uint maxuid[2]; // highest UID that was already propagated - uint oldmaxuid[2]; // highest UID that was already propagated before this run - uint uidval[2]; // UID validity value - uint newuidval[2]; // UID validity obtained from driver - uint finduid[2]; // TUID lookup makes sense only for UIDs >= this - uint maxxfuid; // highest expired UID on far side - uint oldmaxxfuid; // highest expired UID on far side before this run - uchar good_flags[2], bad_flags[2]; -} sync_vars_t; - static void sync_ref( sync_vars_t *svars ) { ++svars->ref_count; } static void sync_deref( sync_vars_t *svars ); static int check_cancel( sync_vars_t *svars ); @@ -139,124 +61,6 @@ static int check_cancel( sync_vars_t *svars ); #define ST_SENDING_NEW (1<<15) -static void -create_state( sync_vars_t *svars ) -{ - if (!(svars->nfp = fopen( svars->nname, "w" ))) { - sys_error( "Error: cannot create new sync state %s", svars->nname ); - exit( 1 ); - } -} - -static void ATTR_PRINTFLIKE(2, 3) -jFprintf( sync_vars_t *svars, const char *msg, ... ) -{ - va_list va; - - if (JLimit && !--JLimit) - exit( 101 ); - if (!svars->jfp) { - create_state( svars ); - if (!(svars->jfp = fopen( svars->jname, svars->replayed ? "a" : "w" ))) { - sys_error( "Error: cannot create journal %s", svars->jname ); - exit( 1 ); - } - setlinebuf( svars->jfp ); - if (!svars->replayed) - Fprintf( svars->jfp, JOURNAL_VERSION "\n" ); - } - va_start( va, msg ); - vFprintf( svars->jfp, msg, va ); - va_end( va ); - if (JLimit && !--JLimit) - exit( 100 ); -} - -#define JLOG_(log_fmt, log_args, dbg_fmt, ...) \ - do { \ - debug( "-> log: " log_fmt " (" dbg_fmt ")\n", __VA_ARGS__ ); \ - jFprintf( svars, log_fmt "\n", deparen(log_args) ); \ - } while (0) -#define JLOG3(log_fmt, log_args, dbg_fmt) \ - JLOG_(log_fmt, log_args, dbg_fmt, deparen(log_args)) -#define JLOG4(log_fmt, log_args, dbg_fmt, dbg_args) \ - JLOG_(log_fmt, log_args, dbg_fmt, deparen(log_args), deparen(dbg_args)) -#define JLOG_SEL(_1, _2, _3, _4, x, ...) x -#define JLOG(...) JLOG_SEL(__VA_ARGS__, JLOG4, JLOG3, NO_JLOG2, NO_JLOG1)(__VA_ARGS__) - -static void -assign_uid( sync_vars_t *svars, sync_rec_t *srec, int t, uint uid ) -{ - srec->uid[t] = uid; - if (uid == svars->maxuid[t] + 1) - svars->maxuid[t] = uid; - srec->status &= ~S_PENDING; - srec->wstate &= ~W_UPGRADE; - srec->tuid[0] = 0; -} - -#define ASSIGN_UID(srec, t, nuid, ...) \ - do { \ - JLOG( "%c %u %u %u", ("<>"[t], srec->uid[F], srec->uid[N], nuid), __VA_ARGS__ ); \ - assign_uid( svars, srec, t, nuid ); \ - } while (0) - -static void -assign_tuid( sync_vars_t *svars, sync_rec_t *srec ) -{ - for (uint i = 0; i < TUIDL; i++) { - uchar c = arc4_getbyte() & 0x3f; - srec->tuid[i] = (char)(c < 26 ? c + 'A' : c < 52 ? c + 'a' - 26 : - c < 62 ? c + '0' - 52 : c == 62 ? '+' : '/'); - } - JLOG( "# %u %u %." stringify(TUIDL) "s", (srec->uid[F], srec->uid[N], srec->tuid), "new TUID" ); -} - -static int -match_tuids( sync_vars_t *svars, int t, message_t *msgs ) -{ - sync_rec_t *srec; - message_t *tmsg, *ntmsg = NULL; - const char *diag; - int num_lost = 0; - - for (srec = svars->srecs; srec; srec = srec->next) { - if (srec->status & S_DEAD) - continue; - if (!srec->uid[t] && srec->tuid[0]) { - debug( "pair(%u,%u) TUID %." stringify(TUIDL) "s\n", srec->uid[F], srec->uid[N], srec->tuid ); - for (tmsg = ntmsg; tmsg; tmsg = tmsg->next) { - if (tmsg->status & M_DEAD) - continue; - if (tmsg->tuid[0] && !memcmp( tmsg->tuid, srec->tuid, TUIDL )) { - diag = (tmsg == ntmsg) ? "adjacently" : "after gap"; - goto mfound; - } - } - for (tmsg = msgs; tmsg != ntmsg; tmsg = tmsg->next) { - if (tmsg->status & M_DEAD) - continue; - if (tmsg->tuid[0] && !memcmp( tmsg->tuid, srec->tuid, TUIDL )) { - diag = "after reset"; - goto mfound; - } - } - JLOG( "& %u %u", (srec->uid[F], srec->uid[N]), "TUID lost" ); - // Note: status remains S_PENDING. - srec->tuid[0] = 0; - num_lost++; - continue; - mfound: - tmsg->srec = srec; - srec->msg[t] = tmsg; - ntmsg = tmsg->next; - ASSIGN_UID( srec, t, tmsg->uid, "TUID matched %s", diag ); - } - } - return num_lost; -} - - static uchar sanitize_flags( uchar tflags, sync_vars_t *svars, int t ) { @@ -627,452 +431,6 @@ check_ret( int sts, void *aux ) } \ INIT_SVARS(vars->aux) -static char * -clean_strdup( const char *s ) -{ - char *cs; - uint i; - - cs = nfstrdup( s ); - for (i = 0; cs[i]; i++) - if (cs[i] == '/') - cs[i] = '!'; - return cs; -} - - -static sync_rec_t * -upgrade_srec( sync_vars_t *svars, sync_rec_t *srec ) -{ - // Create an entry and append it to the current one. - sync_rec_t *nsrec = nfzalloc( sizeof(*nsrec) ); - nsrec->next = srec->next; - srec->next = nsrec; - if (svars->srecadd == &srec->next) - svars->srecadd = &nsrec->next; - // Move the placeholder to the new entry. - int t = (srec->status & S_DUMMY(F)) ? F : N; - nsrec->uid[t] = srec->uid[t]; - srec->uid[t] = 0; - if (srec->msg[t]) { // NULL during journal replay; is assigned later. - nsrec->msg[t] = srec->msg[t]; - nsrec->msg[t]->srec = nsrec; - srec->msg[t] = NULL; - } - // Mark the original entry for upgrade. - srec->status = (srec->status & ~(S_DUMMY(F)|S_DUMMY(N))) | S_PENDING; - srec->wstate |= W_UPGRADE; - // Mark the placeholder for nuking. - nsrec->wstate = W_PURGE; - nsrec->aflags[t] = F_DELETED; - return nsrec; -} - -static int -prepare_state( sync_vars_t *svars ) -{ - char *s, *cmname, *csname; - channel_conf_t *chan; - - chan = svars->chan; - if (!strcmp( chan->sync_state ? chan->sync_state : global_conf.sync_state, "*" )) { - const char *path = svars->drv[N]->get_box_path( svars->ctx[N] ); - if (!path) { - error( "Error: store '%s' does not support in-box sync state\n", chan->stores[N]->name ); - return 0; - } - nfasprintf( &svars->dname, "%s/." EXE "state", path ); - } else { - csname = clean_strdup( svars->box_name[N] ); - if (chan->sync_state) { - nfasprintf( &svars->dname, "%s%s", chan->sync_state, csname ); - } else { - char c = FieldDelimiter; - cmname = clean_strdup( svars->box_name[F] ); - nfasprintf( &svars->dname, "%s%c%s%c%s_%c%s%c%s", global_conf.sync_state, - c, chan->stores[F]->name, c, cmname, c, chan->stores[N]->name, c, csname ); - free( cmname ); - } - free( csname ); - if (!(s = strrchr( svars->dname, '/' ))) { - error( "Error: invalid SyncState location '%s'\n", svars->dname ); - return 0; - } - *s = 0; - if (mkdir( svars->dname, 0700 ) && errno != EEXIST) { - sys_error( "Error: cannot create SyncState directory '%s'", svars->dname ); - return 0; - } - *s = '/'; - } - nfasprintf( &svars->jname, "%s.journal", svars->dname ); - nfasprintf( &svars->nname, "%s.new", svars->dname ); - nfasprintf( &svars->lname, "%s.lock", svars->dname ); - return 1; -} - -static int -lock_state( sync_vars_t *svars ) -{ - struct flock lck; - - if (svars->lfd >= 0) - return 1; - memset( &lck, 0, sizeof(lck) ); -#if SEEK_SET != 0 - lck.l_whence = SEEK_SET; -#endif -#if F_WRLCK != 0 - lck.l_type = F_WRLCK; -#endif - if ((svars->lfd = open( svars->lname, O_WRONLY|O_CREAT, 0666 )) < 0) { - sys_error( "Error: cannot create lock file %s", svars->lname ); - return 0; - } - if (fcntl( svars->lfd, F_SETLK, &lck )) { - error( "Error: channel :%s:%s-:%s:%s is locked\n", - svars->chan->stores[F]->name, svars->orig_name[F], svars->chan->stores[N]->name, svars->orig_name[N] ); - close( svars->lfd ); - svars->lfd = -1; - return 0; - } - return 1; -} - -static void -save_state( sync_vars_t *svars ) -{ - sync_rec_t *srec; - char fbuf[16]; /* enlarge when support for keywords is added */ - - // If no change was made, the state is also unmodified. - if (!svars->jfp && !svars->replayed) - return; - - if (!svars->nfp) - create_state( svars ); - Fprintf( svars->nfp, - "FarUidValidity %u\nNearUidValidity %u\nMaxPulledUid %u\nMaxPushedUid %u\n", - svars->uidval[F], svars->uidval[N], svars->maxuid[F], svars->maxuid[N] ); - if (svars->maxxfuid) - Fprintf( svars->nfp, "MaxExpiredFarUid %u\n", svars->maxxfuid ); - Fprintf( svars->nfp, "\n" ); - for (srec = svars->srecs; srec; srec = srec->next) { - if (srec->status & S_DEAD) - continue; - make_flags( srec->flags, fbuf ); - Fprintf( svars->nfp, "%u %u %s%s%s\n", srec->uid[F], srec->uid[N], - (srec->status & S_DUMMY(F)) ? "<" : (srec->status & S_DUMMY(N)) ? ">" : "", - (srec->status & S_SKIPPED) ? "^" : (srec->status & S_EXPIRED) ? "~" : "", fbuf ); - } - - Fclose( svars->nfp, 1 ); - if (svars->jfp) - Fclose( svars->jfp, 0 ); - if (!(DFlags & KEEPJOURNAL)) { - /* order is important! */ - if (rename( svars->nname, svars->dname )) - warn( "Warning: cannot commit sync state %s\n", svars->dname ); - else if (unlink( svars->jname )) - warn( "Warning: cannot delete journal %s\n", svars->jname ); - } -} - -static int -load_state( sync_vars_t *svars ) -{ - sync_rec_t *srec, *nsrec; - char *s; - FILE *jfp; - uint ll; - uint maxxnuid = 0; - char c; - struct stat st; - char fbuf[16]; /* enlarge when support for keywords is added */ - char buf[128], buf1[64], buf2[64]; - - if ((jfp = fopen( svars->dname, "r" ))) { - if (!lock_state( svars )) - goto jbail; - debug( "reading sync state %s ...\n", svars->dname ); - int line = 0; - while (fgets( buf, sizeof(buf), jfp )) { - line++; - if (!(ll = strlen( buf )) || buf[ll - 1] != '\n') { - error( "Error: incomplete sync state header entry at %s:%d\n", svars->dname, line ); - jbail: - fclose( jfp ); - return 0; - } - if (ll == 1) - goto gothdr; - if (line == 1 && isdigit( buf[0] )) { // Pre-1.1 legacy - if (sscanf( buf, "%63s %63s", buf1, buf2 ) != 2 || - sscanf( buf1, "%u:%u", &svars->uidval[F], &svars->maxuid[F] ) < 2 || - sscanf( buf2, "%u:%u:%u", &svars->uidval[N], &maxxnuid, &svars->maxuid[N] ) < 3) { - error( "Error: invalid sync state header in %s\n", svars->dname ); - goto jbail; - } - goto gothdr; - } - uint uid; - if (sscanf( buf, "%63s %u", buf1, &uid ) != 2) { - error( "Error: malformed sync state header entry at %s:%d\n", svars->dname, line ); - goto jbail; - } - if (!strcmp( buf1, "FarUidValidity" ) || !strcmp( buf1, "MasterUidValidity" ) /* Pre-1.4 legacy */) { - svars->uidval[F] = uid; - } else if (!strcmp( buf1, "NearUidValidity" ) || !strcmp( buf1, "SlaveUidValidity" ) /* Pre-1.4 legacy */) { - svars->uidval[N] = uid; - } else if (!strcmp( buf1, "MaxPulledUid" )) { - svars->maxuid[F] = uid; - } else if (!strcmp( buf1, "MaxPushedUid" )) { - svars->maxuid[N] = uid; - } else if (!strcmp( buf1, "MaxExpiredFarUid" ) || !strcmp( buf1, "MaxExpiredMasterUid" ) /* Pre-1.4 legacy */) { - svars->maxxfuid = uid; - } else if (!strcmp( buf1, "MaxExpiredSlaveUid" )) { // Pre-1.3 legacy - maxxnuid = uid; - } else { - error( "Error: unrecognized sync state header entry at %s:%d\n", svars->dname, line ); - goto jbail; - } - } - error( "Error: unterminated sync state header in %s\n", svars->dname ); - goto jbail; - gothdr: - while (fgets( buf, sizeof(buf), jfp )) { - line++; - if (!(ll = strlen( buf )) || buf[--ll] != '\n') { - error( "Error: incomplete sync state entry at %s:%d\n", svars->dname, line ); - goto jbail; - } - buf[ll] = 0; - fbuf[0] = 0; - uint t1, t2; - if (sscanf( buf, "%u %u %15s", &t1, &t2, fbuf ) < 2) { - error( "Error: invalid sync state entry at %s:%d\n", svars->dname, line ); - goto jbail; - } - srec = nfzalloc( sizeof(*srec) ); - srec->uid[F] = t1; - srec->uid[N] = t2; - s = fbuf; - if (*s == '<') { - s++; - srec->status = S_DUMMY(F); - } else if (*s == '>') { - s++; - srec->status = S_DUMMY(N); - } - if (*s == '^') { // Pre-1.4 legacy - s++; - srec->status = S_SKIPPED; - } else if (*s == '~' || *s == 'X' /* Pre-1.3 legacy */) { - s++; - srec->status = S_EXPIRE | S_EXPIRED; - } else if (srec->uid[F] == (uint)-1) { // Pre-1.3 legacy - srec->uid[F] = 0; - srec->status = S_SKIPPED; - } else if (srec->uid[N] == (uint)-1) { - srec->uid[N] = 0; - srec->status = S_SKIPPED; - } - srec->flags = parse_flags( s ); - debug( " entry (%u,%u,%u,%s%s)\n", srec->uid[F], srec->uid[N], srec->flags, - (srec->status & S_SKIPPED) ? "SKIP" : (srec->status & S_EXPIRED) ? "XPIRE" : "", - (srec->status & S_DUMMY(F)) ? ",F-DUMMY" : (srec->status & S_DUMMY(N)) ? ",N-DUMMY" : "" ); - *svars->srecadd = srec; - svars->srecadd = &srec->next; - svars->nsrecs++; - } - fclose( jfp ); - svars->existing = 1; - } else { - if (errno != ENOENT) { - sys_error( "Error: cannot read sync state %s", svars->dname ); - return 0; - } - svars->existing = 0; - } - - // This is legacy support for pre-1.3 sync states. - if (maxxnuid) { - uint minwuid = UINT_MAX; - for (srec = svars->srecs; srec; srec = srec->next) { - if ((srec->status & (S_DEAD | S_SKIPPED | S_PENDING)) || !srec->uid[F]) - continue; - if (srec->status & S_EXPIRED) { - if (!srec->uid[N]) { - // The expired message was already gone. - continue; - } - // The expired message was not expunged yet, so re-examine it. - // This will happen en masse, so just extend the bulk fetch. - } else { - if (srec->uid[N] && maxxnuid >= srec->uid[N]) { - // The non-expired message is in the generally expired range, - // so don't make it contribute to the bulk fetch. - continue; - } - // Usual non-expired message. - } - if (minwuid > srec->uid[F]) - minwuid = srec->uid[F]; - } - svars->maxxfuid = minwuid - 1; - } - - int line = 0; - if ((jfp = fopen( svars->jname, "r" ))) { - if (!lock_state( svars )) - goto jbail; - if (!stat( svars->nname, &st ) && fgets( buf, sizeof(buf), jfp )) { - debug( "recovering journal ...\n" ); - if (!(ll = strlen( buf )) || buf[--ll] != '\n') { - error( "Error: incomplete journal header in %s\n", svars->jname ); - goto jbail; - } - buf[ll] = 0; - if (!equals( buf, (int)ll, JOURNAL_VERSION, strlen(JOURNAL_VERSION) )) { - error( "Error: incompatible journal version " - "(got %s, expected " JOURNAL_VERSION ")\n", buf ); - goto jbail; - } - srec = NULL; - line = 1; - while (fgets( buf, sizeof(buf), jfp )) { - line++; - if (!(ll = strlen( buf )) || buf[--ll] != '\n') { - error( "Error: incomplete journal entry at %s:%d\n", svars->jname, line ); - goto jbail; - } - buf[ll] = 0; - int tn; - uint t1, t2, t3, t4; - if ((c = buf[0]) == '#' ? - (tn = 0, (sscanf( buf + 2, "%u %u %n", &t1, &t2, &tn ) < 2) || !tn || (ll - (uint)tn != TUIDL + 2)) : - c == '!' ? - (sscanf( buf + 2, "%u", &t1 ) != 1) : - c == 'N' || c == 'F' || c == 'T' || c == '+' || c == '&' || c == '-' || c == '=' || c == '_' || c == '|' ? - (sscanf( buf + 2, "%u %u", &t1, &t2 ) != 2) : - c != '^' ? - (sscanf( buf + 2, "%u %u %u", &t1, &t2, &t3 ) != 3) : - (sscanf( buf + 2, "%u %u %u %u", &t1, &t2, &t3, &t4 ) != 4)) - { - error( "Error: malformed journal entry at %s:%d\n", svars->jname, line ); - goto jbail; - } - if (c == 'N') { - svars->maxuid[t1] = t2; - } else if (c == 'F') { - svars->finduid[t1] = t2; - } else if (c == 'T') { - *uint_array_append( &svars->trashed_msgs[t1] ) = t2; - } else if (c == '!') { - svars->maxxfuid = t1; - } else if (c == '|') { - svars->uidval[F] = t1; - svars->uidval[N] = t2; - } else if (c == '+') { - srec = nfzalloc( sizeof(*srec) ); - srec->uid[F] = t1; - srec->uid[N] = t2; - debug( " new entry(%u,%u)\n", t1, t2 ); - srec->status = S_PENDING; - *svars->srecadd = srec; - svars->srecadd = &srec->next; - svars->nsrecs++; - } else { - for (nsrec = srec; srec; srec = srec->next) - if (srec->uid[F] == t1 && srec->uid[N] == t2) - goto syncfnd; - for (srec = svars->srecs; srec != nsrec; srec = srec->next) - if (srec->uid[F] == t1 && srec->uid[N] == t2) - goto syncfnd; - error( "Error: journal entry at %s:%d refers to non-existing sync state entry\n", svars->jname, line ); - goto jbail; - syncfnd: - debugn( " entry(%u,%u,%u) ", srec->uid[F], srec->uid[N], srec->flags ); - switch (c) { - case '-': - debug( "killed\n" ); - srec->status = S_DEAD; - break; - case '=': - debug( "aborted\n" ); - if (svars->maxxfuid < srec->uid[F]) - svars->maxxfuid = srec->uid[F]; - srec->status = S_DEAD; - break; - case '#': - memcpy( srec->tuid, buf + tn + 2, TUIDL ); - debug( "TUID now %." stringify(TUIDL) "s\n", srec->tuid ); - break; - case '&': - debug( "TUID %." stringify(TUIDL) "s lost\n", srec->tuid ); - srec->tuid[0] = 0; - break; - case '<': - debug( "far side now %u\n", t3 ); - assign_uid( svars, srec, F, t3 ); - break; - case '>': - debug( "near side now %u\n", t3 ); - assign_uid( svars, srec, N, t3 ); - break; - case '*': - debug( "flags now %u\n", t3 ); - srec->flags = (uchar)t3; - srec->aflags[F] = srec->aflags[N] = 0; // Clear F_DELETED from purge - srec->wstate &= ~W_PURGE; - break; - case '~': - debug( "status now %#x\n", t3 ); - srec->status = (uchar)t3; - break; - case '_': - debug( "has placeholder now\n" ); - srec->status = S_PENDING; // Pre-1.4 legacy only - srec->status |= !srec->uid[F] ? S_DUMMY(F) : S_DUMMY(N); - break; - case '^': - debug( "is being upgraded, flags %u, srec flags %u\n", t3, t4 ); - srec->pflags = (uchar)t3; - srec->flags = (uchar)t4; - srec = upgrade_srec( svars, srec ); - break; - default: - error( "Error: unrecognized journal entry at %s:%d\n", svars->jname, line ); - goto jbail; - } - } - } - } - fclose( jfp ); - sort_uint_array( svars->trashed_msgs[F].array ); - sort_uint_array( svars->trashed_msgs[N].array ); - } else { - if (errno != ENOENT) { - sys_error( "Error: cannot read journal %s", svars->jname ); - return 0; - } - } - svars->replayed = line; - - return 1; -} - -static void -delete_state( sync_vars_t *svars ) -{ - unlink( svars->nname ); - unlink( svars->jname ); - if (unlink( svars->dname ) || unlink( svars->lname )) { - sys_error( "Error: channel %s: sync state cannot be deleted", svars->chan->name ); - svars->ret = SYNC_FAIL; - } -} - static void box_confirmed( int sts, uint uidvalidity, void *aux ); static void box_confirmed2( sync_vars_t *svars, int t ); static void box_deleted( int sts, void *aux ); diff --git a/src/sync_p.h b/src/sync_p.h new file mode 100644 index 0000000..07eefcb --- /dev/null +++ b/src/sync_p.h @@ -0,0 +1,93 @@ +// SPDX-FileCopyrightText: 2002-2022 Oswald Buddenhagen +// SPDX-License-Identifier: GPL-2.0-or-later WITH LicenseRef-isync-GPL-exception +// +// mbsync - mailbox synchronizer +// + +#define DEBUG_FLAG DEBUG_SYNC + +#include "sync.h" + +// This is the (mostly) persistent status of the sync record. +// Most of these bits are actually mutually exclusive. It is a +// bitfield to allow for easy testing for multiple states. +#define S_EXPIRE (1<<0) // the entry is being expired (near side message removal scheduled) +#define S_EXPIRED (1<<1) // the entry is expired (near side message removal confirmed) +#define S_PENDING (1<<2) // the entry is new and awaits propagation (possibly a retry) +#define S_DUMMY(fn) (1<<(3+(fn))) // f/n message is only a placeholder +#define S_SKIPPED (1<<5) // pre-1.4 legacy: the entry was not propagated (message is too big) +#define S_DEAD (1<<7) // ephemeral: the entry was killed and should be ignored + +// Ephemeral working set. +#define W_NEXPIRE (1<<0) // temporary: new expiration state +#define W_DELETE (1<<1) // ephemeral: flags propagation is a deletion +#define W_DEL(fn) (1<<(2+(fn))) // ephemeral: f/n message would be subject to expunge +#define W_UPGRADE (1<<4) // ephemeral: upgrading placeholder, do not apply MaxSize +#define W_PURGE (1<<5) // ephemeral: placeholder is being nuked + +typedef struct sync_rec { + struct sync_rec *next; + /* string_list_t *keywords; */ + uint uid[2]; + message_t *msg[2]; + uchar status, wstate, flags, pflags, aflags[2], dflags[2]; + char tuid[TUIDL]; +} sync_rec_t; + +typedef struct { + int t[2]; + void (*cb)( int sts, void *aux ), *aux; + char *dname, *jname, *nname, *lname, *box_name[2]; + FILE *jfp, *nfp; + sync_rec_t *srecs, **srecadd; + channel_conf_t *chan; + store_t *ctx[2]; + driver_t *drv[2]; + const char *orig_name[2]; + message_t *msgs[2], *new_msgs[2]; + uint_array_alloc_t trashed_msgs[2]; + int state[2], lfd, ret, existing, replayed; + uint ref_count, nsrecs, opts[2]; + uint new_pending[2], flags_pending[2], trash_pending[2]; + uint maxuid[2]; // highest UID that was already propagated + uint oldmaxuid[2]; // highest UID that was already propagated before this run + uint uidval[2]; // UID validity value + uint newuidval[2]; // UID validity obtained from driver + uint finduid[2]; // TUID lookup makes sense only for UIDs >= this + uint maxxfuid; // highest expired UID on far side + uint oldmaxxfuid; // highest expired UID on far side before this run + uchar good_flags[2], bad_flags[2]; +} sync_vars_t; + +int prepare_state( sync_vars_t *svars ); +int lock_state( sync_vars_t *svars ); +int load_state( sync_vars_t *svars ); +void save_state( sync_vars_t *svars ); +void delete_state( sync_vars_t *svars ); + +void ATTR_PRINTFLIKE(2, 3) jFprintf( sync_vars_t *svars, const char *msg, ... ); + +#define JLOG_(log_fmt, log_args, dbg_fmt, ...) \ + do { \ + debug( "-> log: " log_fmt " (" dbg_fmt ")\n", __VA_ARGS__ ); \ + jFprintf( svars, log_fmt "\n", deparen(log_args) ); \ + } while (0) +#define JLOG3(log_fmt, log_args, dbg_fmt) \ + JLOG_(log_fmt, log_args, dbg_fmt, deparen(log_args)) +#define JLOG4(log_fmt, log_args, dbg_fmt, dbg_args) \ + JLOG_(log_fmt, log_args, dbg_fmt, deparen(log_args), deparen(dbg_args)) +#define JLOG_SEL(_1, _2, _3, _4, x, ...) x +#define JLOG(...) JLOG_SEL(__VA_ARGS__, JLOG4, JLOG3, NO_JLOG2, NO_JLOG1)(__VA_ARGS__) + +void assign_uid( sync_vars_t *svars, sync_rec_t *srec, int t, uint uid ); + +#define ASSIGN_UID(srec, t, nuid, ...) \ + do { \ + JLOG( "%c %u %u %u", ("<>"[t], srec->uid[F], srec->uid[N], nuid), __VA_ARGS__ ); \ + assign_uid( svars, srec, t, nuid ); \ + } while (0) + +void assign_tuid( sync_vars_t *svars, sync_rec_t *srec ); +int match_tuids( sync_vars_t *svars, int t, message_t *msgs ); + +sync_rec_t *upgrade_srec( sync_vars_t *svars, sync_rec_t *srec ); diff --git a/src/sync_state.c b/src/sync_state.c new file mode 100644 index 0000000..c6761e9 --- /dev/null +++ b/src/sync_state.c @@ -0,0 +1,569 @@ +// SPDX-FileCopyrightText: 2004-2022 Oswald Buddenhagen +// SPDX-License-Identifier: GPL-2.0-or-later WITH LicenseRef-isync-GPL-exception +// +// mbsync - mailbox synchronizer +// + +#define DEBUG_FLAG DEBUG_SYNC + +#include "sync_p.h" + +#include +#include +#include +#include + +#define JOURNAL_VERSION "4" + +const char *str_fn[] = { "far side", "near side" }, *str_hl[] = { "push", "pull" }; + +static char * +clean_strdup( const char *s ) +{ + char *cs = nfstrdup( s ); + for (uint i = 0; cs[i]; i++) + if (cs[i] == '/') + cs[i] = '!'; + return cs; +} + +int +prepare_state( sync_vars_t *svars ) +{ + channel_conf_t *chan = svars->chan; + if (!strcmp( chan->sync_state ? chan->sync_state : global_conf.sync_state, "*" )) { + const char *path = svars->drv[N]->get_box_path( svars->ctx[N] ); + if (!path) { + error( "Error: store '%s' does not support in-box sync state\n", chan->stores[N]->name ); + return 0; + } + nfasprintf( &svars->dname, "%s/." EXE "state", path ); + } else { + char *cnname = clean_strdup( svars->box_name[N] ); + if (chan->sync_state) { + nfasprintf( &svars->dname, "%s%s", chan->sync_state, cnname ); + } else { + char c = FieldDelimiter; + char *cfname = clean_strdup( svars->box_name[F] ); + nfasprintf( &svars->dname, "%s%c%s%c%s_%c%s%c%s", global_conf.sync_state, + c, chan->stores[F]->name, c, cfname, c, chan->stores[N]->name, c, cnname ); + free( cfname ); + } + free( cnname ); + char *s; + if (!(s = strrchr( svars->dname, '/' ))) { + error( "Error: invalid SyncState location '%s'\n", svars->dname ); + return 0; + } + // Note that this may be shorter than the configuration value, + // as that may contain a filename prefix. + *s = 0; + if (mkdir( svars->dname, 0700 ) && errno != EEXIST) { + sys_error( "Error: cannot create SyncState directory '%s'", svars->dname ); + return 0; + } + *s = '/'; + } + nfasprintf( &svars->jname, "%s.journal", svars->dname ); + nfasprintf( &svars->nname, "%s.new", svars->dname ); + nfasprintf( &svars->lname, "%s.lock", svars->dname ); + return 1; +} + +int +lock_state( sync_vars_t *svars ) +{ + struct flock lck; + + if (svars->lfd >= 0) + return 1; + memset( &lck, 0, sizeof(lck) ); +#if SEEK_SET != 0 + lck.l_whence = SEEK_SET; +#endif +#if F_WRLCK != 0 + lck.l_type = F_WRLCK; +#endif + if ((svars->lfd = open( svars->lname, O_WRONLY | O_CREAT, 0666 )) < 0) { + sys_error( "Error: cannot create lock file %s", svars->lname ); + return 0; + } + if (fcntl( svars->lfd, F_SETLK, &lck )) { + error( "Error: channel :%s:%s-:%s:%s is locked\n", + svars->chan->stores[F]->name, svars->orig_name[F], svars->chan->stores[N]->name, svars->orig_name[N] ); + close( svars->lfd ); + svars->lfd = -1; + return 0; + } + return 1; +} + +static uchar +parse_flags( const char *buf ) +{ + uchar flags = 0; + for (uint i = 0, d = 0; i < as(MsgFlags); i++) { + if (buf[d] == MsgFlags[i]) { + flags |= (1 << i); + d++; + } + } + return flags; +} + +int +load_state( sync_vars_t *svars ) +{ + sync_rec_t *srec, *nsrec; + FILE *jfp; + uint ll; + uint maxxnuid = 0; + char fbuf[16]; // enlarge when support for keywords is added + char buf[128], buf1[64], buf2[64]; + + if ((jfp = fopen( svars->dname, "r" ))) { + if (!lock_state( svars )) + goto jbail; + debug( "reading sync state %s ...\n", svars->dname ); + int line = 0; + while (fgets( buf, sizeof(buf), jfp )) { + line++; + if (!(ll = strlen( buf )) || buf[ll - 1] != '\n') { + error( "Error: incomplete sync state header entry at %s:%d\n", svars->dname, line ); + jbail: + fclose( jfp ); + return 0; + } + if (ll == 1) + goto gothdr; + if (line == 1 && isdigit( buf[0] )) { // Pre-1.1 legacy + if (sscanf( buf, "%63s %63s", buf1, buf2 ) != 2 || + sscanf( buf1, "%u:%u", &svars->uidval[F], &svars->maxuid[F] ) < 2 || + sscanf( buf2, "%u:%u:%u", &svars->uidval[N], &maxxnuid, &svars->maxuid[N] ) < 3) { + error( "Error: invalid sync state header in %s\n", svars->dname ); + goto jbail; + } + goto gothdr; + } + uint uid; + if (sscanf( buf, "%63s %u", buf1, &uid ) != 2) { + error( "Error: malformed sync state header entry at %s:%d\n", svars->dname, line ); + goto jbail; + } + if (!strcmp( buf1, "FarUidValidity" ) || !strcmp( buf1, "MasterUidValidity" ) /* Pre-1.4 legacy */) { + svars->uidval[F] = uid; + } else if (!strcmp( buf1, "NearUidValidity" ) || !strcmp( buf1, "SlaveUidValidity" ) /* Pre-1.4 legacy */) { + svars->uidval[N] = uid; + } else if (!strcmp( buf1, "MaxPulledUid" )) { + svars->maxuid[F] = uid; + } else if (!strcmp( buf1, "MaxPushedUid" )) { + svars->maxuid[N] = uid; + } else if (!strcmp( buf1, "MaxExpiredFarUid" ) || !strcmp( buf1, "MaxExpiredMasterUid" ) /* Pre-1.4 legacy */) { + svars->maxxfuid = uid; + } else if (!strcmp( buf1, "MaxExpiredSlaveUid" )) { // Pre-1.3 legacy + maxxnuid = uid; + } else { + error( "Error: unrecognized sync state header entry at %s:%d\n", svars->dname, line ); + goto jbail; + } + } + error( "Error: unterminated sync state header in %s\n", svars->dname ); + goto jbail; + gothdr: + while (fgets( buf, sizeof(buf), jfp )) { + line++; + if (!(ll = strlen( buf )) || buf[--ll] != '\n') { + error( "Error: incomplete sync state entry at %s:%d\n", svars->dname, line ); + goto jbail; + } + buf[ll] = 0; + fbuf[0] = 0; + uint t1, t2; + if (sscanf( buf, "%u %u %15s", &t1, &t2, fbuf ) < 2) { + error( "Error: invalid sync state entry at %s:%d\n", svars->dname, line ); + goto jbail; + } + srec = nfzalloc( sizeof(*srec) ); + srec->uid[F] = t1; + srec->uid[N] = t2; + char *s = fbuf; + if (*s == '<') { + s++; + srec->status = S_DUMMY(F); + } else if (*s == '>') { + s++; + srec->status = S_DUMMY(N); + } + if (*s == '^') { // Pre-1.4 legacy + s++; + srec->status = S_SKIPPED; + } else if (*s == '~' || *s == 'X' /* Pre-1.3 legacy */) { + s++; + srec->status = S_EXPIRE | S_EXPIRED; + } else if (srec->uid[F] == (uint)-1) { // Pre-1.3 legacy + srec->uid[F] = 0; + srec->status = S_SKIPPED; + } else if (srec->uid[N] == (uint)-1) { + srec->uid[N] = 0; + srec->status = S_SKIPPED; + } + srec->flags = parse_flags( s ); + debug( " entry (%u,%u,%u,%s%s)\n", srec->uid[F], srec->uid[N], srec->flags, + (srec->status & S_SKIPPED) ? "SKIP" : (srec->status & S_EXPIRED) ? "XPIRE" : "", + (srec->status & S_DUMMY(F)) ? ",F-DUMMY" : (srec->status & S_DUMMY(N)) ? ",N-DUMMY" : "" ); + *svars->srecadd = srec; + svars->srecadd = &srec->next; + svars->nsrecs++; + } + fclose( jfp ); + svars->existing = 1; + } else { + if (errno != ENOENT) { + sys_error( "Error: cannot read sync state %s", svars->dname ); + return 0; + } + svars->existing = 0; + } + + // This is legacy support for pre-1.3 sync states. + if (maxxnuid) { + uint minwuid = UINT_MAX; + for (srec = svars->srecs; srec; srec = srec->next) { + if ((srec->status & (S_DEAD | S_SKIPPED | S_PENDING)) || !srec->uid[F]) + continue; + if (srec->status & S_EXPIRED) { + if (!srec->uid[N]) { + // The expired message was already gone. + continue; + } + // The expired message was not expunged yet, so re-examine it. + // This will happen en masse, so just extend the bulk fetch. + } else { + if (srec->uid[N] && maxxnuid >= srec->uid[N]) { + // The non-expired message is in the generally expired range, + // so don't make it contribute to the bulk fetch. + continue; + } + // Usual non-expired message. + } + if (minwuid > srec->uid[F]) + minwuid = srec->uid[F]; + } + svars->maxxfuid = minwuid - 1; + } + + int line = 0; + if ((jfp = fopen( svars->jname, "r" ))) { + if (!lock_state( svars )) + goto jbail; + struct stat st; + if (!stat( svars->nname, &st ) && fgets( buf, sizeof(buf), jfp )) { + debug( "recovering journal ...\n" ); + if (!(ll = strlen( buf )) || buf[--ll] != '\n') { + error( "Error: incomplete journal header in %s\n", svars->jname ); + goto jbail; + } + buf[ll] = 0; + if (!equals( buf, (int)ll, JOURNAL_VERSION, strlen(JOURNAL_VERSION) )) { + error( "Error: incompatible journal version" + " (got %s, expected " JOURNAL_VERSION ")\n", buf ); + goto jbail; + } + srec = NULL; + line = 1; + while (fgets( buf, sizeof(buf), jfp )) { + line++; + if (!(ll = strlen( buf )) || buf[--ll] != '\n') { + error( "Error: incomplete journal entry at %s:%d\n", svars->jname, line ); + goto jbail; + } + buf[ll] = 0; + char c; + int tn; + uint t1, t2, t3, t4; + if ((c = buf[0]) == '#' ? + (tn = 0, (sscanf( buf + 2, "%u %u %n", &t1, &t2, &tn ) < 2) || !tn || (ll - (uint)tn != TUIDL + 2)) : + c == '!' ? + (sscanf( buf + 2, "%u", &t1 ) != 1) : + c == 'N' || c == 'F' || c == 'T' || c == '+' || c == '&' || c == '-' || c == '=' || c == '_' || c == '|' ? + (sscanf( buf + 2, "%u %u", &t1, &t2 ) != 2) : + c != '^' ? + (sscanf( buf + 2, "%u %u %u", &t1, &t2, &t3 ) != 3) : + (sscanf( buf + 2, "%u %u %u %u", &t1, &t2, &t3, &t4 ) != 4)) + { + error( "Error: malformed journal entry at %s:%d\n", svars->jname, line ); + goto jbail; + } + if (c == 'N') { + svars->maxuid[t1] = t2; + } else if (c == 'F') { + svars->finduid[t1] = t2; + } else if (c == 'T') { + *uint_array_append( &svars->trashed_msgs[t1] ) = t2; + } else if (c == '!') { + svars->maxxfuid = t1; + } else if (c == '|') { + svars->uidval[F] = t1; + svars->uidval[N] = t2; + } else if (c == '+') { + srec = nfzalloc( sizeof(*srec) ); + srec->uid[F] = t1; + srec->uid[N] = t2; + debug( " new entry(%u,%u)\n", t1, t2 ); + srec->status = S_PENDING; + *svars->srecadd = srec; + svars->srecadd = &srec->next; + svars->nsrecs++; + } else { + for (nsrec = srec; srec; srec = srec->next) + if (srec->uid[F] == t1 && srec->uid[N] == t2) + goto syncfnd; + for (srec = svars->srecs; srec != nsrec; srec = srec->next) + if (srec->uid[F] == t1 && srec->uid[N] == t2) + goto syncfnd; + error( "Error: journal entry at %s:%d refers to non-existing sync state entry\n", svars->jname, line ); + goto jbail; + syncfnd: + debugn( " entry(%u,%u,%u) ", srec->uid[F], srec->uid[N], srec->flags ); + switch (c) { + case '-': + debug( "killed\n" ); + srec->status = S_DEAD; + break; + case '=': + debug( "aborted\n" ); + if (svars->maxxfuid < srec->uid[F]) + svars->maxxfuid = srec->uid[F]; + srec->status = S_DEAD; + break; + case '#': + memcpy( srec->tuid, buf + tn + 2, TUIDL ); + debug( "TUID now %." stringify(TUIDL) "s\n", srec->tuid ); + break; + case '&': + debug( "TUID %." stringify(TUIDL) "s lost\n", srec->tuid ); + srec->tuid[0] = 0; + break; + case '<': + debug( "far side now %u\n", t3 ); + assign_uid( svars, srec, F, t3 ); + break; + case '>': + debug( "near side now %u\n", t3 ); + assign_uid( svars, srec, N, t3 ); + break; + case '*': + debug( "flags now %u\n", t3 ); + srec->flags = (uchar)t3; + srec->aflags[F] = srec->aflags[N] = 0; // Clear F_DELETED from purge + srec->wstate &= ~W_PURGE; + break; + case '~': + debug( "status now %#x\n", t3 ); + srec->status = (uchar)t3; + break; + case '_': + debug( "has placeholder now\n" ); + srec->status = S_PENDING; // Pre-1.4 legacy only + srec->status |= !srec->uid[F] ? S_DUMMY(F) : S_DUMMY(N); + break; + case '^': + debug( "is being upgraded, flags %u, srec flags %u\n", t3, t4 ); + srec->pflags = (uchar)t3; + srec->flags = (uchar)t4; + srec = upgrade_srec( svars, srec ); + break; + default: + error( "Error: unrecognized journal entry at %s:%d\n", svars->jname, line ); + goto jbail; + } + } + } + } + fclose( jfp ); + sort_uint_array( svars->trashed_msgs[F].array ); + sort_uint_array( svars->trashed_msgs[N].array ); + } else { + if (errno != ENOENT) { + sys_error( "Error: cannot read journal %s", svars->jname ); + return 0; + } + } + svars->replayed = line; + + return 1; +} + +static void +create_state( sync_vars_t *svars ) +{ + if (!(svars->nfp = fopen( svars->nname, "w" ))) { + sys_error( "Error: cannot create new sync state %s", svars->nname ); + exit( 1 ); + } +} + +void +jFprintf( sync_vars_t *svars, const char *msg, ... ) +{ + va_list va; + + if (JLimit && !--JLimit) + exit( 101 ); + if (!svars->jfp) { + create_state( svars ); + if (!(svars->jfp = fopen( svars->jname, svars->replayed ? "a" : "w" ))) { + sys_error( "Error: cannot create journal %s", svars->jname ); + exit( 1 ); + } + setlinebuf( svars->jfp ); + if (!svars->replayed) + Fprintf( svars->jfp, JOURNAL_VERSION "\n" ); + } + va_start( va, msg ); + vFprintf( svars->jfp, msg, va ); + va_end( va ); + if (JLimit && !--JLimit) + exit( 100 ); +} + +void +save_state( sync_vars_t *svars ) +{ + // If no change was made, the state is also unmodified. + if (!svars->jfp && !svars->replayed) + return; + + if (!svars->nfp) + create_state( svars ); + Fprintf( svars->nfp, + "FarUidValidity %u\nNearUidValidity %u\nMaxPulledUid %u\nMaxPushedUid %u\n", + svars->uidval[F], svars->uidval[N], svars->maxuid[F], svars->maxuid[N] ); + if (svars->maxxfuid) + Fprintf( svars->nfp, "MaxExpiredFarUid %u\n", svars->maxxfuid ); + Fprintf( svars->nfp, "\n" ); + for (sync_rec_t *srec = svars->srecs; srec; srec = srec->next) { + if (srec->status & S_DEAD) + continue; + char fbuf[16]; // enlarge when support for keywords is added + make_flags( srec->flags, fbuf ); + Fprintf( svars->nfp, "%u %u %s%s%s\n", srec->uid[F], srec->uid[N], + (srec->status & S_DUMMY(F)) ? "<" : (srec->status & S_DUMMY(N)) ? ">" : "", + (srec->status & S_SKIPPED) ? "^" : (srec->status & S_EXPIRED) ? "~" : "", fbuf ); + } + + Fclose( svars->nfp, 1 ); + if (svars->jfp) + Fclose( svars->jfp, 0 ); + if (!(DFlags & KEEPJOURNAL)) { + // Order is important! + if (rename( svars->nname, svars->dname )) + warn( "Warning: cannot commit sync state %s\n", svars->dname ); + else if (unlink( svars->jname )) + warn( "Warning: cannot delete journal %s\n", svars->jname ); + } +} + +void +delete_state( sync_vars_t *svars ) +{ + unlink( svars->nname ); + unlink( svars->jname ); + if (unlink( svars->dname ) || unlink( svars->lname )) { + sys_error( "Error: channel %s: sync state cannot be deleted", svars->chan->name ); + svars->ret = SYNC_FAIL; + } +} + + +void +assign_uid( sync_vars_t *svars, sync_rec_t *srec, int t, uint uid ) +{ + srec->uid[t] = uid; + if (uid == svars->maxuid[t] + 1) + svars->maxuid[t] = uid; + srec->status &= ~S_PENDING; + srec->wstate &= ~W_UPGRADE; + srec->tuid[0] = 0; +} + +void +assign_tuid( sync_vars_t *svars, sync_rec_t *srec ) +{ + for (uint i = 0; i < TUIDL; i++) { + uchar c = arc4_getbyte() & 0x3f; + srec->tuid[i] = (char)(c < 26 ? c + 'A' : c < 52 ? c + 'a' - 26 : + c < 62 ? c + '0' - 52 : c == 62 ? '+' : '/'); + } + JLOG( "# %u %u %." stringify(TUIDL) "s", (srec->uid[F], srec->uid[N], srec->tuid), "new TUID" ); +} + +int +match_tuids( sync_vars_t *svars, int t, message_t *msgs ) +{ + message_t *tmsg, *ntmsg = NULL; + const char *diag; + int num_lost = 0; + + for (sync_rec_t *srec = svars->srecs; srec; srec = srec->next) { + if (srec->status & S_DEAD) + continue; + if (!srec->uid[t] && srec->tuid[0]) { + debug( "pair(%u,%u) TUID %." stringify(TUIDL) "s\n", srec->uid[F], srec->uid[N], srec->tuid ); + for (tmsg = ntmsg; tmsg; tmsg = tmsg->next) { + if (tmsg->status & M_DEAD) + continue; + if (tmsg->tuid[0] && !memcmp( tmsg->tuid, srec->tuid, TUIDL )) { + diag = (tmsg == ntmsg) ? "adjacently" : "after gap"; + goto mfound; + } + } + for (tmsg = msgs; tmsg != ntmsg; tmsg = tmsg->next) { + if (tmsg->status & M_DEAD) + continue; + if (tmsg->tuid[0] && !memcmp( tmsg->tuid, srec->tuid, TUIDL )) { + diag = "after reset"; + goto mfound; + } + } + JLOG( "& %u %u", (srec->uid[F], srec->uid[N]), "TUID lost" ); + // Note: status remains S_PENDING. + srec->tuid[0] = 0; + num_lost++; + continue; + mfound: + tmsg->srec = srec; + srec->msg[t] = tmsg; + ntmsg = tmsg->next; + ASSIGN_UID( srec, t, tmsg->uid, "TUID matched %s", diag ); + } + } + return num_lost; +} + +sync_rec_t * +upgrade_srec( sync_vars_t *svars, sync_rec_t *srec ) +{ + // Create an entry and append it to the current one. + sync_rec_t *nsrec = nfzalloc( sizeof(*nsrec) ); + nsrec->next = srec->next; + srec->next = nsrec; + if (svars->srecadd == &srec->next) + svars->srecadd = &nsrec->next; + // Move the placeholder to the new entry. + int t = (srec->status & S_DUMMY(F)) ? F : N; + nsrec->uid[t] = srec->uid[t]; + srec->uid[t] = 0; + if (srec->msg[t]) { // NULL during journal replay; is assigned later. + nsrec->msg[t] = srec->msg[t]; + nsrec->msg[t]->srec = nsrec; + srec->msg[t] = NULL; + } + // Mark the original entry for upgrade. + srec->status = (srec->status & ~(S_DUMMY(F)|S_DUMMY(N))) | S_PENDING; + srec->wstate |= W_UPGRADE; + // Mark the placeholder for nuking. + nsrec->wstate = W_PURGE; + nsrec->aflags[t] = F_DELETED; + return nsrec; +}