@ -39,6 +39,8 @@
# define fdatasync fsync
# endif
# define JOURNAL_VERSION "4"
channel_conf_t global_conf ;
channel_conf_t * channels ;
group_conf_t * groups ;
@ -167,10 +169,12 @@ typedef struct {
uint ref_count , nsrecs , opts [ 2 ] ;
uint new_pending [ 2 ] , flags_pending [ 2 ] , trash_pending [ 2 ] ;
uint maxuid [ 2 ] ; // highest UID that was already propagated
uint oldmaxuid [ 2 ] ; // highest UID that was already propagated before this run
uint uidval [ 2 ] ; // UID validity value
uint newuidval [ 2 ] ; // UID validity obtained from driver
uint finduid [ 2 ] ; // TUID lookup makes sense only for UIDs >= this
uint maxxfuid ; // highest expired UID on far side
uint oldmaxxfuid ; // highest expired UID on far side before this run
uchar good_flags [ 2 ] , bad_flags [ 2 ] ;
} sync_vars_t ;
@ -218,6 +222,15 @@ static int check_cancel( sync_vars_t *svars );
# define ST_SENDING_NEW (1<<15)
static void
create_state ( sync_vars_t * svars )
{
if ( ! ( svars - > nfp = fopen ( svars - > nname , " w " ) ) ) {
sys_error ( " Error: cannot create new sync state %s " , svars - > nname ) ;
exit ( 1 ) ;
}
}
static void ATTR_PRINTFLIKE ( 2 , 3 )
jFprintf ( sync_vars_t * svars , const char * msg , . . . )
{
@ -225,6 +238,16 @@ jFprintf( sync_vars_t *svars, const char *msg, ... )
if ( JLimit & & ! - - JLimit )
exit ( 101 ) ;
if ( ! svars - > jfp ) {
create_state ( svars ) ;
if ( ! ( svars - > jfp = fopen ( svars - > jname , svars - > replayed ? " a " : " w " ) ) ) {
sys_error ( " Error: cannot create journal %s " , svars - > jname ) ;
exit ( 1 ) ;
}
setlinebuf ( svars - > jfp ) ;
if ( ! svars - > replayed )
Fprintf ( svars - > jfp , JOURNAL_VERSION " \n " ) ;
}
va_start ( va , msg ) ;
vFprintf ( svars - > jfp , msg , va ) ;
va_end ( va ) ;
@ -625,8 +648,6 @@ clean_strdup( const char *s )
}
# define JOURNAL_VERSION "4"
static int
prepare_state ( sync_vars_t * svars )
{
@ -704,6 +725,12 @@ save_state( sync_vars_t *svars )
sync_rec_t * srec ;
char fbuf [ 16 ] ; /* enlarge when support for keywords is added */
// If no change was made, the state is also unmodified.
if ( ! svars - > jfp & & ! svars - > replayed )
return ;
if ( ! svars - > nfp )
create_state ( svars ) ;
Fprintf ( svars - > nfp ,
" FarUidValidity %u \n NearUidValidity %u \n MaxPulledUid %u \n MaxPushedUid %u \n " ,
svars - > uidval [ F ] , svars - > uidval [ N ] , svars - > maxuid [ F ] , svars - > maxuid [ N ] ) ;
@ -719,6 +746,7 @@ save_state( sync_vars_t *svars )
}
Fclose ( svars - > nfp , 1 ) ;
if ( svars - > jfp )
Fclose ( svars - > jfp , 0 ) ;
if ( ! ( DFlags & KEEPJOURNAL ) ) {
/* order is important! */
@ -1234,18 +1262,6 @@ box_opened2( sync_vars_t *svars, int t )
if ( ! lock_state ( svars ) )
goto bail ;
if ( ! ( svars - > nfp = fopen ( svars - > nname , " w " ) ) ) {
sys_error ( " Error: cannot create new sync state %s " , svars - > nname ) ;
goto bail ;
}
if ( ! ( svars - > jfp = fopen ( svars - > jname , " a " ) ) ) {
sys_error ( " Error: cannot create journal %s " , svars - > jname ) ;
fclose ( svars - > nfp ) ;
goto bail ;
}
setlinebuf ( svars - > jfp ) ;
if ( ! svars - > replayed )
jFprintf ( svars , JOURNAL_VERSION " \n " ) ;
opts [ F ] = opts [ N ] = 0 ;
if ( fails )
@ -1495,10 +1511,16 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
JLOG ( " | %u %u " , ( svars - > uidval [ F ] , svars - > uidval [ N ] ) , " new UIDVALIDITYs " ) ;
}
svars - > oldmaxuid [ F ] = svars - > maxuid [ F ] ;
svars - > oldmaxuid [ N ] = svars - > maxuid [ N ] ;
svars - > oldmaxxfuid = svars - > maxxfuid ;
info ( " Synchronizing... \n " ) ;
for ( t = 0 ; t < 2 ; t + + )
svars - > good_flags [ t ] = ( uchar ) svars - > drv [ t ] - > get_supported_flags ( svars - > ctx [ t ] ) ;
int any_new [ 2 ] = { 0 , 0 } ;
debug ( " synchronizing old entries \n " ) ;
for ( srec = svars - > srecs ; srec ; srec = srec - > next ) {
if ( srec - > status & S_DEAD )
@ -1643,6 +1665,7 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
srec - > status = S_PENDING ;
JLOG ( " ~ %u %u %u " , ( srec - > uid [ F ] , srec - > uid [ N ] , srec - > status ) , " was too big " ) ;
}
any_new [ t ] = 1 ;
} else {
if ( srec - > status = = S_SKIPPED ) {
debug ( " -> still too big \n " ) ;
@ -1825,12 +1848,16 @@ box_loaded( int sts, message_t *msgs, int total_msgs, int recent_msgs, void *aux
}
debug ( " propagating new messages \n " ) ;
if ( UseFSync )
if ( UseFSync & & svars - > jfp )
fdatasync ( fileno ( svars - > jfp ) ) ;
for ( t = 0 ; t < 2 ; t + + ) {
if ( any_new [ t ] ) {
svars - > finduid [ t ] = svars - > drv [ t ] - > get_uidnext ( svars - > ctx [ t ] ) ;
JLOG ( " F %d %u " , ( t , svars - > finduid [ t ] ) , " save UIDNEXT of %s " , str_fn [ t ] ) ;
svars - > new_msgs [ t ] = svars - > msgs [ 1 - t ] ;
} else {
svars - > state [ t ] | = ST_SENT_NEW ;
}
msgs_copied ( svars , t ) ;
if ( check_cancel ( svars ) )
goto out ;
@ -2164,6 +2191,7 @@ box_closed_p2( sync_vars_t *svars, int t )
// ensure that all pending messages are still loaded next time in case
// of interruption - in particular skipping big messages would otherwise
// up the limit too early.
if ( svars - > maxuid [ t ] ! = svars - > oldmaxuid [ t ] )
JLOG ( " N %d %u " , ( t , svars - > maxuid [ t ] ) , " up maxuid of %s " , str_fn [ t ] ) ;
}
@ -2191,6 +2219,7 @@ box_closed_p2( sync_vars_t *svars, int t )
// This is just an optimization, so it needs no journaling of intermediate states.
// However, doing it before the entry purge would require ensuring that the
// exception list includes all relevant messages.
if ( svars - > maxxfuid ! = svars - > oldmaxxfuid )
JLOG ( " ! %u " , svars - > maxxfuid , " max expired UID on far side " ) ;
save_state ( svars ) ;