From 06bd8a0bcf24510068ab6c050c123a30712139e8 Mon Sep 17 00:00:00 2001 From: Oswald Buddenhagen Date: Sat, 19 Feb 2022 15:13:55 +0100 Subject: [PATCH] *** wip: expire newly propagated messages, too --- src/sync.c | 45 ++++++++++++++++++++++++++++++--------------- 1 file changed, 30 insertions(+), 15 deletions(-) diff --git a/src/sync.c b/src/sync.c index bff3684..3295afd 100644 --- a/src/sync.c +++ b/src/sync.c @@ -634,7 +634,7 @@ box_opened2( sync_vars_t *svars, int t ) if ((chan->ops[xt] & (OP_OLD | OP_NEW | OP_UPGRADE | OP_FLAGS)) && chan->max_messages) svars->any_expiring = 1; if (svars->any_expiring) { - opts[xt] |= OPEN_PAIRED | OPEN_FLAGS; + opts[xt] |= OPEN_OLD | OPEN_NEW | OPEN_PAIRED | OPEN_FLAGS; if (any_dummies[xt]) opts[xt^1] |= OPEN_PAIRED | OPEN_FLAGS; else if (chan->ops[xt] & (OP_OLD | OP_NEW | OP_UPGRADE)) @@ -747,21 +747,40 @@ typedef struct { static int cmp_srec_far( const void *a, const void *b ) { - uint au = (*(const alive_srec_t *)a).srec->uid[F]; - uint bu = (*(const alive_srec_t *)b).srec->uid[F]; - assert( au && bu ); - assert( au != bu ); - return au > bu ? 1 : -1; // Can't subtract, the result might not fit into signed int. + const sync_rec_t *sra = *(const sync_rec_t * const *)a; + const sync_rec_t *srb = *(const sync_rec_t * const *)b; + uint au = sra->uid[F]; + uint bu = srb->uid[F]; + if (au == bu) { + assert( !au ); + // New messages are sorted against each other in the order they are expected + // to be in after propagation. There cannot be null UIDs here, by definition. + au = sra->uid[N]; + bu = srb->uid[N]; + assert( au && bu ); + assert( au != bu ); + return au > bu ? 1 : -1; // Can't subtract, the result might not fit into signed int. + } + // New messages will have the highest UIDs once propagated. + return (au ? au : UINT_MAX) > (bu ? bu : UINT_MAX) ? 1 : -1; } static int cmp_srec_near( const void *a, const void *b ) { - uint au = (*(const alive_srec_t *)a).srec->uid[N]; - uint bu = (*(const alive_srec_t *)b).srec->uid[N]; - assert( au && bu ); - assert( au != bu ); - return au > bu ? 1 : -1; // Can't subtract, the result might not fit into signed int. + const sync_rec_t *sra = *(const sync_rec_t * const *)a; + const sync_rec_t *srb = *(const sync_rec_t * const *)b; + uint au = sra->uid[N]; + uint bu = srb->uid[N]; + if (au == bu) { + assert( !au ); + au = sra->uid[F]; + bu = srb->uid[F]; + assert( au && bu ); + assert( au != bu ); + return au > bu ? 1 : -1; + } + return (au ? au : UINT_MAX) > (bu ? bu : UINT_MAX) ? 1 : -1; } typedef struct { @@ -1190,10 +1209,6 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux continue; // We completely ignore unpaired expire-side messages, as we cannot expire // them without data loss; consequently, we also don't count them. - // Note that we also ignore expire-side messages we're currently propagating, - // which delays expiration of some messages by one cycle. Otherwise, we'd - // have to sequence flag updating after message propagation to avoid a race - // with external expunging, and that seems unreasonably expensive. if (!srec->uid[xt^1]) continue; if (!(srec->status & S_PENDING)) {