Sphinx replication II
Four years ago I presented the first version of the replication for Sphinx Search. Since then I have introduced many modifications to the structure: partitioning of the delta indices and the introduction of a distributed index. Additionally, I have changed the system environment from CentOS to LXC (Debian). The storage has been divided into shards according to I/O performance, starting from a RAM disk, through SSD disks, down to SATA disks at the bottom.
The old script had no chance of working in the new environment, so I rewrote it:
#!/bin/bash
# Version 0.1
# since 0.1
# 0.1.1 Support for cold start
#
# Sphinx slave replication script: configuration section.
# Used by the start script:
#   if [ "$NODE_ROLE" = "NODE_ROLE_SLAVE" ]; then
#     /root/scripts/cron/sphinx-slave.sh

SCRIPT_NAME=$(basename "$0")

# Directory layout (all directory values end with a slash; paths are built by
# plain concatenation).
MASTER_DIR="/storage_master/"
LOCAL_DIR="/storage/"
LOG_ERROR_DIR="/var/log/sphinx/slave/"
LOG_FILE="/var/log/sphinx/slave.log"
LOCK_FILE="/tmp/slave.lock"
SIGN_FILE="/var/log/sphinx/slave.sgn"

# Identifier of the current run; prefixed to every log line.
UNIQUE_NUMBER=""

# Main file to monitor - it is the one created last.
FILE_TIMESTAMP="sph"
FILE_TIMESTAMP_QTY=0

# Lock file of the sphinx process. If the process is running, the file exists.
FILE_LOCK_SPH="spl"

MAKE_ROTATE=1
MAKE_RESTART=0
CURRENT_UNIX=$(($(date +%s) + 0))

SPH_INDEX=""
SPH_INDEX_DIRS=('daily' 'weekly' 'archive')

# Base index, which requires stopping the process and removing the files.
SPH_DIR_ARCHIVE="archive"
SPH_INDEX_REPORTED=()
SPH_INDEX_REPORTED_ITERATOR=0

SPH_DIR_WEEKLY="weekly"
SPH_INDEX_WEEKLY_REPORTED=0

SPH_STOPWORD_FILE="stopwords-pl"

SPH_INDEX_ERR=()
SPH_INDEX_NEW=()
SPH_INDEX_DIRS_QTY=${#SPH_INDEX_DIRS[@]}
SPH_INDEX_NEW_ITEM_ITERATOR=0

# Number of files expected to be copied per index, and the extensions that are
# removed from the copy afterwards.
FILES_REQUIRED_IN_QTY=8
FILES_TO_RM=('spl')
FILES_TO_RM_QTY=${#FILES_TO_RM[@]}

# Extension infix of index files that are waiting to be rotated.
IDX_AWAITING_EXT="new"
IDX_TO_ROTATE=()
IDX_TO_ROTATE_QTY=0
IDX_TO_ROTATE_ITR=0
IDX_TO_ROTATE_TTL=300
IDX_TO_ROTATE_LOOP_ITR=0

# The maximum difference between the files (timestamp files) of master and
# slave [s]
MAX_DIFF_INDEX=300

# Regex containing some problems which are not critical.
# stale = "Stale NFS file handle" error
CP_KNOWN_ISSUE="Stale"

VERBOSE=0

WS="http://ws-indexer-sph.YOUR_NETWORK/bootstrap.php?"
# To be set
HOMEDIR=""

# Block functions

# Sets HOMEDIR to the current directory when the script file ($1) is found
# there, otherwise to the directory part of $0.
set_homedir() {
  local dir_exec file_exec full_path
  dir_exec=$(pwd)
  file_exec=$(basename "$1")
  full_path="$dir_exec/$file_exec"
  if [ -f "$full_path" ]; then
    HOMEDIR=$dir_exec
  else
    HOMEDIR=$(dirname "$0")
  fi
}

# Lock check. NOTE: the return convention is inverted on purpose (callers
# depend on it): returns 0 when the lock file exists (the run must stop),
# 1 when the script may continue.
check_all() {
  if [ -f "$LOCK_FILE" ]; then
    write_to_log "Lock file exists ($LOCK_FILE)"
    return 0
  fi
  return 1
}

# Returns 1 when $1 is greater than $2, otherwise 0.
compare_int() {
  if [ "$1" -gt "$2" ]; then
    return 1
  fi
  return 0
}

# Appends "$1" to LOG_FILE, prefixed with the run id and a timestamp.
# $2 (default 0) is the minimum VERBOSE level at which the line is written.
write_to_log() {
  local info="$1"
  local for_verbose_mod="${2:-0}"
  local log_time
  if [ "$VERBOSE" -lt "$for_verbose_mod" ]; then
    return
  fi
  log_time=$(date '+%Y.%m.%d|%H:%M')
  printf '%s\n' "[$UNIQUE_NUMBER] $log_time $info" >>"$LOG_FILE"
}

# Prints the modification time (unix timestamp) of file $1, or 0 when the
# file does not exist.
file_update_unix() {
  local file_path="$1"
  local file_unix_ts=0
  if [ -f "$file_path" ]; then
    file_unix_ts=$(/usr/bin/stat -c '%Y' "$file_path")
    # Coerce an empty result (stat failed) to 0.
    file_unix_ts=$((file_unix_ts + 0))
  fi
  echo "$file_unix_ts"
}

# Prints all arguments, joined by spaces, on stderr.
write_to_stderr() {
  printf '%s\n' "$*" 1>&2
}

# Prints how many of the given paths exist. Intended to be called with an
# unquoted glob: an unmatched pattern stays literal, does not exist and is
# therefore counted as 0.
count_existing() {
  local qty=0 path
  for path in "$@"; do
    if [ -e "$path" ]; then
      qty=$((qty + 1))
    fi
  done
  echo "$qty"
}

# Calls the web service at URL $1, stores the response in file $2 and logs it.
# On wget failure the captured stderr is reported on stderr.
call_ws() {
  local url="$1"
  local out_file="$2"
  local result err_info data_tmp
  # WGET_OPTIONS is not defined in this script; it is left unquoted so that it
  # expands to nothing, or to several options when provided by the environment.
  /usr/bin/wget -q -t 1 $WGET_OPTIONS "$url" --output-document="$out_file" 2>/tmp/on_delta_built_err
  result=$?
  if [ "$result" -gt "0" ]; then
    # Read the error file (the original redirected INTO it, which truncated
    # the file and waited on stdin).
    err_info=$(cat "/tmp/on_delta_built_err")
    write_to_stderr "Error on $url. Content: $err_info"
  fi
  data_tmp=$(cat "$out_file")
  write_to_log "$data_tmp" 0
}

# End block functions

# START
START_AT=$(date +"%s.%N")

# Detect homedir
set_homedir "$0"

UNIQUE_NUMBER=$(date '+%s')

check_all
MAY_CONTINUE=$?
if [ "$MAY_CONTINUE" -eq 0 ]; then
  exit 0
fi

touch "$LOCK_FILE"
# Do not leave a stale lock behind when the run is aborted; the trap is set
# only after this run has created the lock, and cleared once it is removed.
trap 'rm -f -- "$LOCK_FILE"' EXIT
date >"$SIGN_FILE"

# Scratch files for the result of the copy step.
CP_RES_FILE="/tmp/$SCRIPT_NAME.cp_res"
CP_ERR_FILE="/tmp/$SCRIPT_NAME.cp_res.err"

for ((INDICES_ITERATOR = 0; INDICES_ITERATOR < SPH_INDEX_DIRS_QTY; INDICES_ITERATOR++)); do
  SPH_INDEX_DIR=${SPH_INDEX_DIRS[INDICES_ITERATOR]}

  # BLOCK Stopword
  STOPWORD_MASTER_TS=$(file_update_unix "$MASTER_DIR$SPH_INDEX_DIR/$SPH_STOPWORD_FILE")
  write_to_log "Master stopword ts: $MASTER_DIR$SPH_INDEX_DIR/$SPH_STOPWORD_FILE : $STOPWORD_MASTER_TS" 1
  STOPWORD_SLAVE_TS=$(file_update_unix "$LOCAL_DIR$SPH_INDEX_DIR/$SPH_STOPWORD_FILE")
  write_to_log "Slave stopword ts: $LOCAL_DIR$SPH_INDEX_DIR/$SPH_STOPWORD_FILE : $STOPWORD_SLAVE_TS" 1
  compare_int "$STOPWORD_MASTER_TS" "$STOPWORD_SLAVE_TS"
  is_new=$?
  write_to_log "stopword compare_int returns $is_new" 1
  if [ -z "$STOPWORD_MASTER_TS" ]; then
    is_new=0
  fi
  write_to_log "stopword compare_int returns $is_new" 1
  if [ "$is_new" -eq "1" ]; then
    cp -p "$MASTER_DIR$SPH_INDEX_DIR/$SPH_STOPWORD_FILE" "$LOCAL_DIR$SPH_INDEX_DIR/$SPH_STOPWORD_FILE" 2>"$CP_ERR_FILE"
    cp_result=$?
    if [ "$cp_result" -gt "0" ]; then
      error_info=$(cat "$CP_ERR_FILE")
      write_to_log "ERROR at stopwords: $SPH_INDEX_DIR msg: $error_info." 0
      write_to_stderr "ERROR at stopwords: $SPH_INDEX_DIR msg: $error_info."
    else
      write_to_log "Stopwords file '$MASTER_DIR$SPH_INDEX_DIR/$SPH_STOPWORD_FILE' successfuly copied." 0
    fi
  fi
  # BLOCK Stopword

  FILE_TIMESTAMP_QTY=0
  for SPH_FILE_PATH in "$MASTER_DIR$SPH_INDEX_DIR"/*."$FILE_TIMESTAMP"; do
    # The changes are so fast that sometimes the files disappear. This also
    # skips the literal pattern left behind by a glob without matches.
    test -f "$SPH_FILE_PATH" || continue
    # Count only files that really exist, so that an unmatched glob is
    # reported as 0 timestamp files below.
    FILE_TIMESTAMP_QTY=$((FILE_TIMESTAMP_QTY + 1))

    # There is a temp|new file: the index is being built. The dots are
    # matched literally.
    case "$SPH_FILE_PATH" in
      *.new.* | *.tmp.*) continue ;;
    esac

    SPH_FILE=$(basename "$SPH_FILE_PATH")
    SPH_INDEX="${SPH_FILE%.*}"

    if [ -f "$MASTER_DIR$SPH_INDEX_DIR/$SPH_INDEX.$FILE_TIMESTAMP" ] \
      && [ -f "$MASTER_DIR$SPH_INDEX_DIR/$SPH_INDEX.$FILE_LOCK_SPH" ]; then
      # Both files must exist.
      MASTER_STAMP=$(file_update_unix "$MASTER_DIR$SPH_INDEX_DIR/$SPH_INDEX.$FILE_TIMESTAMP")
    else
      # No sph|spl files. spl: master does not work, sph: index does not exist
      MASTER_STAMP=0
    fi
    LOCAL_STAMP=$(file_update_unix "$LOCAL_DIR$SPH_INDEX_DIR/$SPH_INDEX.$FILE_TIMESTAMP")

    if [ "$MASTER_STAMP" -eq 0 ]; then
      write_to_log "Brak indeksu master: $SPH_INDEX ($MASTER_DIR$SPH_INDEX_DIR/$SPH_INDEX.$FILE_TIMESTAMP)" 0
      continue
    fi

    INDEX_DIFF_TS=$((MASTER_STAMP - LOCAL_STAMP))
    MASTER_DIFF_TS=$((CURRENT_UNIX - MASTER_STAMP))
    # It can be triggered only when the master index was built long enough ago
    # (more than MAX_DIFF_INDEX seconds) [second condition].
    if [ "$INDEX_DIFF_TS" -gt "$MAX_DIFF_INDEX" ] && [ "$MASTER_DIFF_TS" -gt "$MAX_DIFF_INDEX" ]; then
      write_to_log "Error: index '$SPH_INDEX_DIR/$SPH_INDEX' is overdue. MASTER|SLAVE: $MASTER_STAMP|$LOCAL_STAMP (Diff=$INDEX_DIFF_TS, Max=$MAX_DIFF_INDEX)" 0
      write_to_stderr "Error: index '$SPH_INDEX_DIR/$SPH_INDEX' is overdue. MASTER|SLAVE: $MASTER_STAMP|$LOCAL_STAMP (Diff=$INDEX_DIFF_TS, Max=$MAX_DIFF_INDEX)"
    else
      write_to_log "Index '$SPH_INDEX_DIR/$SPH_INDEX' is ok. MASTER|SLAVE: $MASTER_STAMP|$LOCAL_STAMP (Diff=$INDEX_DIFF_TS). Master diff=$MASTER_DIFF_TS" 2
    fi

    compare_int "$MASTER_STAMP" "$LOCAL_STAMP"
    is_new=$?
    if [ "$is_new" -ne "1" ]; then
      continue
    fi

    # write logs
    write_to_log "Index '$SPH_INDEX_DIR/$SPH_INDEX' is to be renewing. MASTER|SLAVE: $MASTER_STAMP|$LOCAL_STAMP" 0
    write_to_log "Stat for index '$SPH_INDEX_DIR/$SPH_INDEX' MASTER|SLAVE: $MASTER_STAMP|$LOCAL_STAMP" 1

    # Make sure there are no temporary files (they mean that the index is
    # being built right now).
    TMP_FILE_QTY=$(count_existing "$MASTER_DIR$SPH_INDEX_DIR/$SPH_INDEX".*.tmp*)
    if [ "$TMP_FILE_QTY" -gt "0" ]; then
      write_to_log "Temporary files are available [$TMP_FILE_QTY]. Omitting index renewal - '$SPH_INDEX'." 0
      continue
    fi

    # SPH_INDEX_COPY_SRC is a glob pattern kept as text for the log line; the
    # cp below spells the same pattern with only the glob part unquoted.
    SPH_INDEX_COPY_SRC=$MASTER_DIR$SPH_INDEX_DIR/$SPH_INDEX".sp?"
    SPH_INDEX_COPY_DST=$LOCAL_DIR$SPH_INDEX_DIR"/tmp/"
    if [ ! -d "$SPH_INDEX_COPY_DST" ]; then
      mkdir "$SPH_INDEX_COPY_DST"
    fi

    write_to_log "Start copying: $(date +"%s-%N")" 1
    write_to_log "cp -vp $SPH_INDEX_COPY_SRC $SPH_INDEX_COPY_DST | wc -l >$CP_RES_FILE 2>$CP_ERR_FILE" 2
    # cp -v prints one line per copied file; wc -l turns that into the number
    # of copied files.
    cp -vp "$MASTER_DIR$SPH_INDEX_DIR/$SPH_INDEX".sp? "$SPH_INDEX_COPY_DST" 2>"$CP_ERR_FILE" \
      | wc -l >"$CP_RES_FILE" 2>>"$CP_ERR_FILE"
    cp_result=${PIPESTATUS[0]}
    write_to_log "Stop copying : $(date +"%s-%N")" 1
    FILE_QTY=$((0 + $(cat "$CP_RES_FILE")))

    if [ "$cp_result" -gt "0" ]; then
      SPH_INDEX_ERR[$INDICES_ITERATOR]=1
      error_info=$(cat "$CP_ERR_FILE")
      IS_KNOWN=$(echo "$error_info" | grep -c "$CP_KNOWN_ISSUE")
      write_to_log "ERROR: $SPH_INDEX msg: $error_info. Is known? $IS_KNOWN" 0
      write_to_stderr "Index '$SPH_INDEX' error code: $cp_result, is known? $IS_KNOWN, err-msg: $error_info"
      # Let somebody know about the problem
      if [ "$IS_KNOWN" = "0" ]; then
        write_to_log "Index '$SPH_INDEX' problem during copying the files. (Qty: $FILE_QTY but expected: $FILES_REQUIRED_IN_QTY)" 0
        write_to_stderr "Index '$SPH_INDEX' problem during copying the files. Error info: $error_info (Qty: $FILE_QTY but expected: $FILES_REQUIRED_IN_QTY)"
      fi
      continue
    elif [ "$FILE_QTY" -ne "$FILES_REQUIRED_IN_QTY" ]; then
      write_to_log "ERROR: $SPH_INDEX (Unexpected qty of files: $FILE_QTY, expected: $FILES_REQUIRED_IN_QTY)" 0
      continue
    else
      write_to_log "Index '$SPH_INDEX' successfuly copied. (Qty: $FILE_QTY expected: $FILES_REQUIRED_IN_QTY)" 0
    fi

    # Remove from the copy the files that must not be replicated.
    for ((TO_COPY_ITERATOR = 0; TO_COPY_ITERATOR < FILES_TO_RM_QTY; TO_COPY_ITERATOR++)); do
      FILE_TO_RM=${FILES_TO_RM[TO_COPY_ITERATOR]}
      write_to_log "Start removing: $(date +"%s-%N")" 2
      cp_result=$(rm "$SPH_INDEX_COPY_DST$SPH_INDEX.$FILE_TO_RM" 2>&1)
      write_to_log "Stop removing: $(date +"%s-%N")" 2
      # Deliberately unquoted: collapses whitespace so that a blank output
      # counts as "no error".
      cp_result=$(echo -n $cp_result | sed 's/\s\n//g')
      if [ -n "$cp_result" ]; then
        SPH_INDEX_ERR[$INDICES_ITERATOR]=1
        MAKE_ROTATE=0
        write_to_log "ERROR: $SPH_INDEX $FILE_TO_RM error: $cp_result." 0
      else
        write_to_log "$SPH_INDEX $FILE_TO_RM successfuly removed." 2
      fi
    done

    FILES_REQUIRED_IN_QTY2=$((FILES_REQUIRED_IN_QTY - FILES_TO_RM_QTY))
    FILE_QTY=$(count_existing "$SPH_INDEX_COPY_DST$SPH_INDEX".sp?)
    if [ "$FILE_QTY" -ne "$FILES_REQUIRED_IN_QTY2" ]; then
      write_to_log "ERROR: $SPH_INDEX (Unexpected qty of files has been copied: $FILE_QTY, expected: $FILES_REQUIRED_IN_QTY2)" 0
      continue
    else
      write_to_log "Notice: index '$SPH_INDEX'. The following qty of files has been copied: $FILE_QTY, expected: $FILES_REQUIRED_IN_QTY2" 2
    fi

    # Move the files out of tmp/. When a local copy already exists they get
    # the ".new." infix and a rotation is requested later; otherwise they are
    # moved to their final names.
    if [ "$LOCAL_STAMP" -gt "0" ]; then
      cp_result=$(rename -f 's/(.*)\/tmp\/(.*)\./$1\/$2\.new\./' "$SPH_INDEX_COPY_DST$SPH_INDEX".sp? 2>&1)
    else
      # The slave's service is starting
      cp_result=$(rename -f 's/(.*)\/tmp\/(.*)\./$1\/$2\./' "$SPH_INDEX_COPY_DST$SPH_INDEX".sp? 2>&1)
      MAKE_ROTATE=0
    fi
    cp_result=$(echo -n $cp_result | sed 's/\s\n//g')
    if [ -n "$cp_result" ]; then
      SPH_INDEX_ERR[$INDICES_ITERATOR]=1
      MAKE_ROTATE=0
      write_to_log "ERROR: $SPH_INDEX error: $cp_result." 0
    else
      write_to_log "$SPH_INDEX successfuly moved." 1
    fi

    IS_ERROR=${SPH_INDEX_ERR[INDICES_ITERATOR]}
    if [ -z "$IS_ERROR" ]; then
      IS_ERROR=0
      # set index to renew
      SPH_INDEX_NEW[$SPH_INDEX_NEW_ITEM_ITERATOR]="$SPH_INDEX_DIR/$SPH_INDEX"
      SPH_INDEX_NEW_ITEM_ITERATOR=$((SPH_INDEX_NEW_ITEM_ITERATOR + 1))
    fi

    if [ "$LOCAL_STAMP" -gt "0" ]; then
      IDX_TO_ROTATE+=("$SPH_INDEX_DIR/$SPH_INDEX")
    fi

    if [ "$SPH_INDEX_DIR" = "$SPH_DIR_ARCHIVE" ]; then
      SPH_INDEX_REPORTED[$SPH_INDEX_REPORTED_ITERATOR]="$SPH_INDEX_DIR/$SPH_INDEX"
      SPH_INDEX_REPORTED_ITERATOR=$((SPH_INDEX_REPORTED_ITERATOR + 1))
    fi

    if [ "$SPH_INDEX_DIR" = "$SPH_DIR_WEEKLY" ] && [ "$SPH_INDEX" = "9" ]; then
      write_to_log "Index weekly to be reported." 0
      SPH_INDEX_WEEKLY_REPORTED=1
    fi
  done

  if [ "$FILE_TIMESTAMP_QTY" -lt "1" ]; then
    write_to_log "Qty of filestamps is $FILE_TIMESTAMP_QTY. Probably the master dir is not accessible." 0
    write_to_stderr "Qty of filestamps is 0. Probably the master dir is not accessible."
  else
    write_to_log "Qty of filestamps is $FILE_TIMESTAMP_QTY." 0
  fi
done

SPH_INDEX_NEW_QTY=${#SPH_INDEX_NEW[@]}
SPH_INDEX_ERR_QTY=${#SPH_INDEX_ERR[@]}

if [ "$SPH_INDEX_NEW_QTY" -gt "0" ] && [ "$SPH_INDEX_ERR_QTY" -lt "1" ]; then
  write_to_log "Qty of new indices is larger then 0 and error indices is less then 1 [OK]." 0
  if [ "$MAKE_ROTATE" -gt "0" ]; then
    write_to_log "Executing the command to rotate indices." 1
    command_info=$(/etc/init.d/sphinx rotate 2>&1)
    retval=$?
    write_to_log "$command_info [$retval]" 0

    IDX_TO_ROTATE_QTY=${#IDX_TO_ROTATE[@]}
    # Waiting loop: poll until no "<index>.new.*" files are left. When the TTL
    # is reached the counter is reset and the rotation is requested again, so
    # the loop ends only on success or when the daemon status check fails.
    while [ "$IDX_TO_ROTATE_LOOP_ITR" -lt "$IDX_TO_ROTATE_TTL" ]; do
      IS_INDEX_AWAITING=0
      for ((IDX_TO_ROTATE_ITR = 0; IDX_TO_ROTATE_ITR < IDX_TO_ROTATE_QTY; IDX_TO_ROTATE_ITR++)); do
        IDX_AWAITING=${IDX_TO_ROTATE[IDX_TO_ROTATE_ITR]}
        # Entries already rotated were unset in an earlier pass.
        test -z "$IDX_AWAITING" && continue
        # INDEX_ERROR_FILE="$LOG_ERROR_DIR$IDX_AWAITING.err"
        write_to_log "Before checking '$IDX_AWAITING' to be rotated." 2
        AWAITING_FILES_QTY=$(count_existing "$LOCAL_DIR$IDX_AWAITING.$IDX_AWAITING_EXT."*)
        if [ "$AWAITING_FILES_QTY" -eq "0" ]; then
          unset "IDX_TO_ROTATE[$IDX_TO_ROTATE_ITR]"
        else
          IS_INDEX_AWAITING=$((IS_INDEX_AWAITING + 1))
        fi
        if [ "$IDX_TO_ROTATE_LOOP_ITR" -eq "0" ]; then
          write_to_log "Waiting for the index '$IDX_AWAITING' to be rotated." 1
        fi
      done
      IDX_TO_ROTATE_LOOP_ITR=$((IDX_TO_ROTATE_LOOP_ITR + 1))

      if [ "$IS_INDEX_AWAITING" -gt "0" ]; then
        sleep 2
        /etc/init.d/sphinx status
        res=$?
        if [ "$res" -gt "0" ]; then
          write_to_log "ERROR. Cannot rotate - daemon is not working" 0
          write_to_stderr "ERROR. Cannot rotate - daemon is not working"
          break
        fi
      else
        write_to_log "It looks like the indices have been rotated successfully." 0
        break
      fi

      if [ "$IDX_TO_ROTATE_LOOP_ITR" -eq "$IDX_TO_ROTATE_TTL" ]; then
        write_to_log "Reseting iterator to endless loop." 0
        # INDEX_ERROR_FILE is never assigned (its assignment above is
        # commented out), so the brackets in this message stay empty.
        write_to_log "Marking the error occured [$INDEX_ERROR_FILE]." 0
        # touch $INDEX_ERROR_FILE
        IDX_TO_ROTATE_LOOP_ITR=0
        command_info=$(/etc/init.d/sphinx rotate 2>&1)
        retval=$?
        write_to_log "$command_info [$retval]" 0
      fi
    done
    # wait till rotation is executed
  else
    write_to_log "Not allowed to rotate indices" 0
  fi
else
  write_to_log "Qty of new indices [$SPH_INDEX_NEW_QTY] is less then 0 OR error indices [$SPH_INDEX_ERR_QTY] is greater then 0."
fi

# Report every renewed archive index to the web service.
SPH_INDEX_REPORTED_QTY=${#SPH_INDEX_REPORTED[@]}
if [ "$SPH_INDEX_REPORTED_QTY" -gt "0" ]; then
  write_to_log "Qty of index to report -gt 0 [OK]." 0
  for ((IDX_TO_ROTATE_ITR = 0; IDX_TO_ROTATE_ITR < SPH_INDEX_REPORTED_QTY; IDX_TO_ROTATE_ITR++)); do
    IDX_AWAITING=${SPH_INDEX_REPORTED[IDX_TO_ROTATE_ITR]}
    VERSION_IDX=$(stat -c '%Y' "$LOCAL_DIR$IDX_AWAITING.sph")
    SPH_INDEX=$(basename "$IDX_AWAITING")
    SPH_INDEX_DIR=$(basename "$(dirname "$IDX_AWAITING")")
    PHYSICAL_INDEX=$SPH_INDEX_DIR"_p_"$SPH_INDEX
    write_to_log "Physical index: $PHYSICAL_INDEX version: $VERSION_IDX" 0
    WS_URL=$WS"t=3&version=$VERSION_IDX&physical_index=$PHYSICAL_INDEX&i=$SPH_INDEX_DIR"
    call_ws "$WS_URL" "/tmp/master_set_idx_$SPH_INDEX$SPH_INDEX_DIR"
  done
fi

# Report the weekly index when it was renewed during this run.
if [ "$SPH_INDEX_WEEKLY_REPORTED" -eq 1 ]; then
  write_to_log "Qty of index weekly to report -eq 1 [OK]." 0
  # Log the same path that is passed to stat below.
  write_to_log "File: $LOCAL_DIR$SPH_DIR_WEEKLY/9.sph." 0
  VERSION_IDX=$(stat -c '%Y' "$LOCAL_DIR$SPH_DIR_WEEKLY/9.sph")
  write_to_log "Index weekly in version: $VERSION_IDX" 0
  WS_URL=$WS"t=COMMAND_ON_DELTA_BUILT&i=weekly&version=$VERSION_IDX"
  call_ws "$WS_URL" "/tmp/on_delta_built"
fi

rm -f -- "$LOCK_FILE"
# The lock is released; make sure the EXIT trap cannot remove a lock created
# by a run that starts from now on.
trap - EXIT

STOP_AT=$(date +"%s.%N")
DURATION=$(echo "$STOP_AT - $START_AT" | bc)
write_to_log "End of work $DURATION [s]" 0
MEM_INFO=$(grep "^MemFree\|^Cached\|SwapCached\|Inactive:\|SwapFree" /proc/meminfo)
write_to_log "$MEM_INFO" 0
exit 0
No comments yet.