Browse Source

do away with newmaxuid

now that expiration order is determined by a single loop ordered by
far-side UIDs, it is no longer necessary to accurately track the highest
seen UID.

as a side effect, this fixes a problem reported (way too long ago) by
Yuri D'Elia: we failed to up newmaxuid for messages we produced
ourselves, so we would keep enumerating the same messages until we also
propagated externally generated messages from that mailbox - which might
have been never for the server side of archive/trash mailboxes.
1.4
Oswald Buddenhagen 5 years ago
parent
commit
e9efc49b6c
  1. 18
      src/run-tests.pl
  2. 62
      src/sync.c

18
src/run-tests.pl

@ -61,7 +61,7 @@ my @X01 = (
A, 1, "F", B, 2, "F", C, 3, "FS", D, 4, "", E, 5, "T", F, 6, "FT", G, 7, "FT", I, 9, "", J, 10, "" ], A, 1, "F", B, 2, "F", C, 3, "FS", D, 4, "", E, 5, "T", F, 6, "FT", G, 7, "FT", I, 9, "", J, 10, "" ],
[ 10, [ 10,
A, 1, "F", B, 2, "F", C, 3, "FS", D, 4, "", E, 5, "T", G, 7, "FT", H, 8, "T", J, 9, "", I, 10, "" ], A, 1, "F", B, 2, "F", C, 3, "FS", D, 4, "", E, 5, "T", G, 7, "FT", H, 8, "T", J, 9, "", I, 10, "" ],
[ 9, 0, 9, [ 10, 0, 10,
1, 1, "F", 2, 2, "F", 3, 3, "FS", 4, 4, "", 5, 5, "T", 6, 0, "", 7, 7, "FT", 0, 8, "", 10, 9, "", 9, 10, "" ], 1, 1, "F", 2, 2, "F", 3, 3, "FS", 4, 4, "", 5, 5, "T", 6, 0, "", 7, 7, "FT", 0, 8, "", 10, 9, "", 9, 10, "" ],
); );
test("full", \@x01, \@X01, @O01); test("full", \@x01, \@X01, @O01);
@ -73,7 +73,7 @@ my @X02 = (
A, 1, "F", B, 2, "F", C, 3, "FS", D, 4, "", I, 9, "", J, 10, "" ], A, 1, "F", B, 2, "F", C, 3, "FS", D, 4, "", I, 9, "", J, 10, "" ],
[ 10, [ 10,
A, 1, "F", B, 2, "F", C, 3, "FS", D, 4, "", J, 9, "", I, 10, "" ], A, 1, "F", B, 2, "F", C, 3, "FS", D, 4, "", J, 9, "", I, 10, "" ],
[ 9, 0, 9, [ 10, 0, 10,
1, 1, "F", 2, 2, "F", 3, 3, "FS", 4, 4, "", 10, 9, "", 9, 10, "" ], 1, 1, "F", 2, 2, "F", 3, 3, "FS", 4, 4, "", 10, 9, "", 9, 10, "" ],
); );
test("full + expunge both", \@x01, \@X02, @O02); test("full + expunge both", \@x01, \@X02, @O02);
@ -85,7 +85,7 @@ my @X03 = (
A, 1, "F", B, 2, "F", C, 3, "FS", D, 4, "", E, 5, "T", F, 6, "FT", G, 7, "FT", I, 9, "", J, 10, "" ], A, 1, "F", B, 2, "F", C, 3, "FS", D, 4, "", E, 5, "T", F, 6, "FT", G, 7, "FT", I, 9, "", J, 10, "" ],
[ 10, [ 10,
A, 1, "F", B, 2, "F", C, 3, "FS", D, 4, "", J, 9, "", I, 10, "" ], A, 1, "F", B, 2, "F", C, 3, "FS", D, 4, "", J, 9, "", I, 10, "" ],
[ 9, 0, 9, [ 10, 0, 10,
1, 1, "F", 2, 2, "F", 3, 3, "FS", 4, 4, "", 5, 0, "T", 6, 0, "", 7, 0, "T", 10, 9, "", 9, 10, "" ], 1, 1, "F", 2, 2, "F", 3, 3, "FS", 4, 4, "", 5, 0, "T", 6, 0, "", 7, 0, "T", 10, 9, "", 9, 10, "" ],
); );
test("full + expunge near side", \@x01, \@X03, @O03); test("full + expunge near side", \@x01, \@X03, @O03);
@ -133,7 +133,7 @@ my @X07 = (
A, 1, "F", B, 2, "", C, 3, "FS", D, 4, "", E, 5, "T", F, 6, "F", G, 7, "FT", I, 9, "", J, 10, "" ], A, 1, "F", B, 2, "", C, 3, "FS", D, 4, "", E, 5, "T", F, 6, "F", G, 7, "FT", I, 9, "", J, 10, "" ],
[ 10, [ 10,
A, 1, "", B, 2, "F", C, 3, "F", D, 4, "", E, 5, "", G, 7, "", H, 8, "", J, 9, "", I, 10, "" ], A, 1, "", B, 2, "F", C, 3, "F", D, 4, "", E, 5, "", G, 7, "", H, 8, "", J, 9, "", I, 10, "" ],
[ 9, 0, 9, [ 10, 0, 10,
1, 1, "", 2, 2, "", 3, 3, "", 4, 4, "", 5, 5, "", 6, 6, "", 7, 7, "", 8, 8, "", 10, 9, "", 9, 10, "" ], 1, 1, "", 2, 2, "", 3, 3, "", 4, 4, "", 5, 5, "", 6, 6, "", 7, 7, "", 8, 8, "", 10, 9, "", 9, 10, "" ],
); );
test("new", \@x01, \@X07, @O07); test("new", \@x01, \@X07, @O07);
@ -168,7 +168,7 @@ my @X11 = (
A, 1, "", B, 2, "*" ], A, 1, "", B, 2, "*" ],
[ 2, [ 2,
C, 1, "*", A, 2, "" ], C, 1, "*", A, 2, "" ],
[ 2, 0, 1, [ 2, 0, 2,
0, 1, "^", 1, 2, "", 2, 0, "^" ], 0, 1, "^", 1, 2, "", 2, 0, "^" ],
); );
test("max size", \@x10, \@X11, @O11); test("max size", \@x10, \@X11, @O11);
@ -180,7 +180,7 @@ my @X22 = (
A, 1, "", B, 2, "*", C, 3, "*" ], A, 1, "", B, 2, "*", C, 3, "*" ],
[ 2, [ 2,
C, 1, "*", A, 2, "" ], C, 1, "*", A, 2, "" ],
[ 2, 0, 1, [ 3, 0, 2,
3, 1, "", 1, 2, "", 2, 0, "^" ], 3, 1, "", 1, 2, "", 2, 0, "^" ],
); );
test("near side max size", \@X11, \@X22, @O22); test("near side max size", \@X11, \@X22, @O22);
@ -203,7 +203,7 @@ my @X31 = (
A, 1, "F", B, 2, "", C, 3, "S", D, 4, "", E, 5, "S", F, 6, "" ], A, 1, "F", B, 2, "", C, 3, "S", D, 4, "", E, 5, "S", F, 6, "" ],
[ 5, [ 5,
A, 1, "F", B, 2, "", D, 3, "", E, 4, "S", F, 5, "" ], A, 1, "F", B, 2, "", D, 3, "", E, 4, "S", F, 5, "" ],
[ 6, 3, 0, [ 6, 3, 5,
1, 1, "F", 2, 2, "", 4, 3, "", 5, 4, "S", 6, 5, "" ], 1, 1, "F", 2, 2, "", 4, 3, "", 5, 4, "S", 6, 5, "" ],
); );
test("max messages", \@x30, \@X31, @O31); test("max messages", \@x30, \@X31, @O31);
@ -215,7 +215,7 @@ my @X32 = (
A, 1, "F", B, 2, "", C, 3, "S", D, 4, "", E, 5, "S", F, 6, "" ], A, 1, "F", B, 2, "", C, 3, "S", D, 4, "", E, 5, "S", F, 6, "" ],
[ 4, [ 4,
A, 1, "F", D, 2, "", E, 3, "S", F, 4, "" ], A, 1, "F", D, 2, "", E, 3, "S", F, 4, "" ],
[ 6, 3, 0, [ 6, 3, 4,
1, 1, "F", 4, 2, "", 5, 3, "S", 6, 4, "" ], 1, 1, "F", 4, 2, "", 5, 3, "S", 6, 4, "" ],
); );
test("max messages vs. unread", \@x30, \@X32, @O32); test("max messages vs. unread", \@x30, \@X32, @O32);
@ -236,7 +236,7 @@ my @X51 = (
A, 1, "S", B, 2, "FS", C, 3, "S", D, 4, "", E, 5, "", F, 6, "" ], A, 1, "S", B, 2, "FS", C, 3, "S", D, 4, "", E, 5, "", F, 6, "" ],
[ 6, [ 6,
B, 2, "FS", D, 4, "", E, 5, "", F, 6, "" ], B, 2, "FS", D, 4, "", E, 5, "", F, 6, "" ],
[ 6, 3, 0, [ 6, 3, 6,
2, 2, "FS", 4, 4, "", 5, 5, "", 6, 6, "" ], 2, 2, "FS", 4, 4, "", 5, 5, "", 6, 6, "" ],
); );
test("max messages + expunge", \@x50, \@X51, @O51); test("max messages + expunge", \@x50, \@X51, @O51);

62
src/sync.c

@ -167,7 +167,6 @@ typedef struct {
uint ref_count, nsrecs, opts[2]; uint ref_count, nsrecs, opts[2];
uint new_pending[2], flags_pending[2], trash_pending[2]; uint new_pending[2], flags_pending[2], trash_pending[2];
uint maxuid[2]; // highest UID that was already propagated uint maxuid[2]; // highest UID that was already propagated
uint newmaxuid[2]; // highest UID that is currently being propagated
uint uidval[2]; // UID validity value uint uidval[2]; // UID validity value
uint newuidval[2]; // UID validity obtained from driver uint newuidval[2]; // UID validity obtained from driver
uint finduid[2]; // TUID lookup makes sense only for UIDs >= this uint finduid[2]; // TUID lookup makes sense only for UIDs >= this
@ -285,6 +284,8 @@ match_tuids( sync_vars_t *svars, int t, message_t *msgs )
srec->msg[t] = tmsg; srec->msg[t] = tmsg;
ntmsg = tmsg->next; ntmsg = tmsg->next;
srec->uid[t] = tmsg->uid; srec->uid[t] = tmsg->uid;
if (tmsg->uid == svars->maxuid[t] + 1)
svars->maxuid[t] = tmsg->uid;
srec->status = 0; srec->status = 0;
srec->tuid[0] = 0; srec->tuid[0] = 0;
} }
@ -613,7 +614,7 @@ clean_strdup( const char *s )
} }
#define JOURNAL_VERSION "3" #define JOURNAL_VERSION "4"
static int static int
prepare_state( sync_vars_t *svars ) prepare_state( sync_vars_t *svars )
@ -853,8 +854,6 @@ load_state( sync_vars_t *svars )
svars->maxxfuid = minwuid - 1; svars->maxxfuid = minwuid - 1;
} }
svars->newmaxuid[F] = svars->maxuid[F];
svars->newmaxuid[N] = svars->maxuid[N];
int line = 0; int line = 0;
if ((jfp = fopen( svars->jname, "r" ))) { if ((jfp = fopen( svars->jname, "r" ))) {
if (!lock_state( svars )) if (!lock_state( svars ))
@ -884,17 +883,17 @@ load_state( sync_vars_t *svars )
uint t1, t2, t3; uint t1, t2, t3;
if ((c = buf[0]) == '#' ? if ((c = buf[0]) == '#' ?
(tn = 0, (sscanf( buf + 2, "%u %u %n", &t1, &t2, &tn ) < 2) || !tn || (ll - (uint)tn != TUIDL + 2)) : (tn = 0, (sscanf( buf + 2, "%u %u %n", &t1, &t2, &tn ) < 2) || !tn || (ll - (uint)tn != TUIDL + 2)) :
c == 'S' || c == '!' ? c == '!' ?
(sscanf( buf + 2, "%u", &t1 ) != 1) : (sscanf( buf + 2, "%u", &t1 ) != 1) :
c == 'F' || c == 'T' || c == '+' || c == '&' || c == '-' || c == '=' || c == '|' ? c == 'N' || c == 'F' || c == 'T' || c == '+' || c == '&' || c == '-' || c == '=' || c == '|' ?
(sscanf( buf + 2, "%u %u", &t1, &t2 ) != 2) : (sscanf( buf + 2, "%u %u", &t1, &t2 ) != 2) :
(sscanf( buf + 2, "%u %u %u", &t1, &t2, &t3 ) != 3)) (sscanf( buf + 2, "%u %u %u", &t1, &t2, &t3 ) != 3))
{ {
error( "Error: malformed journal entry at %s:%d\n", svars->jname, line ); error( "Error: malformed journal entry at %s:%d\n", svars->jname, line );
goto jbail; goto jbail;
} }
if (c == 'S') if (c == 'N')
svars->maxuid[t1] = svars->newmaxuid[t1]; svars->maxuid[t1] = t2;
else if (c == 'F') else if (c == 'F')
svars->finduid[t1] = t2; svars->finduid[t1] = t2;
else if (c == 'T') else if (c == 'T')
@ -908,10 +907,6 @@ load_state( sync_vars_t *svars )
srec = nfcalloc( sizeof(*srec) ); srec = nfcalloc( sizeof(*srec) );
srec->uid[F] = t1; srec->uid[F] = t1;
srec->uid[N] = t2; srec->uid[N] = t2;
if (svars->newmaxuid[F] < t1)
svars->newmaxuid[F] = t1;
if (svars->newmaxuid[N] < t2)
svars->newmaxuid[N] = t2;
debug( " new entry(%u,%u)\n", t1, t2 ); debug( " new entry(%u,%u)\n", t1, t2 );
srec->status = S_PENDING; srec->status = S_PENDING;
*svars->srecadd = srec; *svars->srecadd = srec;
@ -935,6 +930,7 @@ load_state( sync_vars_t *svars )
break; break;
case '=': case '=':
debug( "aborted\n" ); debug( "aborted\n" );
if (svars->maxxfuid < srec->uid[F])
svars->maxxfuid = srec->uid[F]; svars->maxxfuid = srec->uid[F];
srec->status = S_DEAD; srec->status = S_DEAD;
break; break;
@ -949,12 +945,16 @@ load_state( sync_vars_t *svars )
case '<': case '<':
debug( "far side now %u\n", t3 ); debug( "far side now %u\n", t3 );
srec->uid[F] = t3; srec->uid[F] = t3;
if (t3 == svars->maxuid[F] + 1)
svars->maxuid[F] = t3;
srec->status &= ~S_PENDING; srec->status &= ~S_PENDING;
srec->tuid[0] = 0; srec->tuid[0] = 0;
break; break;
case '>': case '>':
debug( "near side now %u\n", t3 ); debug( "near side now %u\n", t3 );
srec->uid[N] = t3; srec->uid[N] = t3;
if (t3 == svars->maxuid[N] + 1)
svars->maxuid[N] = t3;
srec->status &= ~S_PENDING; srec->status &= ~S_PENDING;
srec->tuid[0] = 0; srec->tuid[0] = 0;
break; break;
@ -1575,22 +1575,26 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
for (t = 0; t < 2; t++) { for (t = 0; t < 2; t++) {
debug( "synchronizing new messages on %s\n", str_fn[1-t] ); debug( "synchronizing new messages on %s\n", str_fn[1-t] );
for (tmsg = svars->msgs[1-t]; tmsg; tmsg = tmsg->next) { for (tmsg = svars->msgs[1-t]; tmsg; tmsg = tmsg->next) {
// If messages were previously ignored due to being excessive, they would now
// appear to be newer than the messages that got actually synced, so increment
// newmaxuid immediately to make sure we always look only at the newest ones.
// However, committing it to maxuid must be delayed until all messages were
// propagated, to ensure that all pending messages are still loaded next time
// in case of interruption - in particular skipping big messages would otherwise
// up the limit too early.
srec = tmsg->srec; srec = tmsg->srec;
if (srec) { if (srec) {
if (srec->status & S_SKIPPED) { if (srec->status & S_SKIPPED) {
// The message was skipped due to being too big. // The message was skipped due to being too big.
// We must have already seen the UID, but we might have been interrupted.
if (svars->maxuid[1-t] < tmsg->uid)
svars->maxuid[1-t] = tmsg->uid;
if (!(svars->chan->ops[t] & OP_RENEW)) if (!(svars->chan->ops[t] & OP_RENEW))
continue; continue;
} else { } else {
if (!(svars->chan->ops[t] & OP_NEW)) if (!(svars->chan->ops[t] & OP_NEW))
continue; continue;
// This catches messages:
// - that are actually new
// - whose propagation got interrupted
// - whose propagation was completed, but not logged yet
// - that aren't actually new, but a result of syncing, and the instant
// maxuid upping was prevented by the presence of actually new messages
if (svars->maxuid[1-t] < tmsg->uid)
svars->maxuid[1-t] = tmsg->uid;
if (!(srec->status & S_PENDING)) if (!(srec->status & S_PENDING))
continue; // Nothing to do - the message is paired or expired continue; // Nothing to do - the message is paired or expired
// Propagation was scheduled, but we got interrupted // Propagation was scheduled, but we got interrupted
@ -1606,13 +1610,14 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
} else { } else {
if (!(svars->chan->ops[t] & OP_NEW)) if (!(svars->chan->ops[t] & OP_NEW))
continue; continue;
if (tmsg->uid <= svars->newmaxuid[1-t]) { if (tmsg->uid <= svars->maxuid[1-t]) {
// The message should be already paired. It's not, so it was: // The message should be already paired. It's not, so it was:
// - previously paired, but the entry was expired and pruned => ignore // - previously paired, but the entry was expired and pruned => ignore
// - attempted, but failed => ignore (the wisdom of this is debatable) // - attempted, but failed => ignore (the wisdom of this is debatable)
// - ignored, as it would have been expunged anyway => ignore (even if undeleted) // - ignored, as it would have been expunged anyway => ignore (even if undeleted)
continue; continue;
} }
svars->maxuid[1-t] = tmsg->uid;
debug( "new message %u\n", tmsg->uid ); debug( "new message %u\n", tmsg->uid );
if ((svars->chan->ops[t] & OP_EXPUNGE) && (tmsg->flags & F_DELETED)) { if ((svars->chan->ops[t] & OP_EXPUNGE) && (tmsg->flags & F_DELETED)) {
@ -1628,8 +1633,6 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
srec->uid[1-t] = tmsg->uid; srec->uid[1-t] = tmsg->uid;
srec->msg[1-t] = tmsg; srec->msg[1-t] = tmsg;
tmsg->srec = srec; tmsg->srec = srec;
if (svars->newmaxuid[1-t] < tmsg->uid)
svars->newmaxuid[1-t] = tmsg->uid;
JLOG( "+ %u %u", (srec->uid[F], srec->uid[N]), "fresh" ); JLOG( "+ %u %u", (srec->uid[F], srec->uid[N]), "fresh" );
} }
if ((tmsg->flags & F_FLAGGED) || tmsg->size <= svars->chan->stores[t]->max_size) { if ((tmsg->flags & F_FLAGGED) || tmsg->size <= svars->chan->stores[t]->max_size) {
@ -1748,6 +1751,7 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
// If we have so many new messages that some of them are instantly expired, // If we have so many new messages that some of them are instantly expired,
// but some are still propagated because they are important, we need to // but some are still propagated because they are important, we need to
// ensure explicitly that the bulk fetch limit is upped. // ensure explicitly that the bulk fetch limit is upped.
if (svars->maxxfuid < srec->uid[F])
svars->maxxfuid = srec->uid[F]; svars->maxxfuid = srec->uid[F];
srec->msg[F]->srec = NULL; srec->msg[F]->srec = NULL;
srec->status = S_DEAD; srec->status = S_DEAD;
@ -1849,6 +1853,8 @@ msg_copied( int sts, uint uid, copy_vars_t *vars )
} else { } else {
JLOG( "%c %u %u %u", ("<>"[t], srec->uid[F], srec->uid[N], uid), "%sed message", str_hl[t] ); JLOG( "%c %u %u %u", ("<>"[t], srec->uid[F], srec->uid[N], uid), "%sed message", str_hl[t] );
vars->srec->uid[t] = uid; vars->srec->uid[t] = uid;
if (uid == svars->maxuid[t] + 1)
svars->maxuid[t] = uid;
vars->srec->status &= ~S_PENDING; vars->srec->status &= ~S_PENDING;
vars->srec->tuid[0] = 0; vars->srec->tuid[0] = 0;
} }
@ -1918,10 +1924,6 @@ msgs_copied( sync_vars_t *svars, int t )
if (svars->new_pending[t]) if (svars->new_pending[t])
goto out; goto out;
if (svars->maxuid[1-t] != svars->newmaxuid[1-t]) {
svars->maxuid[1-t] = svars->newmaxuid[1-t];
JLOG( "S %d", 1-t, "commit maxuid of %s", str_fn[1-t] );
}
sync_close( svars, 1-t ); sync_close( svars, 1-t );
if (check_cancel( svars )) if (check_cancel( svars ))
goto out; goto out;
@ -2159,6 +2161,14 @@ box_closed_p2( sync_vars_t *svars, int t )
// the operations are idempotent, and we're about to commit the new state // the operations are idempotent, and we're about to commit the new state
// right afterwards anyway. // right afterwards anyway.
for (t = 0; t < 2; t++) {
// Committing maxuid is delayed until all messages were propagated, to
// ensure that all pending messages are still loaded next time in case
// of interruption - in particular skipping big messages would otherwise
// up the limit too early.
JLOG( "N %d %u", (t, svars->maxuid[t]), "up maxuid of %s", str_fn[t] );
}
if (((svars->state[F] | svars->state[N]) & ST_DID_EXPUNGE) || svars->chan->max_messages) { if (((svars->state[F] | svars->state[N]) & ST_DID_EXPUNGE) || svars->chan->max_messages) {
debug( "purging obsolete entries\n" ); debug( "purging obsolete entries\n" );
for (srec = svars->srecs; srec; srec = srec->next) { for (srec = svars->srecs; srec; srec = srec->next) {

Loading…
Cancel
Save