#!/bin/bash
# Adjusted run: force-overwrite the data + sample 2M records + 4 files with 4 parallel tasks each
# Core idea: overwrite without checking, shrink the data volume, and lower the parallelism to 4

# ==============================================
# Core variables (changed to 4 files x 4 parallel tasks, 2M records)
# ==============================================
DATA_NAME=${1:-"ocpython_subsampled_2M"}   # new name to keep this experiment separate
ENTROPY_QUANTILE=${2:-0.90}
CHUNK_SIZE=${3:-512}
OUTPUT_WINDOW=${4:-20}
ITERATIVE=${5:-"true"}
FORCE_PADDING=${6:-"true"}
# Key adjustment: both the file count and the per-file parallel task count are 4
TOTAL_JSONL_FILES=4                        # split into 4 files
TOTAL_JOBS_PER_FILE=4                      # 4 parallel tasks per file
NUM_GPUS_PER_NODE=${ARNOLD_WORKER_GPU:-4}  # keep 4 GPUs
NODE_ID=${ARNOLD_ID:-0}

# ==============================================
# Path definitions (force overwrite, keep no old data)
# ==============================================
HDFS_INPUT_DIR="/mnt/hdfs/linzheng/data/${DATA_NAME}"   # new path to avoid conflicts
HDFS_SPLITS_DIR="/mnt/hdfs/linzheng/data/${DATA_NAME}_entropy${ENTROPY_QUANTILE//./}_splits_chunk${CHUNK_SIZE}"
HDFS_COMPRESS_DIR="/mnt/hdfs/linzheng/data/${DATA_NAME}_entropy${ENTROPY_QUANTILE//./}_splits_chunk${CHUNK_SIZE}_ow${OUTPUT_WINDOW}_iterative-${ITERATIVE}_forcepadding-${FORCE_PADDING}_merged_ac"
# Local temporary paths
LOCAL_TEMP_DIR="./local_temp_2M"
LOCAL_INPUT_DIR="${LOCAL_TEMP_DIR}/raw"
# mkdir -p ${LOCAL_INPUT_DIR}

# Model paths (unchanged)
model_path=/mnt/bn/tiktok-mm-5/aiic/users/linzheng/artifacts/m1_checkpoints/m1_40M_lr1e-3_steps200k_bs8_seqlen2048_full/checkpoints/0000200000
firstbyte_prob_path=/mnt/bn/tiktok-mm-5/aiic/users/linzheng/artifacts/ac_unigram_probs/opencoder13G_unigram_prob_smooth0.1.json

# Log directories (new directory to avoid mixing with old runs)
LOG_ROOT="logs_2M_fixed"
# rm -rf ${LOG_ROOT}   # force-clean old logs
# mkdir -p ${LOG_ROOT}/split_stage/node${NODE_ID}
# mkdir -p ${LOG_ROOT}/compress_stage/node${NODE_ID}
SPLIT_LOG_DIR="${LOG_ROOT}/split_stage/node${NODE_ID}"
COMPRESS_LOG_DIR="${LOG_ROOT}/compress_stage/node${NODE_ID}"

# ==============================================
# Step 1: Data preparation (force overwrite, sample 2M records)
# ==============================================
# echo "===== Step 1/3: prepare 2M records (force overwrite) ====="
# # Force-delete the old HDFS directory (ensure overwrite)
# rm -rf "${HDFS_INPUT_DIR}" >/dev/null 2>&1
# mkdir -p "${HDFS_INPUT_DIR}"
# # Check the raw data source
# RAW_DATA_SOURCE="/mnt/hdfs/linzheng/data/opencoder_python/opencoder_python.chunk.1.jsonl"
# if [ ! -f "${RAW_DATA_SOURCE}" ] || [ ! -r "${RAW_DATA_SOURCE}" ]; then
#     echo "Error: raw data source missing or unreadable: ${RAW_DATA_SOURCE}"
#     exit 1
# fi
# # Sample 2M records locally (core adjustment)
# echo "Sampling 2M records from the raw file to local disk..."
# rm -rf ${LOCAL_TEMP_DIR}/temp.jsonl   # clean the old temp file
# head -n 2000000 "${RAW_DATA_SOURCE}" > "${LOCAL_TEMP_DIR}/temp.jsonl"
# # Validate the sampled data
# if [ ! -s "${LOCAL_TEMP_DIR}/temp.jsonl" ]; then
#     echo "Error: the sampled 2M records are empty!"
#     exit 1
# fi
# # Split locally into 4 files (core adjustment)
# echo "Splitting locally into ${TOTAL_JSONL_FILES} files..."
# split -n r/${TOTAL_JSONL_FILES} \
#     --suffix-length=1 \
#     --numeric-suffixes=1 \
#     --additional-suffix=.jsonl \
#     "${LOCAL_TEMP_DIR}/temp.jsonl" \
#     "${LOCAL_INPUT_DIR}/ocp.chunk."
# # Check the local split files
# for i in $(seq 1 ${TOTAL_JSONL_FILES}); do
#     local_file="${LOCAL_INPUT_DIR}/ocp.chunk.${i}.jsonl"
#     if [ ! -f "${local_file}" ] || [ ! -s "${local_file}" ]; then
#         echo "Error: invalid local split file: ${local_file}"
#         exit 1
#     fi
# done
# # Force-copy to HDFS (overwrite old data)
# echo "Force-copying to HDFS..."
# cp -f ${LOCAL_INPUT_DIR}/ocp.chunk.*.jsonl "${HDFS_INPUT_DIR}/"
# # Check the HDFS files
# for i in $(seq 1 ${TOTAL_JSONL_FILES}); do
#     hdfs_file="${HDFS_INPUT_DIR}/ocp.chunk.${i}.jsonl"
#     if [ ! -f "${hdfs_file}" ] || [ ! -s "${hdfs_file}" ]; then
#         echo "Error: invalid HDFS file: ${hdfs_file}"
#         exit 1
#     fi
# done

echo "2M-record data preparation complete, HDFS path: ${HDFS_INPUT_DIR}"
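
# --------------------------------------------------------------
# Optional sanity check (illustrative sketch, not part of the
# original flow): `split -n r/${TOTAL_JSONL_FILES}` deals lines out
# round-robin, so the 2,000,000 sampled records should land as
# roughly 500,000 lines per chunk. If Step 1 is re-enabled, a quick
# line count confirms the balance:
#
#   wc -l "${LOCAL_INPUT_DIR}"/ocp.chunk.*.jsonl   # expect ~500000 per file
# --------------------------------------------------------------
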
-s "${hdfs_file}" ]; then # echo "错误:HDFS文件无效!${hdfs_file}" # exit 1 # fi # done echo "200万条数据准备完成,HDFS路径: ${HDFS_INPUT_DIR}" # ============================================== # 步骤2:数据分割(4并行任务) # ============================================== # echo -e "\n===== Step 2/3: 窗口分割(4并行) =====" # # 强制清理旧分割结果 # rm -rf "${HDFS_SPLITS_DIR}" >/dev/null 2>&1 # mkdir -p "${HDFS_SPLITS_DIR}" # 计算任务范围(4文件×4任务=16总任务) # JOBS_PER_NODE=$(( NUM_GPUS_PER_NODE * 1 )) # 4GPU各跑1任务 # TOTAL_JOBS=$(( TOTAL_JSONL_FILES * TOTAL_JOBS_PER_FILE )) # START_JOB_IDX=$(( NODE_ID * JOBS_PER_NODE )) # END_JOB_IDX=$(( START_JOB_IDX + JOBS_PER_NODE - 1 )) # [ $END_JOB_IDX -ge $TOTAL_JOBS ] && END_JOB_IDX=$(( TOTAL_JOBS - 1 )) # echo "分割阶段节点${NODE_ID}:处理全局任务${START_JOB_IDX}~${END_JOB_IDX}" # echo "分割结果输出到:${HDFS_SPLITS_DIR}" # 启动分割任务 # GLOBAL_JOB_COUNTER=0 # for global_job_idx in $(seq ${START_JOB_IDX} ${END_JOB_IDX}); do # JSONL_IDX=$(( (global_job_idx / TOTAL_JOBS_PER_FILE) + 1 )) # 1-4 # job_index=$(( global_job_idx % TOTAL_JOBS_PER_FILE )) # 0-3 # GPU_IDX=$(( GLOBAL_JOB_COUNTER % NUM_GPUS_PER_NODE )) # input_file="${HDFS_INPUT_DIR}/ocp.chunk.${JSONL_IDX}.jsonl" # echo "启动分割任务:文件${JSONL_IDX},任务${job_index}(GPU${GPU_IDX})" # CUDA_VISIBLE_DEVICES=${GPU_IDX} python3 offline_entropy_window_split.py \ # --input_file "${input_file}" \ # --output_dir "${HDFS_SPLITS_DIR}" \ # --entropy_model_path "${model_path}" \ # --compression_model_path "${model_path}" \ # --data_batch_size 256 \ # --max_entropy_batch_size 256 \ # --num_workers 1 \ # --process_id ${job_index} \ # --num_processes ${TOTAL_JOBS_PER_FILE} \ # --base_global_quantile ${ENTROPY_QUANTILE} \ # --base_monotonic_quantile ${ENTROPY_QUANTILE} \ # --chunk_size ${CHUNK_SIZE} > "${SPLIT_LOG_DIR}/split_file${JSONL_IDX}_task${job_index}.log" 2>&1 & # GLOBAL_JOB_COUNTER=$(( GLOBAL_JOB_COUNTER + 1 )) # done # 等待并检查分割结果 # wait # echo "分割阶段节点${NODE_ID}任务结束,检查输出文件..." # for global_job_idx in $(seq ${START_JOB_IDX} ${END_JOB_IDX}); do # JSONL_IDX=$(( (global_job_idx / TOTAL_JOBS_PER_FILE) + 1 )) # job_index=$(( global_job_idx % TOTAL_JOBS_PER_FILE )) # split_output_file="${HDFS_SPLITS_DIR}/ocp.chunk.${JSONL_IDX}_out_${job_index}.jsonl" # if [ ! -f "${split_output_file}" ] || [ ! 
-s "${split_output_file}" ]; then # echo "错误:分割输出文件无效!${split_output_file}" # exit 1 # fi # done # echo "✅ 分割阶段节点${NODE_ID}成功完成" # ============================================== # 步骤3:数据压缩(4并行任务) # ============================================== echo -e "\n===== Step 3/3: 数据压缩(4并行) =====" # 强制清理旧压缩结果 rm -rf "${HDFS_COMPRESS_DIR}" >/dev/null 2>&1 mkdir -p "${HDFS_COMPRESS_DIR}" # 计算压缩任务范围(与分割任务匹配) JOBS_PER_NODE=$(( NUM_GPUS_PER_NODE * 1 )) # 4GPU各跑1任务(避免索引越界) TOTAL_JOBS=$(( TOTAL_JSONL_FILES * TOTAL_JOBS_PER_FILE )) START_JOB_IDX=$(( NODE_ID * JOBS_PER_NODE )) END_JOB_IDX=$(( START_JOB_IDX + JOBS_PER_NODE - 1 )) [ $END_JOB_IDX -ge $TOTAL_JOBS ] && END_JOB_IDX=$(( TOTAL_JOBS - 1 )) echo "压缩阶段节点${NODE_ID}:处理全局任务${START_JOB_IDX}~${END_JOB_IDX}" echo "压缩结果输出到:${HDFS_COMPRESS_DIR}" # 压缩参数 ADDITIONAL_ARGS="" [ "${ITERATIVE}" == "true" ] && ADDITIONAL_ARGS="--iterative_compress" [ "${FORCE_PADDING}" == "true" ] && ADDITIONAL_ARGS="${ADDITIONAL_ARGS} --force_padding_to_threshold" # 启动压缩任务 GLOBAL_JOB_COUNTER=0 for global_job_idx in $(seq ${START_JOB_IDX} ${END_JOB_IDX}); do JSONL_IDX=$(( (global_job_idx / TOTAL_JOBS_PER_FILE) + 1 )) job_index=$(( global_job_idx % TOTAL_JOBS_PER_FILE )) GPU_IDX=$(( GLOBAL_JOB_COUNTER % NUM_GPUS_PER_NODE )) input_file="${HDFS_SPLITS_DIR}/ocp.chunk.${JSONL_IDX}_out_${job_index}.jsonl" echo "启动压缩任务:文件${JSONL_IDX},任务${job_index}(GPU${GPU_IDX})" CUDA_VISIBLE_DEVICES=${GPU_IDX} python3 offline_entropy_window_compress_ac.py \ --input_file "${input_file}" \ --output_dir "${HDFS_COMPRESS_DIR}" \ --entropy_model_path "${model_path}" \ --compression_model_path "${model_path}" \ --firstbyte_prob_path "${firstbyte_prob_path}" \ --data_batch_size 512 \ --max_compression_batch_size 256 \ --output_window_size ${OUTPUT_WINDOW} \ --num_workers 3 \ --process_id ${job_index} \ --num_processes ${TOTAL_JOBS_PER_FILE} \ --debug \ ${ADDITIONAL_ARGS} > "${COMPRESS_LOG_DIR}/compress_file${JSONL_IDX}_task${job_index}.log" 2>&1 & GLOBAL_JOB_COUNTER=$(( GLOBAL_JOB_COUNTER + 1 )) done # 等待并检查压缩结果 wait echo "压缩阶段节点${NODE_ID}任务结束,检查日志..." if grep -q -E 'Error|Traceback|failed' ${COMPRESS_LOG_DIR}/*.log; then echo "❌ 压缩阶段节点${NODE_ID}出现错误!" exit 1 else echo "✅ 压缩阶段节点${NODE_ID}成功完成" fi # ============================================== # 步骤4:主节点合并结果 # ============================================== if [ ${NODE_ID} -eq 0 ]; then echo -e "\n===== 合并所有压缩结果 =====" merged_file="${HDFS_COMPRESS_DIR}/merged_final.jsonl" find "${HDFS_COMPRESS_DIR}" -name "ocp.chunk.*.jsonl" -exec cat {} \; > "${merged_file}" echo "✅ 所有结果已合并到:${merged_file}" fi # 清理本地临时文件 rm -rf ${LOCAL_TEMP_DIR} echo -e "\n===== 全流程完成 =====" # #!/bin/bash # DATA_NAME=${1:-"ocpython_subsampled_7G"} # data name # # for split # ENTROPY_QUANTILE=${2:-0.90} # entropy9 # CHUNK_SIZE=${3:-512} # chunk512 # INPUT_DIR="/mnt/hdfs/linzheng/data/${DATA_NAME}" # checkout data path # # data preparing - split to 8 cuts # if [[ ! -d "$INPUT_DIR" || -z "$(ls -A $INPUT_DIR)" ]]; then # echo "Preparing 7G dataset..." # # acquire about 7g data # head -n 3675000 /mnt/hdfs/linzheng/data/opencoder_python/opencoder_python.chunk.1.jsonl > temp.jsonl # split -n r/$TOTAL_JSONLS --suffix-length=1 --numeric-suffixes=1 --additional-suffix=.jsonl temp.jsonl ${INPUT_DIR}/ocp.chunk. # rm temp.jsonl # echo "7G data preparation completed." # else # echo "Directory '$INPUT_DIR' already exists, using existing data." 