Browse Source

rewrite tracking of highest expired UID

so far, we tracked the slave side, and calculated the master side on the
fly. that complicated things unnecessarily.
wip/movedetect
Oswald Buddenhagen 8 years ago
parent
commit
36666f7e52
  1. 14
      src/run-tests.pl
  2. 125
      src/sync.c

14
src/run-tests.pl

@ -195,7 +195,7 @@ my @X31 = (
1, 1, "F", 2, 2, "", 3, 3, "S", 4, 4, "", 5, 5, "S", 6, 6, "" ], 1, 1, "F", 2, 2, "", 3, 3, "S", 4, 4, "", 5, 5, "S", 6, 6, "" ],
[ 5, [ 5,
1, 1, "F", 2, 2, "", 4, 3, "", 5, 4, "S", 6, 5, "" ], 1, 1, "F", 2, 2, "", 4, 3, "", 5, 4, "S", 6, 5, "" ],
[ 6, 2, 0, [ 6, 3, 0,
1, 1, "F", 2, 2, "", 4, 3, "", 5, 4, "S", 6, 5, "" ], 1, 1, "F", 2, 2, "", 4, 3, "", 5, 4, "S", 6, 5, "" ],
); );
test("max messages", \@x30, \@X31, @O31); test("max messages", \@x30, \@X31, @O31);
@ -207,7 +207,7 @@ my @X32 = (
1, 1, "F", 2, 2, "", 3, 3, "S", 4, 4, "", 5, 5, "S", 6, 6, "" ], 1, 1, "F", 2, 2, "", 3, 3, "S", 4, 4, "", 5, 5, "S", 6, 6, "" ],
[ 4, [ 4,
1, 1, "F", 4, 2, "", 5, 3, "S", 6, 4, "" ], 1, 1, "F", 4, 2, "", 5, 3, "S", 6, 4, "" ],
[ 6, 1, 0, [ 6, 3, 0,
1, 1, "F", 4, 2, "", 5, 3, "S", 6, 4, "" ], 1, 1, "F", 4, 2, "", 5, 3, "S", 6, 4, "" ],
); );
test("max messages vs. unread", \@x30, \@X32, @O32); test("max messages vs. unread", \@x30, \@X32, @O32);
@ -367,7 +367,7 @@ sub showbox($)
# $filename # $filename
# Output: # Output:
# [ maxuid[M], smaxxuid, maxuid[S], # [ maxuid[M], mmaxxuid, maxuid[S],
# uid[M], uid[S], "flags", ... ], # uid[M], uid[S], "flags", ... ],
sub showstate($) sub showstate($)
{ {
@ -396,7 +396,7 @@ sub showstate($)
return; return;
} }
my @T = ($hdr{'MaxPulledUid'} // "missing", my @T = ($hdr{'MaxPulledUid'} // "missing",
$hdr{'MaxExpiredSlaveUid'} // "0", $hdr{'MaxExpiredMasterUid'} // "0",
$hdr{'MaxPushedUid'} // "missing"); $hdr{'MaxPushedUid'} // "missing");
for (@ls) { for (@ls) {
/^(-?\d+) (-?\d+) (.*)$/; /^(-?\d+) (-?\d+) (.*)$/;
@ -472,7 +472,7 @@ sub mkchan($$@)
open(FILE, ">", "slave/.mbsyncstate") or open(FILE, ">", "slave/.mbsyncstate") or
die "Cannot create sync state.\n"; die "Cannot create sync state.\n";
print FILE "MasterUidValidity 1\nMaxPulledUid ".shift(@t)."\n". print FILE "MasterUidValidity 1\nMaxPulledUid ".shift(@t)."\n".
"SlaveUidValidity 1\nMaxExpiredSlaveUid ".shift(@t)."\nMaxPushedUid ".shift(@t)."\n\n"; "SlaveUidValidity 1\nMaxExpiredMasterUid ".shift(@t)."\nMaxPushedUid ".shift(@t)."\n\n";
while (@t) { while (@t) {
print FILE shift(@t)." ".shift(@t)." ".shift(@t)."\n"; print FILE shift(@t)." ".shift(@t)." ".shift(@t)."\n";
} }
@ -515,13 +515,13 @@ sub ckbox($$$@)
# $filename, @syncstate # $filename, @syncstate
sub ckstate($@) sub ckstate($@)
{ {
my ($fn, $mmaxuid, $smaxxuid, $smaxuid, @T) = @_; my ($fn, $mmaxuid, $mmaxxuid, $smaxuid, @T) = @_;
my %hdr; my %hdr;
$hdr{'MasterUidValidity'} = "1"; $hdr{'MasterUidValidity'} = "1";
$hdr{'SlaveUidValidity'} = "1"; $hdr{'SlaveUidValidity'} = "1";
$hdr{'MaxPulledUid'} = $mmaxuid; $hdr{'MaxPulledUid'} = $mmaxuid;
$hdr{'MaxPushedUid'} = $smaxuid; $hdr{'MaxPushedUid'} = $smaxuid;
$hdr{'MaxExpiredSlaveUid'} = $smaxxuid if ($smaxxuid ne 0); $hdr{'MaxExpiredMasterUid'} = $mmaxxuid if ($mmaxxuid ne 0);
open(FILE, "<", $fn) or die "Cannot read sync state $fn.\n"; open(FILE, "<", $fn) or die "Cannot read sync state $fn.\n";
chomp(my @ls = <FILE>); chomp(my @ls = <FILE>);
close FILE; close FILE;

125
src/sync.c

@ -159,8 +159,7 @@ typedef struct {
int newmaxuid[2]; /* highest UID that is currently being propagated */ int newmaxuid[2]; /* highest UID that is currently being propagated */
int uidval[2]; /* UID validity value */ int uidval[2]; /* UID validity value */
int newuid[2]; /* TUID lookup makes sense only for UIDs >= this */ int newuid[2]; /* TUID lookup makes sense only for UIDs >= this */
int mmaxxuid; /* highest expired UID on master during new message propagation */ int mmaxxuid; /* highest expired UID on master */
int smaxxuid; /* highest expired UID on slave */
} sync_vars_t; } sync_vars_t;
static void sync_ref( sync_vars_t *svars ) { ++svars->ref_count; } static void sync_ref( sync_vars_t *svars ) { ++svars->ref_count; }
@ -649,8 +648,8 @@ save_state( sync_vars_t *svars )
Fprintf( svars->nfp, Fprintf( svars->nfp,
"MasterUidValidity %d\nSlaveUidValidity %d\nMaxPulledUid %d\nMaxPushedUid %d\n", "MasterUidValidity %d\nSlaveUidValidity %d\nMaxPulledUid %d\nMaxPushedUid %d\n",
svars->uidval[M], svars->uidval[S], svars->maxuid[M], svars->maxuid[S] ); svars->uidval[M], svars->uidval[S], svars->maxuid[M], svars->maxuid[S] );
if (svars->smaxxuid) if (svars->mmaxxuid)
Fprintf( svars->nfp, "MaxExpiredSlaveUid %d\n", svars->smaxxuid ); Fprintf( svars->nfp, "MaxExpiredMasterUid %d\n", svars->mmaxxuid );
Fprintf( svars->nfp, "\n" ); Fprintf( svars->nfp, "\n" );
for (srec = svars->srecs; srec; srec = srec->next) { for (srec = svars->srecs; srec; srec = srec->next) {
if (srec->status & S_DEAD) if (srec->status & S_DEAD)
@ -678,6 +677,7 @@ load_state( sync_vars_t *svars )
char *s; char *s;
FILE *jfp; FILE *jfp;
int line, t, t1, t2, t3; int line, t, t1, t2, t3;
int smaxxuid = 0;
char c; char c;
struct stat st; struct stat st;
char fbuf[16]; /* enlarge when support for keywords is added */ char fbuf[16]; /* enlarge when support for keywords is added */
@ -701,7 +701,7 @@ load_state( sync_vars_t *svars )
if (line == 1 && isdigit( buf[0] )) { if (line == 1 && isdigit( buf[0] )) {
if (sscanf( buf, "%63s %63s", buf1, buf2 ) != 2 || if (sscanf( buf, "%63s %63s", buf1, buf2 ) != 2 ||
sscanf( buf1, "%d:%d", &svars->uidval[M], &svars->maxuid[M] ) < 2 || sscanf( buf1, "%d:%d", &svars->uidval[M], &svars->maxuid[M] ) < 2 ||
sscanf( buf2, "%d:%d:%d", &svars->uidval[S], &svars->smaxxuid, &svars->maxuid[S] ) < 3) { sscanf( buf2, "%d:%d:%d", &svars->uidval[S], &smaxxuid, &svars->maxuid[S] ) < 3) {
error( "Error: invalid sync state header in %s\n", svars->dname ); error( "Error: invalid sync state header in %s\n", svars->dname );
goto jbail; goto jbail;
} }
@ -719,8 +719,10 @@ load_state( sync_vars_t *svars )
svars->maxuid[M] = t1; svars->maxuid[M] = t1;
else if (!strcmp( buf1, "MaxPushedUid" )) else if (!strcmp( buf1, "MaxPushedUid" ))
svars->maxuid[S] = t1; svars->maxuid[S] = t1;
else if (!strcmp( buf1, "MaxExpiredSlaveUid" )) else if (!strcmp( buf1, "MaxExpiredMasterUid" ))
svars->smaxxuid = t1; svars->mmaxxuid = t1;
else if (!strcmp( buf1, "MaxExpiredSlaveUid" )) // Legacy
smaxxuid = t1;
else { else {
error( "Error: unrecognized sync state header entry at %s:%d\n", svars->dname, line ); error( "Error: unrecognized sync state header entry at %s:%d\n", svars->dname, line );
goto jbail; goto jbail;
@ -768,9 +770,36 @@ load_state( sync_vars_t *svars )
} }
svars->existing = 0; svars->existing = 0;
} }
// This is legacy support for pre-1.3 sync states.
if (smaxxuid) {
int minwuid = INT_MAX;
for (srec = svars->srecs; srec; srec = srec->next) {
if ((srec->status & S_DEAD) || srec->uid[M] <= 0)
continue;
if (srec->status & S_EXPIRED) {
if (!srec->uid[S]) {
// The expired message was already gone.
continue;
}
// The expired message was not expunged yet, so re-examine it.
// This will happen en masse, so just extend the bulk fetch.
} else {
if (srec->uid[S] > 0 && smaxxuid >= srec->uid[S]) {
// The non-expired message is in the generally expired range,
// so don't make it contribute to the bulk fetch.
continue;
}
// Usual non-expired message.
}
if (minwuid > srec->uid[M])
minwuid = srec->uid[M];
}
svars->mmaxxuid = minwuid - 1;
}
svars->newmaxuid[M] = svars->maxuid[M]; svars->newmaxuid[M] = svars->maxuid[M];
svars->newmaxuid[S] = svars->maxuid[S]; svars->newmaxuid[S] = svars->maxuid[S];
svars->mmaxxuid = INT_MAX;
line = 0; line = 0;
if ((jfp = fopen( svars->jname, "r" ))) { if ((jfp = fopen( svars->jname, "r" ))) {
if (!lock_state( svars )) if (!lock_state( svars ))
@ -814,7 +843,7 @@ load_state( sync_vars_t *svars )
else if (c == 'T') else if (c == 'T')
*int_array_append( &svars->trashed_msgs[t1] ) = t2; *int_array_append( &svars->trashed_msgs[t1] ) = t2;
else if (c == '!') else if (c == '!')
svars->smaxxuid = t1; svars->mmaxxuid = t1;
else if (c == '|') { else if (c == '|') {
svars->uidval[M] = t1; svars->uidval[M] = t1;
svars->uidval[S] = t2; svars->uidval[S] = t2;
@ -897,11 +926,9 @@ load_state( sync_vars_t *svars )
case '/': case '/':
t3 = (srec->status & S_EXPIRE); t3 = (srec->status & S_EXPIRE);
debug( "expired now %d\n", t3 / S_EXPIRE ); debug( "expired now %d\n", t3 / S_EXPIRE );
if (t3) { if (t3)
if (svars->smaxxuid < srec->uid[S])
svars->smaxxuid = srec->uid[S];
srec->status |= S_EXPIRED; srec->status |= S_EXPIRED;
} else else
srec->status &= ~S_EXPIRED; srec->status &= ~S_EXPIRED;
break; break;
default: default:
@ -919,6 +946,7 @@ load_state( sync_vars_t *svars )
} }
} }
svars->replayed = line; svars->replayed = line;
return 1; return 1;
} }
@ -1238,31 +1266,9 @@ box_opened2( sync_vars_t *svars, int t )
/* When messages have been expired on the slave, the master fetch is split into /* When messages have been expired on the slave, the master fetch is split into
* two ranges: The bulk fetch which corresponds with the most recent messages, and an * two ranges: The bulk fetch which corresponds with the most recent messages, and an
* exception list of messages which would have been expired if they weren't important. */ * exception list of messages which would have been expired if they weren't important. */
debug( "preparing master selection - max expired slave uid is %d\n", svars->smaxxuid ); debug( "preparing master selection - max expired master uid is %d\n", svars->mmaxxuid );
/* First, find out the lower bound for the bulk fetch. */ /* First, find out the lower bound for the bulk fetch. */
minwuid = INT_MAX; minwuid = svars->mmaxxuid + 1;
for (srec = svars->srecs; srec; srec = srec->next) {
if ((srec->status & S_DEAD) || srec->uid[M] <= 0)
continue;
if (srec->status & S_EXPIRED) {
if (!srec->uid[S]) {
/* The expired message was already gone. */
continue;
}
/* The expired message was not expunged yet, so re-examine it.
* This will happen en masse, so just extend the bulk fetch. */
} else {
if (svars->smaxxuid >= srec->uid[S]) {
/* The non-expired message is in the generally expired range, so don't
* make it contribute to the bulk fetch. */
continue;
}
/* Usual non-expired message. */
}
if (minwuid > srec->uid[M])
minwuid = srec->uid[M];
}
debug( " min non-orphaned master uid is %d\n", minwuid );
/* Next, calculate the exception fetch. */ /* Next, calculate the exception fetch. */
for (srec = svars->srecs; srec; srec = srec->next) { for (srec = svars->srecs; srec; srec = srec->next) {
if (srec->status & S_DEAD) if (srec->status & S_DEAD)
@ -1665,6 +1671,8 @@ box_loaded( int sts, void *aux )
/* The message is excess or was already (being) expired. */ /* The message is excess or was already (being) expired. */
srec->status |= S_NEXPIRE; srec->status |= S_NEXPIRE;
debug( " old pair(%d,%d) expired\n", srec->uid[M], srec->uid[S] ); debug( " old pair(%d,%d) expired\n", srec->uid[M], srec->uid[S] );
if (svars->mmaxxuid < srec->uid[M])
svars->mmaxxuid = srec->uid[M];
todel--; todel--;
} }
} }
@ -1721,6 +1729,9 @@ box_loaded( int sts, void *aux )
if (srec->status & S_NEXPIRE) { if (srec->status & S_NEXPIRE) {
jFprintf( svars, "= %d %d\n", srec->uid[M], srec->uid[S] ); jFprintf( svars, "= %d %d\n", srec->uid[M], srec->uid[S] );
debug( " pair(%d,%d): 1 (abort)\n", srec->uid[M], srec->uid[S] ); debug( " pair(%d,%d): 1 (abort)\n", srec->uid[M], srec->uid[S] );
// If we have so many new messages that some of them are instantly expired,
// but some are still propagated because they are important, we need to
// ensure explicitly that the bulk fetch limit is upped.
svars->mmaxxuid = srec->uid[M]; svars->mmaxxuid = srec->uid[M];
srec->msg[M]->srec = 0; srec->msg[M]->srec = 0;
srec->status = S_DEAD; srec->status = S_DEAD;
@ -1849,16 +1860,6 @@ msg_copied_p2( sync_vars_t *svars, sync_rec_t *srec, int t, int uid )
srec->uid[t] = uid; srec->uid[t] = uid;
srec->tuid[0] = 0; srec->tuid[0] = 0;
} }
if (t == S && svars->mmaxxuid < srec->uid[M]) {
/* If we have so many new messages that some of them are instantly expired,
* but some are still propagated because they are important, we need to
* ensure explicitly that the bulk fetch limit is upped. */
svars->mmaxxuid = INT_MAX;
if (svars->smaxxuid < srec->uid[S] - 1) {
svars->smaxxuid = srec->uid[S] - 1;
jFprintf( svars, "! %d\n", svars->smaxxuid );
}
}
} }
static void msgs_found_new( int sts, void *aux ); static void msgs_found_new( int sts, void *aux );
@ -1984,8 +1985,6 @@ flags_set_p2( sync_vars_t *svars, sync_rec_t *srec, int t )
if (t == S) { if (t == S) {
uint nex = (srec->status / S_NEXPIRE) & 1; uint nex = (srec->status / S_NEXPIRE) & 1;
if (nex != ((srec->status / S_EXPIRED) & 1)) { if (nex != ((srec->status / S_EXPIRED) & 1)) {
if (nex && (svars->smaxxuid < srec->uid[S]))
svars->smaxxuid = srec->uid[S];
jFprintf( svars, "/ %d %d\n", srec->uid[M], srec->uid[S] ); jFprintf( svars, "/ %d %d\n", srec->uid[M], srec->uid[S] );
debug( " pair(%d,%d): expired %d (commit)\n", srec->uid[M], srec->uid[S], nex ); debug( " pair(%d,%d): expired %d (commit)\n", srec->uid[M], srec->uid[S], nex );
srec->status = (srec->status & ~S_EXPIRED) | (nex * S_EXPIRED); srec->status = (srec->status & ~S_EXPIRED) | (nex * S_EXPIRED);
@ -2144,35 +2143,23 @@ static void
box_closed_p2( sync_vars_t *svars, int t ) box_closed_p2( sync_vars_t *svars, int t )
{ {
sync_rec_t *srec; sync_rec_t *srec;
int minwuid;
svars->state[t] |= ST_CLOSED; svars->state[t] |= ST_CLOSED;
if (!(svars->state[1-t] & ST_CLOSED)) if (!(svars->state[1-t] & ST_CLOSED))
return; return;
// All the journalling done in this function is merely for the autotest -
// the operations are idempotent, and we're about to commit the new state
// right afterwards anyway.
if (((svars->state[M] | svars->state[S]) & ST_DID_EXPUNGE) || svars->chan->max_messages) { if (((svars->state[M] | svars->state[S]) & ST_DID_EXPUNGE) || svars->chan->max_messages) {
debug( "purging obsolete entries\n" ); debug( "purging obsolete entries\n" );
minwuid = INT_MAX;
if (svars->chan->max_messages) {
debug( " max expired slave uid is %d\n", svars->smaxxuid );
for (srec = svars->srecs; srec; srec = srec->next) {
if (srec->status & S_DEAD)
continue;
if (!((srec->uid[S] <= 0 || ((srec->status & S_DEL(S)) && (svars->state[S] & ST_DID_EXPUNGE))) &&
(srec->uid[M] <= 0 || ((srec->status & S_DEL(M)) && (svars->state[M] & ST_DID_EXPUNGE)) || (srec->status & S_EXPIRED))) &&
svars->smaxxuid < srec->uid[S] && minwuid > srec->uid[M])
minwuid = srec->uid[M];
}
debug( " min non-orphaned master uid is %d\n", minwuid );
}
for (srec = svars->srecs; srec; srec = srec->next) { for (srec = svars->srecs; srec; srec = srec->next) {
if (srec->status & S_DEAD) if (srec->status & S_DEAD)
continue; continue;
if (srec->uid[S] <= 0 || ((srec->status & S_DEL(S)) && (svars->state[S] & ST_DID_EXPUNGE))) { if (srec->uid[S] <= 0 || ((srec->status & S_DEL(S)) && (svars->state[S] & ST_DID_EXPUNGE))) {
if (srec->uid[M] <= 0 || ((srec->status & S_DEL(M)) && (svars->state[M] & ST_DID_EXPUNGE)) || if (srec->uid[M] <= 0 || ((srec->status & S_DEL(M)) && (svars->state[M] & ST_DID_EXPUNGE)) ||
((srec->status & S_EXPIRED) && svars->maxuid[M] >= srec->uid[M] && minwuid > srec->uid[M])) { ((srec->status & S_EXPIRED) && svars->maxuid[M] >= srec->uid[M] && svars->mmaxxuid >= srec->uid[M])) {
debug( " -> killing (%d,%d)\n", srec->uid[M], srec->uid[S] ); debug( " -> killing (%d,%d)\n", srec->uid[M], srec->uid[S] );
srec->status = S_DEAD; srec->status = S_DEAD;
jFprintf( svars, "- %d %d\n", srec->uid[M], srec->uid[S] ); jFprintf( svars, "- %d %d\n", srec->uid[M], srec->uid[S] );
@ -2189,6 +2176,12 @@ box_closed_p2( sync_vars_t *svars, int t )
} }
} }
// This is just an optimization, so it needs no journaling of intermediate states.
// However, doing it before the entry purge would require ensuring that the
// exception list includes all relevant messages.
debug( "max expired uid on master is now %d\n", svars->mmaxxuid );
jFprintf( svars, "! %d\n", svars->mmaxxuid );
save_state( svars ); save_state( svars );
sync_bail( svars ); sync_bail( svars );

Loading…
Cancel
Save