#!/bin/bash
set -e

if [[ $# -eq 0 ]] ; then
    echo "Usage: blog_HBase_graceful_decommission.sh <function> <parameter 1> <parameter 2> <parameter 3>."
    echo "Valid input values for 'function' are: move_regions, terminate_ec2, or stop_and_check_task_rs. "
    echo "Here are some examples of each function and corresponding input parameters. The specified your-secret-id is a secret that saves your SSH credentials in AWS Secrets Manager. "
    echo "For example 1, blog_HBase_graceful_decommission.sh move_regions your-secret-id ip-172-0-0-1.us-west-2.compute.internal s3://yourbucket/yourpath/"
    echo "For example 2, blog_HBase_graceful_decommission.sh terminate_ec2 your-secret-id ig-ABCDEFGH12345 i-1234567890abcdef"
    echo "For example 3, blog_HBase_graceful_decommission.sh stop_and_check_task_rs your-secret-id ig-ABCDEFGH12345 s3://yourbucket/yourpath/"
    exit 1
fi

trap "exit 1" TERM
export TOP_PID=$$

function kill_func()
{
   echo "Exit the process..."
   kill -s TERM $TOP_PID
}

inputFunction=$1
keyName=$2

#### Functions for gracefully decommission of HBase region server node #### 

move_regions()
{
    keyname=$1
    targetRS=$2
    S3Path=$3

    ### Add trailing slash if needed
    [[ "${S3Path}" != */ ]] && S3Path="${S3Path}/"

    # Generate move_hregions.sh
    cat << EOF > move_hregions.sh
#!/bin/bash
set -x

if [[ \$# -eq 0 ]] ; then
    echo "Usage: move_hregions.sh <HBase_Region_Server_private_DNS>"
    exit 1
fi

targetRS=\$1
cnt=1

echo "balance_switch false" | sudo -u hbase hbase shell
## To make sure balance_switch is disabled, we submitted the same command again. The output should say it’s already in “false” status.  
echo "balance_switch false" | sudo -u hbase hbase shell

# try to move regions with 1st attempt
echo "Attempt \$cnt"
command_output=\$(timeout -k 5 300 sudo -u hbase sh /usr/lib/hbase/bin/graceful_stop.sh --maxthreads 10 \$targetRS)
command_exit_code=\$?

# start a loop when the previous process exited with timeout error 137.
while [[ "\$cnt" -le 5 ]]; do
    if [ "\$command_exit_code" -eq 137 ]; then
        echo "The previous process got an error code of 137 because of timeout."
        cnt=\`expr \$cnt + 1\`
        echo "Retrying through attempt \$cnt"
        command_output=\$(timeout -k 5 300 sudo -u hbase sh /usr/lib/hbase/bin/graceful_stop.sh --maxthreads 10 \$targetRS)
        command_exit_code=\$?
    elif [[ "\$command_exit_code" -eq 0 ]]; then
        echo "Completed moving regions to other region servers. "
        break
    else
        echo "Moving regions encounted unexpected error. "
        echo "Process exit code: \${command_exit_code}"
        exit 1
    fi
done

if [[ "\$cnt" -gt 5 && "\$command_exit_code" -eq 137 ]]; then
    echo "Trying to move the regions on \$targetRS but exceeding max retry 6 times. "
    exit 1
fi

EOF

    # Upload move_hregions.sh to S3
    aws s3 cp move_hregions.sh $S3Path
    S3_move_hregions=`echo $S3Path"move_hregions.sh"`
    # Run script to evacuate HBase regions from the target region server
    # set up ssh key from AWS Secrets Manager
    sshkey=`aws secretsmanager get-secret-value --secret-id $keyname | jq -r ".SecretString"`
    eval `ssh-agent -s` > eval_pid 2>&1  
    ssh-add - <<< "$sshkey"
    ssh -t -o "StrictHostKeyChecking no" hadoop@"$targetRS" "aws s3 cp $S3_move_hregions .; sh -x move_hregions.sh $targetRS > /home/hadoop/move_hregions.log 2>&1; cat /home/hadoop/move_hregions.log"
    kill -9 `awk '{print $3}' eval_pid`
}

stop_RS_IC()
{
    keyname=$1
    targetRS=$2
    S3Path=$3

    ### Add trailing slash if needed
    [[ "${S3Path}" != */ ]] && S3Path="${S3Path}/"

    cat << EOF > stop_RS_IC.sh
#!/bin/bash
set -x

if [[ \$# -eq 0 ]] ; then
   echo "Usage: stop_RS_IC.sh <Decommission_target_private_DNS>"
   exit 1
fi

targetRS=\$1

echo "shutdown region server on \${targetRS}."
sudo systemctl status hbase-regionserver.service | tee -a /tmp/graceful_stop.log;
sudo systemctl stop hbase-regionserver.service | tee -a /tmp/graceful_stop.log;
sudo systemctl status hbase-regionserver.service | tee -a /tmp/graceful_stop.log;

masterHostname=\`cat /emr/instance-controller/lib/info/extraInstanceData.json | jq -r '.masterHost'\`
runningContainers=\`curl -L \$masterHostname:8088/ws/v1/cluster/nodes/\$targetRS:8041/ | jq -r '.[].numContainers '\`

## Check YARN Resource Manager through REST API for any container running on the target host.

# Will check running containers 10 times with 5 seconds interval. 
# Will break the loop when there's no running container or 10 times of attempts are met.  
cnt=1

while [[ "\$cnt" -le 10 && "\$runningContainers" -ne 0 ]]
do
    echo "Check running container amount \${cnt} out of 10 times. " | tee -a /tmp/graceful_stop.log;
    echo "Still have \${runningContainers} containers running on \${targetRS}" | tee -a /tmp/graceful_stop.log
    echo "Not to shutdown IC" | tee -a /tmp/graceful_stop.log
    sleep 5
    runningContainers=\`curl -L \$masterHostname:8088/ws/v1/cluster/nodes/\$targetRS:8041/ | jq -r '.[].numContainers '\`
    cnt=`expr $cnt + 1`
done

if [ "\$runningContainers" -eq 0 ]; then
        echo "\${runningContainers} container is running on \${targetRS}" | tee -a /tmp/graceful_stop.log
        echo "Shutdown IC" | tee -a /tmp/graceful_stop.log
        sudo systemctl status instance-controller.service | tee -a /tmp/graceful_stop.log
        sudo systemctl stop instance-controller.service | tee -a /tmp/graceful_stop.log
        sudo systemctl status instance-controller.service | tee -a /tmp/graceful_stop.log
else
        echo "\${cnt} attempt exceeded 10. " | tee -a /tmp/graceful_stop.log;
        echo "Still have \${runningContainers} containers running on \${targetRS}" | tee -a /tmp/graceful_stop.log
        echo "Not to shutdown IC" | tee -a /tmp/graceful_stop.log
fi
EOF

    # Upload stop_RS_IC.sh to S3
    aws s3 cp stop_RS_IC.sh $S3Path
    S3_stop_RS_IC=`echo $S3Path"stop_RS_IC.sh"`
    # Run script to stop IC on the target region server
    sshkey=`aws secretsmanager get-secret-value --secret-id $keyname | jq -r ".SecretString"`
    eval `ssh-agent -s` > eval_pid 2>&1
    ssh-add - <<< "$sshkey"
    ssh -t -o "StrictHostKeyChecking no" hadoop@"$targetRS" "aws s3 cp $S3_stop_RS_IC .; sh -x stop_RS_IC.sh $targetRS > /home/hadoop/stop_RS_IC.log 2>&1; cat /home/hadoop/stop_RS_IC.log"
    kill -9 `awk '{print $3}' eval_pid`
}

terminate_ec2()
{
    jobFlowID=($(grep jobFlowId /emr/instance-controller/lib/info/extraInstanceData.json | awk -F":" '{print $2}' | sed 's/\"//g' | sed 's/,//g'))
    igID=$1
    instanceID=$2
    
    ## Get HDFS replication level for every file
    replCheck=($(hdfs fsck / -files -blocks -locations -replicaDetails | grep -E "Live_repl=1|replication=1" | wc -l))
    
    if [[ $replCheck -gt 0 ]]
    then
       echo "Some HDFS blocks have only 1 copy. Please increase HDFS replication factor. Exit..."
       echo $(kill_func)
    else
        beforeDate=($(date '+%Y-%m-%dT%H-%M-%S'))
        hdfs dfsadmin -report | grep -A 7 -i "Replicated Blocks" > /home/hadoop/rep_blocks_${beforeDate}
    
        echo "terminating ${instanceID}"

        command_output1=$(aws emr list-instances --cluster-id $jobFlowID --instance-group-id $igID | jq '.Instances[] | select(.Ec2InstanceId=='\"$instanceID\"')' | jq '.PrivateDnsName' | sed 's/\"//g' | sed 's/,//g')
        command_exit_code1=$?

        command_output2=$(aws emr modify-instance-groups --cluster-id $jobFlowID --instance-groups InstanceGroupId="${igID}",EC2InstanceIdsToTerminate="${instanceID}"  > /tmp/terminate_ec2.log 2>&1)
        command_exit_code2=$?

        if [[ "$command_exit_code1" -ne 0 || "$command_exit_code2" -ne 0 ]]; then
            echo "There are errors when executing list-instances or modify-instance-groups. Exit..."
            echo $(kill_func)
        else
            # Wait 90 sec for EMR to terminate the target EC2 instance. 
            sleep 90
    
            while true; 
            do 
                echo $(date '+%Y-%m-%dT%H-%M-%S')
           
                decommissionState=($(hdfs dfsadmin -report | grep "Decommission Status" | grep -i "in progress" | wc -l))
           
                if [[ $decommissionState -gt 0 ]]
                then
                    hdfs dfsadmin -report | grep -B 1 "Decommission Status : Decommission in progress"
                    sleep 30
                else
                    echo "Decommission completed. "
                    hdfs dfsadmin -report
                    break
                fi
            done
        
            afterDate=($(date '+%Y-%m-%dT%H-%M-%S'))
            hdfs dfsadmin -report | grep -A 7 -i "Replicated Blocks" > /home/hadoop/rep_blocks_${afterDate}
        
            diff /home/hadoop/rep_blocks_*
            rm /home/hadoop/rep_blocks_*
        fi
    fi
}

stop_RS()
{
    cluster_id=`jq -r ".jobFlowId" /mnt/var/lib/info/job-flow.json`
    keyname=$1
    igid=$2
    S3Path=$3
    notTask=`aws emr describe-cluster --cluster-id $cluster_id --query "Cluster.InstanceGroups[?Id=='${igid}'].InstanceGroupType" | jq '@csv' | grep -iv task | wc -l`

    if [[ $notTask -ne 0 ]] ; then
       echo "The specified instance group is not a task group. Exit..."
       echo $(kill_func)
    fi
    
    aws emr list-instances --cluster-id $cluster_id --instance-group-id $igid --instance-states RUNNING|grep -Po '"PrivateDnsName": *\"[^"]*"'|sed 's/"//g' | awk -F":" '{print $2}' | sed 's/ //g'> private_host
    for host in $(cat private_host); do 
       echo "##### moving regions from ${host} to core group #####"
       move_regions $keyname $host $S3Path 2>&1 | tee -a /tmp/master_move_region_for_task.log
       cat /tmp/master_move_region_for_task.log
       error_cnt=`grep -E "Usage:|exceeding max retry|encounted unexpected error" /tmp/master_move_region_for_task.log | wc -l`
       if [[ "$error_cnt" -gt 0 ]]; then
          echo "Some errors happened when moving regions. Please manually move the remaining regions through hbase shell. E.g. move 'ENCODED_REGIONNAME', 'SERVER_NAME'"
          echo "Or, submit the move region step again. "
          echo "Exit..."
          echo $(kill_func)
       fi
       echo "Stop region servers after evacuating all regions in the specified task group. "
       sshkey=`aws secretsmanager get-secret-value --secret-id $keyname | jq -r ".SecretString"`
       eval `ssh-agent -s` > eval_pid 2>&1
       ssh-add - <<< "$sshkey"
       ssh -t -o "StrictHostKeyChecking no" hadoop@"$host" "sudo systemctl stop hbase-regionserver.service > /home/hadoop/stop_RS.log 2>&1; sudo systemctl status hbase-regionserver.service >> /home/hadoop/stop_RS.log 2>&1; cat /home/hadoop/stop_RS.log"
       kill -9 `awk '{print $3}' eval_pid`
    done
}

modify_task()
{
    cluster_id=`jq -r ".jobFlowId" /mnt/var/lib/info/job-flow.json`
    igid=$1
    cat << EOF > instanceGroup_$igid.json
[
   {
      "InstanceGroupId":"$igid",
      "Configurations":[
        {
          "Classification": "hbase-site",
          "Properties": {
            "hbase.rootdir": "hdfs://non/existing/location"
          }
        },
        {
          "Classification": "hbase",
          "Properties": {
            "hbase.emr.storageMode": "hdfs"
          }
        }
      ]
   }
]
EOF
    # Reconfigure the task group and point hbase root dir to a nowhere. 
    aws emr modify-instance-groups --cluster-id $cluster_id --instance-groups file://instanceGroup_$igid.json

    ## Ensure specified task group is in RUNNING state. 
    cnt=1

    # Resize the task group to 0 node once it completes the reconfiguration. 
    while [[ $cnt -le 12 ]]
    do
        echo "Attempt ${cnt}. "
        sleep 300
        notRunningCnt=`aws emr describe-cluster --cluster-id $cluster_id --query "Cluster.InstanceGroups[?Id=='${igid}'].Status.State" | jq '@csv' | grep -v "RUNNING" | wc -l`
        if [[ $notRunningCnt -eq 0 ]] ; then
            echo "${igid} is in RUNNING state. Proceed to scale in this instance group to 0. "
            aws emr modify-instance-groups --cluster-id $cluster_id --instance-groups InstanceGroupId=$igid,InstanceCount=0
            break        
        else
            echo "${igid} is not in RUNNING state yet. Sleep 300 sec and will retry. "
            cnt=`expr $cnt + 1`
        fi
    done

    if [[ $notRunningCnt -ne 0 ]] ; then
        echo "${igid} is still not in RUNNING state and already exceeds 12 times of retries. "
        echo "Please manually scale in the task group to 0 node once it's in RUNNING state. Exit... "
        echo $(kill_func)
    fi
}

#### End of functions ####

## Main function ##
if [[ "$inputFunction" == "move_regions" ]]; then
    echo "Action: ${inputFunction}"
    targetRS=$3
    S3Path=$4
    move_regions $keyName $targetRS $S3Path 2>&1 | tee /tmp/master_move_region.log
    cat /tmp/master_move_region.log
    error_cnt1=`grep -E "Usage:|exceeding max retry|encounted unexpected error" /tmp/master_move_region.log | wc -l`
    if [[ "$error_cnt1" -gt 0 ]]; then
        echo "Some errors happened when moving regions. Please manually move the remaining regions through hbase shell. E.g. move 'ENCODED_REGIONNAME', 'SERVER_NAME'"
        echo "Or, submit the move region step again. "
        echo $(kill_func)
    else
        stop_RS_IC $keyName $targetRS $S3Path 2>&1 | tee /tmp/master_stop_RS_IC.log
        cat /tmp/master_stop_RS_IC.log
        error_cnt2=`grep -E "Usage:" /tmp/master_stop_RS_IC.log | wc -l`
        if [[ "$error_cnt2" -gt 0 ]]; then
            echo "Some errors happened when stopping RS and IC. Please valid arguments to the script. "
            echo $(kill_func)
        fi
    fi
elif [[ "$inputFunction" == "terminate_ec2" ]]; then
    igID=$3
    instanceID=$4
    terminate_ec2 $igID $instanceID 2>&1 | tee /tmp/terminate_ec2.log
    cat /tmp/terminate_ec2.log
elif [[ "$inputFunction" == "stop_and_check_task_rs" ]]; then
    igid=$3
    S3Path=$4
    stop_RS $keyName $igid $S3Path 2>&1 | tee /tmp/stop_RS.log
    cat /tmp/stop_RS.log
    modify_task $igid 2>&1 | tee /tmp/modify_task.log
    cat /tmp/modify_task.log
else
    echo "Please provide a valid function value. E.g. move_regions, terminate_ec2, or stop_and_check_task_rs. "  
fi;


