@ -155,8 +155,10 @@ typedef struct {
int flags_total [ 2 ] , flags_done [ 2 ] ;
int trash_total [ 2 ] , trash_done [ 2 ] ;
int maxuid [ 2 ] ; /* highest UID that was already propagated */
int newmaxuid [ 2 ] ; /* highest UID that is currently being propagated */
int uidval [ 2 ] ; /* UID validity value */
int newuid [ 2 ] ; /* TUID lookup makes sense only for UIDs >= this */
int mmaxxuid ; /* highest expired UID on master during new message propagation */
int smaxxuid ; /* highest expired UID on slave */
} sync_vars_t ;
@ -806,6 +808,9 @@ box_selected( int sts, void *aux )
goto bail ;
}
}
svars - > newmaxuid [ M ] = svars - > maxuid [ M ] ;
svars - > newmaxuid [ S ] = svars - > maxuid [ S ] ;
svars - > mmaxxuid = INT_MAX ;
line = 0 ;
if ( ( jfp = fopen ( svars - > jname , " r " ) ) ) {
if ( ! stat ( svars - > nname , & st ) & & fgets ( buf , sizeof ( buf ) , jfp ) ) {
@ -829,7 +834,7 @@ box_selected( int sts, void *aux )
}
if ( buf [ 0 ] = = ' # ' ?
( t3 = 0 , ( sscanf ( buf + 2 , " %d %d %n " , & t1 , & t2 , & t3 ) < 2 ) | | ! t3 | | ( t - t3 ! = TUIDL + 3 ) ) :
buf [ 0 ] = = ' ( ' | | buf [ 0 ] = = ' ) ' | | buf [ 0 ] = = ' { ' | | buf [ 0 ] = = ' } ' ?
buf [ 0 ] = = ' ( ' | | buf [ 0 ] = = ' ) ' | | buf [ 0 ] = = ' { ' | | buf [ 0 ] = = ' } ' | | buf [ 0 ] = = ' ! ' ?
( sscanf ( buf + 2 , " %d " , & t1 ) ! = 1 ) :
buf [ 0 ] = = ' + ' | | buf [ 0 ] = = ' & ' | | buf [ 0 ] = = ' - ' | | buf [ 0 ] = = ' | ' | | buf [ 0 ] = = ' / ' | | buf [ 0 ] = = ' \\ ' ?
( sscanf ( buf + 2 , " %d %d " , & t1 , & t2 ) ! = 2 ) :
@ -846,6 +851,8 @@ box_selected( int sts, void *aux )
svars - > newuid [ M ] = t1 ;
else if ( buf [ 0 ] = = ' } ' )
svars - > newuid [ S ] = t1 ;
else if ( buf [ 0 ] = = ' ! ' )
svars - > smaxxuid = t1 ;
else if ( buf [ 0 ] = = ' | ' ) {
svars - > uidval [ M ] = t1 ;
svars - > uidval [ S ] = t2 ;
@ -853,6 +860,10 @@ box_selected( int sts, void *aux )
srec = nfmalloc ( sizeof ( * srec ) ) ;
srec - > uid [ M ] = t1 ;
srec - > uid [ S ] = t2 ;
if ( svars - > newmaxuid [ M ] < t1 )
svars - > newmaxuid [ M ] = t1 ;
if ( svars - > newmaxuid [ S ] < t2 )
svars - > newmaxuid [ S ] = t2 ;
debug ( " new entry(%d,%d) \n " , t1 , t2 ) ;
srec - > msg [ M ] = srec - > msg [ S ] = 0 ;
srec - > status = 0 ;
@ -876,6 +887,8 @@ box_selected( int sts, void *aux )
switch ( buf [ 0 ] ) {
case ' - ' :
debug ( " killed \n " ) ;
if ( srec - > msg [ M ] )
srec - > msg [ M ] - > srec = 0 ;
srec - > status = S_DEAD ;
break ;
case ' # ' :
@ -1014,7 +1027,7 @@ box_selected( int sts, void *aux )
mexcs = 0 ;
nmexcs = rmexcs = 0 ;
if ( svars - > ctx [ M ] - > opts & OPEN_OLD ) {
if ( svars - > smaxxuid ) {
if ( chan - > max_messages ) {
/* When messages have been expired on the slave, the master fetch is split into
* two ranges : The bulk fetch which corresponds with the most recent messages , and an
* exception list of messages which would have been expired if they weren ' t important . */
@ -1198,10 +1211,14 @@ box_loaded( int sts, void *aux )
* - message is old ( > 0 ) or expired ( 0 ) = > ignore
* - message was skipped ( - 1 ) = > ReNew
* - message was attempted , but failed ( - 2 ) = > New
* If new have no srec , the message is always New . */
* If new have no srec , the message is always New . If messages were previously ignored
* due to being excessive , they would now appear to be newer than the messages that
* got actually synced , so make sure to look only at the newest ones . As some messages
* may be already propagated before an interruption , and maxuid logging is delayed ,
* we need to track the newmaxuid separately . */
srec = tmsg - > srec ;
if ( srec ? srec - > uid [ t ] < 0 & & ( svars - > chan - > ops [ t ] & ( srec - > uid [ t ] = = - 1 ? OP_RENEW : OP_NEW ) )
: ( svars - > chan - > ops [ t ] & OP_NEW ) ) {
: svars - > newmaxuid [ 1 - t ] < tmsg - > uid & & ( svars - > chan - > ops [ t ] & OP_NEW ) ) {
debug ( " new message %d on %s \n " , tmsg - > uid , str_ms [ 1 - t ] ) ;
if ( ( svars - > chan - > ops [ t ] & OP_EXPUNGE ) & & ( tmsg - > flags & F_DELETED ) )
debug ( " -> not %sing - would be expunged anyway \n " , str_hl [ t ] ) ;
@ -1220,7 +1237,11 @@ box_loaded( int sts, void *aux )
srec - > tuid [ 0 ] = 0 ;
srec - > uid [ 1 - t ] = tmsg - > uid ;
srec - > uid [ t ] = - 2 ;
srec - > msg [ 1 - t ] = tmsg ;
srec - > msg [ t ] = 0 ;
tmsg - > srec = srec ;
if ( svars - > newmaxuid [ 1 - t ] < tmsg - > uid )
svars - > newmaxuid [ 1 - t ] = tmsg - > uid ;
Fprintf ( svars - > jfp , " + %d %d \n " , srec - > uid [ M ] , srec - > uid [ S ] ) ;
debug ( " -> pair(%d,%d) created \n " , srec - > uid [ M ] , srec - > uid [ S ] ) ;
}
@ -1346,6 +1367,10 @@ box_loaded( int sts, void *aux )
alive + + ;
}
}
for ( tmsg = svars - > ctx [ M ] - > msgs ; tmsg ; tmsg = tmsg - > next ) {
if ( ( srec = tmsg - > srec ) & & srec - > tuid [ 0 ] & & ! ( tmsg - > flags & F_DELETED ) )
alive + + ;
}
todel = alive - svars - > chan - > max_messages ;
debug ( " %d alive messages, %d excess - expiring \n " , alive , todel ) ;
for ( tmsg = svars - > ctx [ S ] - > msgs ; tmsg ; tmsg = tmsg - > next ) {
@ -1353,6 +1378,7 @@ box_loaded( int sts, void *aux )
continue ;
if ( ! ( srec = tmsg - > srec ) | | srec - > uid [ M ] < = 0 ) {
/* We did not push the message, so it must be kept. */
debug ( " old pair(%d,%d) unpropagated \n " , srec - > uid [ M ] , srec - > uid [ S ] ) ;
todel - - ;
} else {
nflags = ( tmsg - > flags | srec - > aflags [ S ] ) & ~ srec - > dflags [ S ] ;
@ -1360,13 +1386,32 @@ box_loaded( int sts, void *aux )
/* The message is not deleted, or is already (being) expired. */
if ( ( nflags & F_FLAGGED ) | | ! ( nflags & F_SEEN ) ) {
/* Important messages are always kept. */
debug ( " old pair(%d,%d) important \n " , srec - > uid [ M ] , srec - > uid [ S ] ) ;
todel - - ;
} else if ( todel > 0 | |
( ( srec - > status & ( S_EXPIRE | S_EXPIRED ) ) = = ( S_EXPIRE | S_EXPIRED ) ) | |
( ( srec - > status & ( S_EXPIRE | S_EXPIRED ) ) & & ( tmsg - > flags & F_DELETED ) ) ) {
/* The message is excess or was already (being) expired. */
srec - > status | = S_NEXPIRE ;
debug ( " pair(%d,%d) \n " , srec - > uid [ M ] , srec - > uid [ S ] ) ;
debug ( " old pair(%d,%d) expired \n " , srec - > uid [ M ] , srec - > uid [ S ] ) ;
todel - - ;
}
}
}
}
for ( tmsg = svars - > ctx [ M ] - > msgs ; tmsg ; tmsg = tmsg - > next ) {
if ( ( srec = tmsg - > srec ) & & srec - > tuid [ 0 ] ) {
nflags = tmsg - > flags ;
if ( ! ( nflags & F_DELETED ) ) {
if ( ( nflags & F_FLAGGED ) | | ! ( nflags & F_SEEN ) ) {
/* Important messages are always fetched. */
debug ( " new pair(%d,%d) important \n " , srec - > uid [ M ] , srec - > uid [ S ] ) ;
todel - - ;
} else if ( todel > 0 ) {
/* The message is excess. */
srec - > status | = S_NEXPIRE ;
debug ( " new pair(%d,%d) expired \n " , srec - > uid [ M ] , srec - > uid [ S ] ) ;
svars - > mmaxxuid = srec - > uid [ M ] ;
todel - - ;
}
}
@ -1374,7 +1419,10 @@ box_loaded( int sts, void *aux )
}
debug ( " %d excess messages remain \n " , todel ) ;
for ( srec = svars - > srecs ; srec ; srec = srec - > next ) {
if ( ( srec - > status & ( S_DEAD | S_DONE ) ) | | ! srec - > msg [ S ] )
if ( srec - > status & S_DEAD )
continue ;
if ( ! srec - > tuid [ 0 ] ) {
if ( ! srec - > msg [ S ] )
continue ;
nex = ( srec - > status / S_NEXPIRE ) & 1 ;
if ( nex ! = ( ( srec - > status / S_EXPIRED ) & 1 ) ) {
@ -1392,6 +1440,14 @@ box_loaded( int sts, void *aux )
/* Note: the "wrong" transaction may be pending here,
* e . g . : S_NEXPIRE = 0 , S_EXPIRE = 1 , S_EXPIRED = 0. */
}
} else {
if ( srec - > status & S_NEXPIRE ) {
Fprintf ( svars - > jfp , " - %d %d \n " , srec - > uid [ M ] , srec - > uid [ S ] ) ;
debug ( " pair(%d,%d): 1 (abort) \n " , srec - > uid [ M ] , srec - > uid [ S ] ) ;
srec - > msg [ M ] - > srec = 0 ;
srec - > status = S_DEAD ;
}
}
}
}
@ -1515,6 +1571,16 @@ msg_copied_p2( sync_vars_t *svars, sync_rec_t *srec, int t, int uid )
srec - > uid [ t ] = uid ;
srec - > tuid [ 0 ] = 0 ;
}
if ( t = = S & & svars - > mmaxxuid < srec - > uid [ M ] ) {
/* If we have so many new messages that some of them are instantly expired,
* but some are still propagated because they are important , we need to
* ensure explicitly that the bulk fetch limit is upped . */
svars - > mmaxxuid = INT_MAX ;
if ( svars - > smaxxuid < srec - > uid [ S ] - 1 ) {
svars - > smaxxuid = srec - > uid [ S ] - 1 ;
Fprintf ( svars - > jfp , " ! %d \n " , svars - > smaxxuid ) ;
}
}
}
static void msgs_found_new ( int sts , void * aux ) ;
@ -1735,13 +1801,14 @@ box_closed_p2( sync_vars_t *svars, int t )
if ( ! ( svars - > state [ 1 - t ] & ST_CLOSED ) )
return ;
if ( ( ( svars - > state [ M ] | svars - > state [ S ] ) & ST_DID_EXPUNGE ) | | svars - > smaxxuid ) {
if ( ( ( svars - > state [ M ] | svars - > state [ S ] ) & ST_DID_EXPUNGE ) | | svars - > chan - > max_messages ) {
/* This cleanup is not strictly necessary, as the next full sync
would throw out the dead entries anyway . But . . . */
debug ( " purging obsolete entries \n " ) ;
minwuid = INT_MAX ;
if ( svars - > smaxxuid ) {
debug ( " preparing entry purge - max expired slave uid is %d\n " , svars - > smaxxuid ) ;
if ( svars - > chan - > max_messages ) {
debug ( " max expired slave uid is %d \n " , svars - > smaxxuid ) ;
for ( srec = svars - > srecs ; srec ; srec = srec - > next ) {
if ( srec - > status & S_DEAD )
continue ;