#!/bin/bash

# Node topology, provided by the scheduler environment (ARNOLD_*).
NODE_ID=$ARNOLD_ID
NUM_NODES=$ARNOLD_WORKER_NUM
NUM_GPUS_PER_NODE=$ARNOLD_WORKER_GPU

# Total number of JSONL files to process
TOTAL_JSONL_FILES=16

# 1. Calculate which range of files this node is responsible for
#    (ceiling division, so every file is covered even when the count
#    does not divide evenly across nodes).
FILES_PER_NODE=$(( (TOTAL_JSONL_FILES + NUM_NODES - 1) / NUM_NODES ))
START_JSONL_IDX=$(( NODE_ID * FILES_PER_NODE + 1 ))
END_JSONL_IDX=$(( (NODE_ID + 1) * FILES_PER_NODE ))
if [ $END_JSONL_IDX -gt $TOTAL_JSONL_FILES ]; then
    END_JSONL_IDX=$TOTAL_JSONL_FILES
fi

# Positional arguments, all optional.
MODE=${1:-"split"}
ENTROPY_QUANTILE=0.90
OUTPUTWINDOW=${2:-16}
ITERATIVE_COMPRESS=${3:-"true"}
splits_dir=${4:-"ocpython_subsampled_50G_entropy90_splits_line"}
FORCE_PADDING=${5:-"true"}
SPLIT_CHUNK_SIZE=${6:-"lines"}

if [[ $SPLIT_CHUNK_SIZE == "lines" ]]; then
    SPLIT_ARGS="--chunk_size 128 --apply_line_split --max_entropy_batch_size 2048"
else
    SPLIT_ARGS="--chunk_size $SPLIT_CHUNK_SIZE --max_entropy_batch_size 512"
fi

if [[ $ITERATIVE_COMPRESS == "false" ]]; then
    ADDITIONAL_ARG=""
elif [[ $ITERATIVE_COMPRESS == "true" ]]; then
    ADDITIONAL_ARG="--iterative_compress"
else
    echo "Error: Unknown arg '$ITERATIVE_COMPRESS'."
    echo "Available values: false, true"
    exit 1
fi

if [[ $FORCE_PADDING == "false" ]]; then
    ADDITIONAL_ARG="${ADDITIONAL_ARG}"
elif [[ $FORCE_PADDING == "true" ]]; then
    ADDITIONAL_ARG="${ADDITIONAL_ARG} --force_padding_to_threshold"
else
    echo "Error: Unknown arg '$FORCE_PADDING'."
    echo "Available values: false, true"
    exit 1
fi

compress_dir="${splits_dir}_ow${OUTPUTWINDOW}_iterative-${ITERATIVE_COMPRESS}_forcepadding-${FORCE_PADDING}_ac"

# Directory and model paths
input_dir="opencoder"
entropy_model_path=/mnt/bn/tiktok-mm-5/aiic/users/linzheng/artifacts/m1_checkpoints/m1_40M_lr1e-3_steps200k_bs8_seqlen2048_full/checkpoints/0000200000
compression_model_path=/mnt/bn/tiktok-mm-5/aiic/users/linzheng/artifacts/m1_checkpoints/m1_40M_lr1e-3_steps200k_bs8_seqlen2048_full/checkpoints/0000200000

# Concurrency per GPU depends on the mode.
if [ "$MODE" == "split" ]; then
    JOBS_PER_GPU=1
elif [ "$MODE" == "compress" ]; then
    JOBS_PER_GPU=2
else
    echo "Error: Unknown mode '$MODE'."
    echo "Available modes: split, compress"
    exit 1
fi

# Jobs launched per JSONL file: GPUs are spread across this node's files
# (ceiling division again), then multiplied by the per-GPU concurrency.
TOTAL_JOBS_PER_FILE=$(( JOBS_PER_GPU * ((NUM_GPUS_PER_NODE + FILES_PER_NODE - 1) / FILES_PER_NODE) ))

echo "=================================================="
echo "Starting processing on Node ${NODE_ID} of ${NUM_NODES}"
echo "Node GPU Count: ${NUM_GPUS_PER_NODE}"
echo "Jobs per JSONL file: ${TOTAL_JOBS_PER_FILE}"
echo "This node will process files: ${START_JSONL_IDX} to ${END_JSONL_IDX}"
echo "=================================================="

# Create a directory for log files if it doesn't exist
mkdir -p logs

GLOBAL_JOB_COUNTER=0
for JSONL_IDX in $(seq $START_JSONL_IDX $END_JSONL_IDX); do
    echo "--> Processing JSONL file: ${input_dir}/chunk.${JSONL_IDX}.jsonl"
    for job_index in $(seq 0 $((TOTAL_JOBS_PER_FILE - 1))); do
        # Assign jobs to GPUs round-robin across the whole node.
        GPU_IDX=$(( GLOBAL_JOB_COUNTER % NUM_GPUS_PER_NODE ))
        echo "  Launching job ${job_index} for file ${JSONL_IDX} on GPU ${GPU_IDX} (Global Job #${GLOBAL_JOB_COUNTER})..."
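        # Each worker runs in the background, pinned to a single GPU via
        # CUDA_VISIBLE_DEVICES; --process_id / --num_processes tell the
        # Python script which shard of the file this worker should handle,
        # and stdout/stderr go to a per-job log under logs/.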
if [ "$MODE" == "split" ]; then CUDA_VISIBLE_DEVICES=${GPU_IDX} python3 offline_entropy_window_split.py \ --input_file /mnt/hdfs/user/linzheng/data/${input_dir}/chunk.${JSONL_IDX}.jsonl \ --output_dir /mnt/hdfs/user/linzheng/data/${splits_dir} \ --entropy_model_path $entropy_model_path \ --compression_model_path $compression_model_path \ --data_batch_size 256 \ --num_workers 1 \ --process_id ${job_index} \ --num_processes ${TOTAL_JOBS_PER_FILE} \ --base_global_quantile ${ENTROPY_QUANTILE} \ --base_monotonic_quantile ${ENTROPY_QUANTILE} \ $SPLIT_ARGS > "logs/split_node${NODE_ID}_jsonl${JSONL_IDX}_process${job_index}.log" 2>&1 & elif [ "$MODE" == "compress" ]; then CUDA_VISIBLE_DEVICES=${GPU_IDX} python3 offline_entropy_window_compress_ac.py \ --input_file /mnt/hdfs/user/linzheng/data/${splits_dir}/chunk.${JSONL_IDX}.jsonl \ --output_dir /mnt/hdfs/user/linzheng/data/${compress_dir} \ --entropy_model_path $entropy_model_path \ --compression_model_path $compression_model_path \ --firstbyte_prob_path /mnt/bn/tiktok-mm-5/aiic/users/linzheng/artifacts/ac_unigram_probs/opencoder13G_unigram_prob_smooth0.1.json \ --data_batch_size 256 --max_compression_batch_size 2560 \ --output_window_size ${OUTPUTWINDOW} ${ADDITIONAL_ARG} \ --num_workers 3 --process_id $job_index --num_processes $TOTAL_JOBS_PER_FILE > "logs/compress_node${NODE_ID}_jsonl${JSONL_IDX}_process${job_index}.log" 2>&1 & else echo "Error: Unknown mode '$MODE'." echo "Available modes: split, compress" exit 1 fi # Increment the global counter for the next job GLOBAL_JOB_COUNTER=$(( GLOBAL_JOB_COUNTER + 1 )) done done wait cat logs/compress_node${NODE_ID}_jsonl${START_JSONL_IDX}_process0.log echo "" echo "All jobs on Node ${NODE_ID} have successfully finished." echo "=================================================="