Browse Source

move away from magic UIDs in the sync state

the only legitimate "deviant" UID is zero, meaning "no message". this
can be futher qualified by additional flags in the sync record, rather
than using magic values for the UID. in fact, the zero UID (so far
meaning only "expunged") was already optionally qualifed with "expired".

as a side effect, driver->store_msg() now returns 0 instead of -2 for
unknown UIDs. this was a hack to avoid translating the value later
on, but it made the api horrible, and now it's superflous in the first
place.
wip/movedetect
Oswald Buddenhagen 8 years ago
parent
commit
bd5fb6fff3
  1. 2
      src/driver.h
  2. 2
      src/drv_imap.c
  3. 8
      src/run-tests.pl
  4. 132
      src/sync.c

2
src/driver.h

@ -214,7 +214,7 @@ struct driver {
void (*cb)( int sts, void *aux ), void *aux );
/* Store the given message to either the current mailbox or the trash folder.
* If the new copy's UID can be immediately determined, return it, otherwise -2. */
* If the new copy's UID can be immediately determined, return it, otherwise 0. */
void (*store_msg)( store_t *ctx, msg_data_t *data, int to_trash,
void (*cb)( int sts, int uid, void *aux ), void *aux );

2
src/drv_imap.c

@ -2848,7 +2848,7 @@ imap_store_msg( store_t *gctx, msg_data_t *data, int to_trash,
ctx->buffer_mem += data->len;
cmd->gen.param.data_len = data->len;
cmd->gen.param.data = data->data;
cmd->out_uid = -2;
cmd->out_uid = 0;
if (to_trash) {
cmd->gen.param.create = 1;

8
src/run-tests.pl

@ -161,7 +161,7 @@ my @X11 = (
[ 2,
3, 1, "*", 1, 2, "" ],
[ 2, 0, 1,
-1, 1, "", 1, 2, "", 2, -1, "" ],
0, 1, "^", 1, 2, "", 2, 0, "^" ],
);
test("max size", \@x10, \@X11, @O11);
@ -173,7 +173,7 @@ my @X22 = (
[ 2,
3, 1, "*", 1, 2, "" ],
[ 2, 0, 1,
3, 1, "", 1, 2, "", 2, -1, "" ],
3, 1, "", 1, 2, "", 2, 0, "^" ],
);
test("slave max size", \@X11, \@X22, @O22);
@ -218,7 +218,7 @@ my @x50 = (
[ 6,
1, 1, "S", 2, 2, "ST", 4, 4, "", 5, 5, "", 6, 6, "" ],
[ 6, 3, 0,
1, 1, "FS", 2, 2, "XS", 3, 3, "XS", 4, 4, "", 5, 5, "", 6, 6, "" ],
1, 1, "FS", 2, 2, "~S", 3, 3, "~S", 4, 4, "", 5, 5, "", 6, 6, "" ],
);
my @O51 = ("", "", "MaxMessages 3\nExpunge Both\n");
@ -399,7 +399,7 @@ sub showstate($)
$hdr{'MaxExpiredMasterUid'} // "0",
$hdr{'MaxPushedUid'} // "missing");
for (@ls) {
/^(-?\d+) (-?\d+) (.*)$/;
/^(\d+) (\d+) (.*)$/;
push @T, $1, $2, $3;
}
printstate(@T);

132
src/sync.c

@ -129,6 +129,8 @@ make_flags( int flags, char *buf )
// bitfield to allow for easy testing for multiple states.
#define S_EXPIRE (1<<0) // the entry is being expired (slave message removal scheduled)
#define S_EXPIRED (1<<1) // the entry is expired (slave message removal confirmed)
#define S_PENDING (1<<2) // the entry is new and awaits propagation (possibly a retry)
#define S_SKIPPED (1<<3) // the entry was not propagated (message is too big)
#define S_DEAD (1<<7) // ephemeral: the entry was killed and should be ignored
// Ephemeral working set.
@ -139,7 +141,7 @@ make_flags( int flags, char *buf )
typedef struct sync_rec {
struct sync_rec *next;
/* string_list_t *keywords; */
int uid[2]; /* -2 = pending (use tuid), -1 = skipped (too big), 0 = expired */
int uid[2];
message_t *msg[2];
uchar status, wstate, flags, aflags[2], dflags[2];
char tuid[TUIDL];
@ -236,7 +238,7 @@ match_tuids( sync_vars_t *svars, int t, message_t *msgs )
for (srec = svars->srecs; srec; srec = srec->next) {
if (srec->status & S_DEAD)
continue;
if (srec->uid[t] == -2 && srec->tuid[0]) {
if (!srec->uid[t] && srec->tuid[0]) {
debug( " pair(%d,%d): lookup %s, TUID %." stringify(TUIDL) "s\n", srec->uid[M], srec->uid[S], str_ms[t], srec->tuid );
for (tmsg = ntmsg; tmsg; tmsg = tmsg->next) {
if (tmsg->status & M_DEAD)
@ -257,6 +259,7 @@ match_tuids( sync_vars_t *svars, int t, message_t *msgs )
debug( " -> TUID lost\n" );
jFprintf( svars, "& %d %d\n", srec->uid[M], srec->uid[S] );
srec->flags = 0;
// Note: status remains S_PENDING.
srec->tuid[0] = 0;
num_lost++;
continue;
@ -267,6 +270,7 @@ match_tuids( sync_vars_t *svars, int t, message_t *msgs )
srec->msg[t] = tmsg;
ntmsg = tmsg->next;
srec->uid[t] = tmsg->uid;
srec->status = 0;
srec->tuid[0] = 0;
}
}
@ -666,7 +670,7 @@ save_state( sync_vars_t *svars )
continue;
make_flags( srec->flags, fbuf );
Fprintf( svars->nfp, "%d %d %s%s\n", srec->uid[M], srec->uid[S],
srec->status & S_EXPIRED ? "X" : "", fbuf );
(srec->status & S_SKIPPED) ? "^" : (srec->status & S_PENDING) ? "!" : (srec->status & S_EXPIRED) ? "~" : "", fbuf );
}
Fclose( svars->nfp, 1 );
@ -757,14 +761,33 @@ load_state( sync_vars_t *svars )
srec->uid[M] = t1;
srec->uid[S] = t2;
s = fbuf;
if (*s == 'X') {
if (*s == '^') {
s++;
srec->status = S_SKIPPED;
} else if (*s == '!') {
s++;
srec->status = S_PENDING;
} else if (*s == '~' || *s == 'X' /* Pre-1.3 legacy */) {
s++;
srec->status = S_EXPIRE | S_EXPIRED;
} else if (srec->uid[M] == -1) { // Pre-1.3 legacy
srec->uid[M] = 0;
srec->status = S_SKIPPED;
} else if (srec->uid[M] == -2) {
srec->uid[M] = 0;
srec->status = S_PENDING;
} else if (srec->uid[S] == -1) {
srec->uid[S] = 0;
srec->status = S_SKIPPED;
} else if (srec->uid[S] == -2) {
srec->uid[S] = 0;
srec->status = S_PENDING;
} else
srec->status = 0;
srec->wstate = 0;
srec->flags = parse_flags( s );
debug( " entry (%d,%d,%u,%s)\n", srec->uid[M], srec->uid[S], srec->flags, srec->status & S_EXPIRED ? "X" : "" );
debug( " entry (%d,%d,%u,%s)\n", srec->uid[M], srec->uid[S], srec->flags,
(srec->status & S_SKIPPED) ? "SKIP" : (srec->status & S_PENDING) ? "FAIL" : (srec->status & S_EXPIRED) ? "XPIRE" : "" );
srec->msg[M] = srec->msg[S] = 0;
srec->tuid[0] = 0;
srec->next = 0;
@ -786,7 +809,7 @@ load_state( sync_vars_t *svars )
if (smaxxuid) {
int minwuid = INT_MAX;
for (srec = svars->srecs; srec; srec = srec->next) {
if ((srec->status & S_DEAD) || srec->uid[M] <= 0)
if ((srec->status & (S_DEAD | S_SKIPPED | S_PENDING)) || !srec->uid[M])
continue;
if (srec->status & S_EXPIRED) {
if (!srec->uid[S]) {
@ -796,7 +819,7 @@ load_state( sync_vars_t *svars )
// The expired message was not expunged yet, so re-examine it.
// This will happen en masse, so just extend the bulk fetch.
} else {
if (srec->uid[S] > 0 && smaxxuid >= srec->uid[S]) {
if (srec->uid[S] && smaxxuid >= srec->uid[S]) {
// The non-expired message is in the generally expired range,
// so don't make it contribute to the bulk fetch.
continue;
@ -868,7 +891,7 @@ load_state( sync_vars_t *svars )
svars->newmaxuid[S] = t2;
debug( " new entry(%d,%d)\n", t1, t2 );
srec->msg[M] = srec->msg[S] = 0;
srec->status = 0;
srec->status = S_PENDING;
srec->wstate = 0;
srec->flags = 0;
srec->tuid[0] = 0;
@ -909,11 +932,13 @@ load_state( sync_vars_t *svars )
case '<':
debug( "master now %d\n", t3 );
srec->uid[M] = t3;
srec->status &= ~S_PENDING;
srec->tuid[0] = 0;
break;
case '>':
debug( "slave now %d\n", t3 );
srec->uid[S] = t3;
srec->status &= ~S_PENDING;
srec->tuid[0] = 0;
break;
case '*':
@ -1244,9 +1269,9 @@ box_opened2( sync_vars_t *svars, int t )
if (srec->status & S_DEAD)
continue;
if (srec->tuid[0]) {
if (srec->uid[M] == -2)
if (!srec->uid[M])
opts[M] |= OPEN_NEW|OPEN_FIND, svars->state[M] |= ST_FIND_OLD;
else if (srec->uid[S] == -2)
else if (!srec->uid[S])
opts[S] |= OPEN_NEW|OPEN_FIND, svars->state[S] |= ST_FIND_OLD;
else
assert( !"sync record with stray TUID" );
@ -1268,8 +1293,11 @@ box_opened2( sync_vars_t *svars, int t )
for (srec = svars->srecs; srec; srec = srec->next) {
if (srec->status & S_DEAD)
continue;
if (srec->uid[M] > 0 && srec->uid[S] > 0 && minwuid > srec->uid[M] &&
(!(svars->opts[M] & OPEN_NEW) || svars->maxuid[M] >= srec->uid[M])) {
if (!srec->uid[M]) // No message; other state is irrelevant
continue;
if (minwuid > srec->uid[M] && (!(svars->opts[M] & OPEN_NEW) || svars->maxuid[M] >= srec->uid[M])) {
if (!srec->uid[S] && !(srec->status & S_PENDING)) // Only actually paired up messages matter
continue;
/* The pair is alive, but outside the bulk range. */
*int_array_append( &mexcs ) = srec->uid[M];
}
@ -1341,7 +1369,6 @@ static void flags_set( int sts, void *aux );
static void flags_set_p2( sync_vars_t *svars, sync_rec_t *srec, int t );
static void msgs_flags_set( sync_vars_t *svars, int t );
static void msg_copied( int sts, int uid, copy_vars_t *vars );
static void msg_copied_p2( sync_vars_t *svars, sync_rec_t *srec, int t, int uid );
static void msgs_copied( sync_vars_t *svars, int t );
static void
@ -1375,7 +1402,7 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
if (srec->status & S_DEAD)
continue;
uid = srec->uid[t];
if (uid <= 0)
if (!uid)
continue;
idx = (uint)((uint)uid * 1103515245U) % hashsz;
while (srecmap[idx].uid)
@ -1473,8 +1500,8 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
jFprintf( svars, "- %d %d\n", srec->uid[M], srec->uid[S] );
} else {
// del[] means that a message becomes known to have been expunged.
del[M] = no[M] && (srec->uid[M] > 0);
del[S] = no[S] && (srec->uid[S] > 0);
del[M] = no[M] && srec->uid[M];
del[S] = no[S] && srec->uid[S];
for (t = 0; t < 2; t++) {
srec->aflags[t] = srec->dflags[t] = 0;
@ -1483,7 +1510,7 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
if (del[t]) {
// The target was newly expunged, so there is nothing to update.
// The deletion is propagated in the opposite iteration.
} else if (srec->uid[t] <= 0) {
} else if (!srec->uid[t]) {
// The target was never stored, or was previously expunged, so there
// is nothing to update.
// Note: the opposite UID must be valid, as otherwise the entry would
@ -1538,9 +1565,9 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
for (t = 0; t < 2; t++) {
for (tmsg = svars->msgs[1-t]; tmsg; tmsg = tmsg->next) {
// If new have no srec, the message is always New. If we have a srec:
// - message is old (> 0) or expired (0) => ignore
// - message was skipped (-1) => ReNew
// - message was attempted, but failed (-2) => New
// - message is paired or expired => ignore
// - message was skipped => ReNew
// - message was attempted, but is still pending or failed => New
//
// If messages were previously ignored due to being excessive, they would now
// appear to be newer than the messages that got actually synced, so increment
@ -1550,7 +1577,9 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
// in case of interruption - in particular skipping big messages would otherwise
// up the limit too early.
srec = tmsg->srec;
if (srec ? srec->uid[t] < 0 && (svars->chan->ops[t] & (srec->uid[t] == -1 ? OP_RENEW : OP_NEW))
if (srec ? !srec->uid[t] &&
(((srec->status & S_PENDING) && (svars->chan->ops[t] & OP_NEW)) ||
((srec->status & S_SKIPPED) && (svars->chan->ops[t] & OP_RENEW)))
: svars->newmaxuid[1-t] < tmsg->uid && (svars->chan->ops[t] & OP_NEW)) {
debug( "new message %d on %s\n", tmsg->uid, str_ms[1-t] );
if ((svars->chan->ops[t] & OP_EXPUNGE) && (tmsg->flags & F_DELETED)) {
@ -1564,12 +1593,12 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
*svars->srecadd = srec;
svars->srecadd = &srec->next;
svars->nsrecs++;
srec->status = 0;
srec->status = S_PENDING;
srec->wstate = 0;
srec->flags = 0;
srec->tuid[0] = 0;
srec->uid[1-t] = tmsg->uid;
srec->uid[t] = -2;
srec->uid[t] = 0;
srec->msg[1-t] = tmsg;
srec->msg[t] = 0;
tmsg->srec = srec;
@ -1584,10 +1613,10 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
jFprintf( svars, "* %d %d %u\n", srec->uid[M], srec->uid[S], srec->flags );
debug( " -> updated flags to %u\n", tmsg->flags );
}
if (srec->uid[t] == -1) {
if (srec->status != S_PENDING) {
debug( " -> not too big any more\n" );
jFprintf( svars, "%c %d %d -2\n", "<>"[t], srec->uid[M], srec->uid[S] );
srec->uid[t] = -2;
srec->status = S_PENDING;
jFprintf( svars, "~ %d %d %u\n", srec->uid[M], srec->uid[S], srec->status );
}
for (t1 = 0; t1 < TUIDL; t1++) {
t2 = arc4_getbyte() & 0x3f;
@ -1596,11 +1625,12 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
jFprintf( svars, "# %d %d %." stringify(TUIDL) "s\n", srec->uid[M], srec->uid[S], srec->tuid );
debug( " -> %sing message, TUID %." stringify(TUIDL) "s\n", str_hl[t], srec->tuid );
} else {
if (srec->uid[t] == -1) {
if (srec->status == S_SKIPPED) {
debug( " -> not %sing - still too big\n", str_hl[t] );
} else {
debug( " -> not %sing - too big\n", str_hl[t] );
msg_copied_p2( svars, srec, t, -1 );
srec->status = S_SKIPPED;
jFprintf( svars, "~ %d %d %u\n", srec->uid[M], srec->uid[S], srec->status );
}
}
}
@ -1617,7 +1647,7 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
for (tmsg = svars->msgs[S]; tmsg; tmsg = tmsg->next) {
if (tmsg->status & M_DEAD)
continue;
if ((srec = tmsg->srec) && srec->uid[M] > 0 &&
if ((srec = tmsg->srec) && srec->uid[M] &&
((tmsg->flags | srec->aflags[S]) & ~srec->dflags[S] & F_DELETED) &&
!(srec->status & (S_EXPIRE|S_EXPIRED))) {
/* Message was not propagated yet, or is deleted. */
@ -1635,7 +1665,7 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
for (tmsg = svars->msgs[S]; tmsg; tmsg = tmsg->next) {
if (tmsg->status & M_DEAD)
continue;
if (!(srec = tmsg->srec) || srec->uid[M] <= 0) {
if (!(srec = tmsg->srec) || !srec->uid[M]) {
/* We did not push the message, so it must be kept. */
debug( " message %d unpropagated\n", tmsg->uid );
todel--;
@ -1726,7 +1756,7 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
debug( "synchronizing flags\n" );
for (srec = svars->srecs; srec; srec = srec->next) {
if ((srec->status & S_DEAD) || srec->uid[M] <= 0 || srec->uid[S] <= 0)
if ((srec->status & S_DEAD) || !srec->uid[M] || !srec->uid[S])
continue;
for (t = 0; t < 2; t++) {
aflags = srec->aflags[t];
@ -1805,9 +1835,15 @@ msg_copied( int sts, int uid, copy_vars_t *vars )
SVARS_CHECK_CANCEL_RET;
switch (sts) {
case SYNC_OK:
if (uid < 0)
if (!uid) { // Stored to a non-UIDPLUS mailbox
svars->state[t] |= ST_FIND_NEW;
msg_copied_p2( svars, vars->srec, t, uid );
} else {
debug( " -> new UID %d on %s\n", uid, str_ms[t] );
jFprintf( svars, "%c %d %d %d\n", "<>"[t], vars->srec->uid[M], vars->srec->uid[S], uid );
vars->srec->uid[t] = uid;
vars->srec->status &= ~S_PENDING;
vars->srec->tuid[0] = 0;
}
break;
case SYNC_NOGOOD:
debug( " -> killing (%d,%d)\n", vars->srec->uid[M], vars->srec->uid[S] );
@ -1826,24 +1862,6 @@ msg_copied( int sts, int uid, copy_vars_t *vars )
msgs_copied( svars, t );
}
static void
msg_copied_p2( sync_vars_t *svars, sync_rec_t *srec, int t, int uid )
{
/* Possible previous UIDs:
* - -2 when the entry is new
* - -1 when re-newing an entry
* Possible new UIDs:
* - a real UID when storing a message to a UIDPLUS mailbox
* - -2 when storing a message to a dumb mailbox
* - -1 when not actually storing a message */
if (srec->uid[t] != uid) {
debug( " -> new UID %d on %s\n", uid, str_ms[t] );
jFprintf( svars, "%c %d %d %d\n", "<>"[t], srec->uid[M], srec->uid[S], uid );
srec->uid[t] = uid;
srec->tuid[0] = 0;
}
}
static void msgs_found_new( int sts, message_t *msgs, void *aux );
static void msgs_new_done( sync_vars_t *svars, int t );
static void sync_close( sync_vars_t *svars, int t );
@ -2006,7 +2024,7 @@ msgs_flags_set( sync_vars_t *svars, int t )
if ((tmsg->flags & F_DELETED) && !find_int_array( svars->trashed_msgs[t].array, tmsg->uid ) &&
(t == M || !tmsg->srec || !(tmsg->srec->status & (S_EXPIRE|S_EXPIRED)))) {
if (svars->ctx[t]->conf->trash) {
if (!svars->ctx[t]->conf->trash_only_new || !tmsg->srec || tmsg->srec->uid[1-t] < 0) {
if (!svars->ctx[t]->conf->trash_only_new || !tmsg->srec || (tmsg->srec->status & (S_PENDING | S_SKIPPED))) {
debug( "%s: trashing message %d\n", str_ms[t], tmsg->uid );
trash_total[t]++;
stats();
@ -2020,7 +2038,7 @@ msgs_flags_set( sync_vars_t *svars, int t )
} else
debug( "%s: not trashing message %d - not new\n", str_ms[t], tmsg->uid );
} else {
if (!tmsg->srec || tmsg->srec->uid[1-t] < 0) {
if (!tmsg->srec || (tmsg->srec->status & (S_PENDING | S_SKIPPED))) {
if (tmsg->size <= svars->ctx[1-t]->conf->max_size) {
debug( "%s: remote trashing message %d\n", str_ms[t], tmsg->uid );
trash_total[t]++;
@ -2139,18 +2157,18 @@ box_closed_p2( sync_vars_t *svars, int t )
for (srec = svars->srecs; srec; srec = srec->next) {
if (srec->status & S_DEAD)
continue;
if (srec->uid[S] <= 0 || ((srec->wstate & W_DEL(S)) && (svars->state[S] & ST_DID_EXPUNGE))) {
if (srec->uid[M] <= 0 || ((srec->wstate & W_DEL(M)) && (svars->state[M] & ST_DID_EXPUNGE)) ||
if (!srec->uid[S] || ((srec->wstate & W_DEL(S)) && (svars->state[S] & ST_DID_EXPUNGE))) {
if (!srec->uid[M] || ((srec->wstate & W_DEL(M)) && (svars->state[M] & ST_DID_EXPUNGE)) ||
((srec->status & S_EXPIRED) && svars->maxuid[M] >= srec->uid[M] && svars->mmaxxuid >= srec->uid[M])) {
debug( " -> killing (%d,%d)\n", srec->uid[M], srec->uid[S] );
srec->status = S_DEAD;
jFprintf( svars, "- %d %d\n", srec->uid[M], srec->uid[S] );
} else if (srec->uid[S] > 0) {
} else if (srec->uid[S]) {
debug( " -> orphaning (%d,[%d])\n", srec->uid[M], srec->uid[S] );
jFprintf( svars, "> %d %d 0\n", srec->uid[M], srec->uid[S] );
srec->uid[S] = 0;
}
} else if (srec->uid[M] > 0 && ((srec->wstate & W_DEL(M)) && (svars->state[M] & ST_DID_EXPUNGE))) {
} else if (srec->uid[M] && ((srec->wstate & W_DEL(M)) && (svars->state[M] & ST_DID_EXPUNGE))) {
debug( " -> orphaning ([%d],%d)\n", srec->uid[M], srec->uid[S] );
jFprintf( svars, "< %d %d 0\n", srec->uid[M], srec->uid[S] );
srec->uid[M] = 0;

Loading…
Cancel
Save