@ -100,7 +100,6 @@ make_flags( int flags, char *buf )
# define S_EXPIRED (1<<4) /* the entry is expired (slave message removal confirmed) */
# define S_EXPIRED (1<<4) /* the entry is expired (slave message removal confirmed) */
# define S_EXPIRE (1<<5) /* the entry is being expired (slave message removal scheduled) */
# define S_EXPIRE (1<<5) /* the entry is being expired (slave message removal scheduled) */
# define S_NEXPIRE (1<<6) /* temporary: new expiration state */
# define S_NEXPIRE (1<<6) /* temporary: new expiration state */
# define S_EXP_S (1<<7) /* temporary: expired slave message is actually gone */
# define mvBit(in,ib,ob) ((unsigned char)(((unsigned)in) * (ob) / (ib)))
# define mvBit(in,ib,ob) ((unsigned char)(((unsigned)in) * (ob) / (ib)))
@ -653,6 +652,7 @@ box_selected( int sts, void *aux )
channel_conf_t * chan ;
channel_conf_t * chan ;
FILE * jfp ;
FILE * jfp ;
int opts [ 2 ] , line , t1 , t2 , t3 ;
int opts [ 2 ] , line , t1 , t2 , t3 ;
int * mexcs , nmexcs , rmexcs , minwuid ;
struct stat st ;
struct stat st ;
struct flock lck ;
struct flock lck ;
char fbuf [ 16 ] ; /* enlarge when support for keywords is added */
char fbuf [ 16 ] ; /* enlarge when support for keywords is added */
@ -1010,7 +1010,63 @@ box_selected( int sts, void *aux )
svars - > drv [ M ] - > prepare_opts ( ctx [ M ] , opts [ M ] ) ;
svars - > drv [ M ] - > prepare_opts ( ctx [ M ] , opts [ M ] ) ;
svars - > drv [ S ] - > prepare_opts ( ctx [ S ] , opts [ S ] ) ;
svars - > drv [ S ] - > prepare_opts ( ctx [ S ] , opts [ S ] ) ;
if ( ! svars - > smaxxuid & & load_box ( svars , M , ( ctx [ M ] - > opts & OPEN_OLD ) ? 1 : INT_MAX , 0 , 0 ) )
mexcs = 0 ;
nmexcs = rmexcs = 0 ;
if ( svars - > ctx [ M ] - > opts & OPEN_OLD ) {
if ( svars - > smaxxuid ) {
/* When messages have been expired on the slave, the master fetch is split into
* two ranges : The bulk fetch which corresponds with the most recent messages , and an
* exception list of messages which would have been expired if they weren ' t important . */
debug ( " preparing master selection - max expired slave uid is %d \n " , svars - > smaxxuid ) ;
/* First, find out the lower bound for the bulk fetch. */
minwuid = INT_MAX ;
for ( srec = svars - > srecs ; srec ; srec = srec - > next ) {
if ( srec - > status & S_DEAD )
continue ;
if ( srec - > status & S_EXPIRED ) {
if ( ! srec - > uid [ S ] ) {
/* The expired message was already gone. */
continue ;
}
/* The expired message was not expunged yet, so re-examine it.
* This will happen en masse , so just extend the bulk fetch . */
} else {
if ( svars - > smaxxuid > = srec - > uid [ S ] ) {
/* The non-expired message is in the generally expired range, so don't
* make it contribute to the bulk fetch . */
continue ;
}
/* Usual non-expired message. */
}
if ( minwuid > srec - > uid [ M ] )
minwuid = srec - > uid [ M ] ;
}
debug ( " min non-orphaned master uid is %d \n " , minwuid ) ;
/* Next, calculate the exception fetch. */
for ( srec = svars - > srecs ; srec ; srec = srec - > next ) {
if ( srec - > status & S_DEAD )
continue ;
if ( srec - > uid [ M ] > 0 & & srec - > uid [ S ] > 0 & & minwuid > srec - > uid [ M ] & &
( ! ( svars - > ctx [ M ] - > opts & OPEN_NEW ) | | svars - > maxuid [ M ] > = srec - > uid [ M ] ) ) {
/* The pair is alive, but outside the bulk range. */
if ( nmexcs = = rmexcs ) {
rmexcs = rmexcs * 2 + 100 ;
mexcs = nfrealloc ( mexcs , rmexcs * sizeof ( int ) ) ;
}
mexcs [ nmexcs + + ] = srec - > uid [ M ] ;
}
}
debugn ( " exception list is: " ) ;
for ( t = 0 ; t < nmexcs ; t + + )
debugn ( " %d " , mexcs [ t ] ) ;
debug ( " \n " ) ;
} else {
minwuid = 1 ;
}
} else {
minwuid = INT_MAX ;
}
if ( load_box ( svars , M , minwuid , mexcs , nmexcs ) )
return ;
return ;
load_box ( svars , S , ( ctx [ S ] - > opts & OPEN_OLD ) ? 1 : INT_MAX , 0 , 0 ) ;
load_box ( svars , S , ( ctx [ S ] - > opts & OPEN_OLD ) ? 1 : INT_MAX , 0 , 0 ) ;
}
}
@ -1067,7 +1123,7 @@ box_loaded( int sts, void *aux )
message_t * tmsg ;
message_t * tmsg ;
copy_vars_t * cv ;
copy_vars_t * cv ;
flag_vars_t * fv ;
flag_vars_t * fv ;
int uid , minwuid , * mexcs , nmexcs , rmexcs , no [ 2 ] , del [ 2 ] , alive , todel , t1 , t2 ;
int uid , no [ 2 ] , del [ 2 ] , alive , todel , t1 , t2 ;
int sflags , nflags , aflags , dflags , nex ;
int sflags , nflags , aflags , dflags , nex ;
unsigned hashsz , idx ;
unsigned hashsz , idx ;
char fbuf [ 16 ] ; /* enlarge when support for keywords is added */
char fbuf [ 16 ] ; /* enlarge when support for keywords is added */
@ -1123,64 +1179,6 @@ box_loaded( int sts, void *aux )
}
}
free ( srecmap ) ;
free ( srecmap ) ;
if ( ( t = = S ) & & svars - > smaxxuid ) {
/* When messages have been expired on the slave, the master fetch is split into
* two ranges : The bulk fetch which corresponds with the most recent messages , and an
* exception list of messages which would have been expired if they weren ' t important . */
debug ( " preparing master selection - max expired slave uid is %d \n " , svars - > smaxxuid ) ;
/* First, find out the lower bound for the bulk fetch.
* On the way , mark successfully expired messages . */
minwuid = INT_MAX ;
for ( srec = svars - > srecs ; srec ; srec = srec - > next ) {
if ( srec - > status & S_DEAD )
continue ;
if ( srec - > status & S_EXPIRED ) {
if ( ! srec - > uid [ S ] | | ( ( svars - > ctx [ S ] - > opts & OPEN_OLD ) & & ! srec - > msg [ S ] ) ) {
/* The expired message was already gone or it is gone now. */
srec - > status | = S_EXP_S ;
continue ;
}
/* The expired message was not expunged yet, so re-examine it.
* This will happen en masse , so just extend the bulk fetch . */
} else {
if ( svars - > smaxxuid > = srec - > uid [ S ] ) {
/* The non-expired message is in the generally expired range, so don't
* make it contribute to the bulk fetch . */
continue ;
}
/* Usual non-expired message. */
}
if ( minwuid > srec - > uid [ M ] )
minwuid = srec - > uid [ M ] ;
}
debug ( " min non-orphaned master uid is %d \n " , minwuid ) ;
/* Next, calculate the exception fetch. */
mexcs = 0 ;
nmexcs = rmexcs = 0 ;
if ( svars - > ctx [ M ] - > opts & OPEN_OLD ) {
for ( srec = svars - > srecs ; srec ; srec = srec - > next ) {
if ( srec - > status & S_DEAD )
continue ;
if ( srec - > uid [ M ] > 0 & & srec - > uid [ S ] > 0 & &
! ( srec - > status & S_EXP_S ) & & minwuid > srec - > uid [ M ] & &
( ! ( svars - > ctx [ M ] - > opts & OPEN_NEW ) | | svars - > maxuid [ M ] > = srec - > uid [ M ] ) ) {
/* The pair is alive, but outside the bulk range. */
if ( nmexcs = = rmexcs ) {
rmexcs = rmexcs * 2 + 100 ;
mexcs = nfrealloc ( mexcs , rmexcs * sizeof ( int ) ) ;
}
mexcs [ nmexcs + + ] = srec - > uid [ M ] ;
}
}
}
debugn ( " exception list is: " ) ;
for ( t = 0 ; t < nmexcs ; t + + )
debugn ( " %d " , mexcs [ t ] ) ;
debug ( " \n " ) ;
load_box ( svars , M , minwuid , mexcs , nmexcs ) ;
return ;
}
if ( ! ( svars - > state [ 1 - t ] & ST_LOADED ) )
if ( ! ( svars - > state [ 1 - t ] & ST_LOADED ) )
return ;
return ;