# VisionTSpp / app.py
# Provenance (Hugging Face file-viewer header): last commit by Lefei,
# "update app.py, add choice button for VisionTSpp base and large" (99f09ff, verified).
# app.py
import os
import gradio as gr
import torch
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import einops
import copy
import uuid
import shutil
import time
import threading
from pathlib import Path
from huggingface_hub import snapshot_download
# Assuming visionts package is available
from visionts import VisionTSpp, freq_to_seasonality_list
# ========================
# 0. Environment & Cleanup Configuration
# ========================
# --- Session storage & cleanup policy ---
# Every user session gets its own sub-directory below this root; the root is
# created at import time so later code can assume it exists.
SESSION_DIR_ROOT = Path("user_sessions")
SESSION_DIR_ROOT.mkdir(exist_ok=True)

_SECONDS_PER_HOUR = 60 * 60
# Session directories untouched for longer than this are deleted (24 hours).
MAX_FILE_AGE_SECONDS = 24 * _SECONDS_PER_HOUR
# The background cleanup pass runs once per hour.
CLEANUP_INTERVAL_SECONDS = _SECONDS_PER_HOUR

# NOTE: Gradio's own temp dir is left at its default; to redirect it set
# the GRADIO_TEMP_DIR environment variable (e.g. to "./user_sessions").
def cleanup_old_sessions():
    """Remove session folders whose mtime is older than MAX_FILE_AGE_SECONDS.

    Only direct sub-directories of SESSION_DIR_ROOT are considered. A failure
    while inspecting or deleting one directory is printed and does not stop
    the pass over the remaining directories.
    """
    print(f"Running periodic cleanup of old session directories, with periodicity of {CLEANUP_INTERVAL_SECONDS} seconds...")
    started_at = time.time()
    removed = 0
    for entry in SESSION_DIR_ROOT.iterdir():
        if not entry.is_dir():
            continue
        try:
            # The directory's modification time stands in for "last activity".
            age_seconds = started_at - entry.stat().st_mtime
            if age_seconds > MAX_FILE_AGE_SECONDS:
                print(f"Cleaning up old session directory: {entry}, over {MAX_FILE_AGE_SECONDS} seconds.")
                shutil.rmtree(entry)
                removed += 1
        except Exception as err:
            print(f"Error cleaning up directory {entry}: {err}")
    summary = (
        f"Cleanup complete. Removed {removed} old session(s)."
        if removed > 0
        else "Cleanup complete. No old sessions found."
    )
    print(summary)
# --- Function to run the cleanup periodically in the background ---
def periodic_cleanup_task():
    """Run cleanup_old_sessions() forever, sleeping between passes.

    Intended as the target of a background daemon thread; it never returns.
    Any exception escaping a cleanup pass (for example the session root
    having been removed, which fails before the per-directory error handling
    is reached) is printed and the loop continues, so one bad pass cannot
    silently end all future cleanups.
    """
    print("Starting background thread for periodic cleanup.")
    while True:
        try:
            cleanup_old_sessions()
        except Exception as e:
            # Keep the thread alive; the next pass may succeed.
            print(f"Periodic cleanup pass failed: {e}")
        time.sleep(CLEANUP_INTERVAL_SECONDS)
# ========================
# 1. Model Configuration
# ========================
# Run on the GPU when one is visible to torch, otherwise on the CPU.
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
REPO_ID = "Lefei/VisionTSpp"
LOCAL_DIR = "./hf_models/VisionTSpp"

# --- Per-size model settings: backbone architecture name + local checkpoint ---
MODEL_CONFIGS = {
    size: {
        "arch": f"mae_{size}",
        "ckpt_path": os.path.join(LOCAL_DIR, f"visiontspp_{size}.ckpt"),
    }
    for size in ("base", "large")
}

# Fetch any checkpoint that is not on disk yet (one Hub call per missing size).
for model_size, config in MODEL_CONFIGS.items():
    if os.path.exists(config["ckpt_path"]):
        continue
    print(f"Downloading {model_size} model from Hugging Face Hub...")
    snapshot_download(
        repo_id=REPO_ID,
        local_dir=LOCAL_DIR,
        local_dir_use_symlinks=False,
        resume_download=True,
        # Restrict the download to files whose name mentions this size.
        allow_patterns=[f"*{model_size}*"],
    )
# Quantile levels 0.1, 0.2, ..., 0.9 (the middle entry, 0.5, is the median).
QUANTILES = [k / 10 for k in range(1, 10)]

# Per-channel normalization constants used to un-normalize exported images.
imagenet_mean = np.asarray((0.485, 0.456, 0.406))
imagenet_std = np.asarray((0.229, 0.224, 0.225))

# --- Module-level cache of the single model currently kept in memory ---
# Both stay None until load_model_for_size() has run.
CURRENT_MODEL_SIZE = None
CURRENT_MODEL = None
def load_model_for_size(model_size: str):
    """Make the VisionTS++ model of the given size the active global model.

    Args:
        model_size: A key of MODEL_CONFIGS ("base" or "large").

    Returns:
        The loaded model, which is also stored in CURRENT_MODEL (with
        CURRENT_MODEL_SIZE set to ``model_size``).

    Raises:
        ValueError: If ``model_size`` is not a key of MODEL_CONFIGS.

    If the requested size is already resident it is returned as-is instead of
    being reloaded from disk. Otherwise the previous model is released
    *before* the new checkpoint is loaded so that two models are never held
    in memory at once; if loading then fails, both globals are left as None
    and the next request triggers a fresh load.
    """
    global CURRENT_MODEL, CURRENT_MODEL_SIZE
    if model_size not in MODEL_CONFIGS:
        raise ValueError(f"Invalid model size: {model_size}. Available: {list(MODEL_CONFIGS.keys())}")

    # Nothing to do when the requested model is already the active one.
    if CURRENT_MODEL is not None and CURRENT_MODEL_SIZE == model_size:
        return CURRENT_MODEL

    # Unload the previous model first to keep peak memory at one model.
    if CURRENT_MODEL is not None:
        print(f"Unloading previous model ({CURRENT_MODEL_SIZE})...")
        CURRENT_MODEL = None
        CURRENT_MODEL_SIZE = None
        if DEVICE == 'cuda':
            torch.cuda.empty_cache()  # Return cached GPU blocks to the driver

    config = MODEL_CONFIGS[model_size]
    print(f"Loading {model_size} model...")
    model = VisionTSpp(
        config["arch"],
        ckpt_path=config["ckpt_path"],
        quantile=True,
        clip_input=True,
        complete_no_clip=False,
        color=True
    ).to(DEVICE)
    print(f"Model {model_size} loaded on {DEVICE}")

    CURRENT_MODEL = model
    CURRENT_MODEL_SIZE = model_size
    return model
# Eagerly load the default ("base") model at import time so the first request
# does not pay the loading cost. load_model_for_size() itself stores the model
# in CURRENT_MODEL / CURRENT_MODEL_SIZE. Use "large" here to change the default.
if CURRENT_MODEL is None:
    load_model_for_size("base")
# ========================
# 2. Preset Datasets (Now Loaded Locally)
# ========================
# Folder that holds the bundled example CSV files.
data_dir = "./datasets/"

# Display name -> local CSV path; the file name is always "<name>.csv".
PRESET_DATASETS = {
    name: data_dir + f"{name}.csv"
    for name in ("ETTm1", "ETTm2", "ETTh1", "ETTh2", "Illness", "Weather")
}
def load_preset_data(name):
    """Read one of the bundled preset datasets into a DataFrame.

    ``name`` must be a key of PRESET_DATASETS (an unknown key raises
    KeyError). Raises FileNotFoundError when the mapped CSV is not on disk.
    """
    csv_path = PRESET_DATASETS[name]
    if os.path.exists(csv_path):
        return pd.read_csv(csv_path)
    raise FileNotFoundError(f"Preset dataset file not found: {csv_path}. Make sure it's uploaded to the 'datasets' folder.")
# ========================
# 3. Visualization Functions (No changes needed)
# ========================
def show_image_tensor(image_tensor, title='', cur_nvars=1, cur_color_list=None):
    """Render a normalized image tensor as a matplotlib figure.

    The image is split along its first axis into ``cur_nvars`` equal-height
    bands, one per variable. For each band only one colour channel is kept:
    it is un-normalized with the ImageNet mean/std of that channel and scaled
    to 0-255; the other channels of the band stay zero.

    Args:
        image_tensor: Tensor indexed as [row, column, channel], or None.
        title: Title drawn above the image.
        cur_nvars: Number of variables stacked along the first axis.
        cur_color_list: Channel index (0-2) to display for each variable.
            Defaults to cycling through the channels (``i % 3``), the same
            assignment predict_at_index uses. Previously the ``None`` default
            was indexed directly and raised TypeError.

    Returns:
        The closed matplotlib Figure, or None when ``image_tensor`` is None.
    """
    if image_tensor is None:
        return None
    if cur_color_list is None:
        cur_color_list = [i % 3 for i in range(cur_nvars)]

    image = image_tensor.cpu()
    cur_image = torch.zeros_like(image)
    # Rows left over when the height is not divisible by cur_nvars stay black.
    height_per_var = image.shape[0] // cur_nvars
    for i in range(cur_nvars):
        cur_color_idx = cur_color_list[i]
        rows = slice(i * height_per_var, (i + 1) * height_per_var)
        var_slice = image[rows, :, :]
        unnormalized_channel = var_slice[:, :, cur_color_idx] * imagenet_std[cur_color_idx] + imagenet_mean[cur_color_idx]
        cur_image[rows, :, cur_color_idx] = unnormalized_channel * 255
    cur_image = torch.clamp(cur_image, 0, 255).int().numpy()

    fig, ax = plt.subplots(figsize=(6, 6))
    ax.imshow(cur_image)
    ax.set_title(title, fontsize=14)
    ax.axis('off')
    plt.tight_layout()
    # Close so pyplot does not keep the figure alive; the object is still returned.
    plt.close(fig)
    return fig
def _as_numpy(values):
    """Return ``values`` as a NumPy array if it is a torch tensor, else unchanged."""
    if isinstance(values, torch.Tensor):
        return values.cpu().numpy()
    return values


def visual_ts_with_quantiles(true_data, pred_median, pred_quantiles_list, model_quantiles, context_len, pred_len):
    """Plot ground truth, median forecast and nested quantile bands per variable.

    Args:
        true_data: Context followed by the true future, indexed [time, variable].
        pred_median: Median forecast, indexed [time, variable].
        pred_quantiles_list: Forecasts for every level in QUANTILES except
            0.5, in ascending level order. The caller's list is no longer
            modified (the original converted its elements in place and
            inserted the median into it).
        model_quantiles: Accepted for interface compatibility; not used, the
            levels are taken from the module-level QUANTILES.
        context_len: Number of leading rows of ``true_data`` that are context.
        pred_len: Forecast horizon length.

    Returns:
        The closed matplotlib Figure with one subplot per variable.
    """
    true_data = _as_numpy(true_data)
    pred_median = _as_numpy(pred_median)
    # Work on a private list so the argument is left untouched.
    all_preds = [_as_numpy(q) for q in pred_quantiles_list]

    nvars = true_data.shape[1]
    FIG_WIDTH, FIG_HEIGHT_PER_VAR = 15, 2.0
    fig, axes = plt.subplots(nvars, 1, figsize=(FIG_WIDTH, nvars * FIG_HEIGHT_PER_VAR), sharex=True)
    if nvars == 1:
        axes = [axes]

    # Put the median back at its slot so the list lines up with QUANTILES.
    all_preds.insert(len(QUANTILES) // 2, pred_median)
    sorted_quantiles = sorted(zip(QUANTILES, all_preds), key=lambda x: x[0])
    quantile_preds = [item[1] for item in sorted_quantiles if item[0] != 0.5]
    quantile_vals = [item[0] for item in sorted_quantiles if item[0] != 0.5]
    # Band j pairs the j-th lowest level with the j-th highest level.
    num_bands = len(quantile_preds) // 2
    quantile_colors = plt.cm.Blues(np.linspace(0.3, 0.8, num_bands))[::-1]

    # Forecast curves start at the last context point so they join the history.
    pred_range = np.arange(context_len - 1, context_len + pred_len)
    for i, ax in enumerate(axes):
        anchor = true_data[context_len - 1:context_len, i]
        ax.plot(true_data[:, i], label='Ground Truth', color='black', linewidth=1.5)
        pred_median_visual = np.concatenate([anchor, pred_median[:, i]])
        ax.plot(pred_range, pred_median_visual, label='Prediction (Median)', color='red', linewidth=1.5)
        for j in range(num_bands):
            lower_quantile_pred, upper_quantile_pred = quantile_preds[j][:, i], quantile_preds[-(j+1)][:, i]
            lower_quantile_pred_visual = np.concatenate([anchor, lower_quantile_pred])
            upper_quantile_pred_visual = np.concatenate([anchor, upper_quantile_pred])
            q_low, q_high = quantile_vals[j], quantile_vals[-(j+1)]
            ax.fill_between(pred_range, lower_quantile_pred_visual, upper_quantile_pred_visual, color=quantile_colors[j], alpha=0.7, label=f'{int(q_low*100)}-{int(q_high*100)}% Quantile')
        y_min, y_max = ax.get_ylim()
        # Dashed marker at the boundary between context and forecast.
        ax.vlines(x=context_len-1, ymin=y_min, ymax=y_max, colors='gray', linestyles='--', alpha=0.7)
        ax.set_ylabel(f'Var {i+1}', rotation=0, labelpad=30, ha='right', va='center')
        ax.grid(True, which='both', linestyle='--', linewidth=0.5)
        ax.margins(x=0)

    # One shared legend built from the first subplot; the dict de-duplicates labels.
    handles, labels = axes[0].get_legend_handles_labels()
    unique_labels = dict(zip(labels, handles))
    fig.legend(unique_labels.values(), unique_labels.keys(), loc='upper center', bbox_to_anchor=(0.5, 1.05), ncol=num_bands + 2)
    plt.tight_layout(rect=[0, 0, 1, 0.95])
    plt.close(fig)
    return fig
# ========================
# 4. Prediction Logic
# ========================
class PredictionResult:
    """Plain container bundling everything one forecast run produces."""

    def __init__(self, ts_fig, input_img_fig, recon_img_fig, csv_path, total_samples, inferred_freq):
        # The three figures: forecast plot, input image, reconstructed image.
        self.ts_fig, self.input_img_fig, self.recon_img_fig = (
            ts_fig,
            input_img_fig,
            recon_img_fig,
        )
        # Location of the CSV written for download.
        self.csv_path = csv_path
        # Number of valid sample start positions and the detected frequency.
        self.total_samples, self.inferred_freq = total_samples, inferred_freq
def predict_at_index(df, index, context_len, pred_len, session_dir, model_size):
    """Forecast one window of ``df`` and build the plots and the result CSV.

    Args:
        df: Input frame with a 'date' column; every numeric column is treated
            as one variable. The caller's frame is not modified.
        index: Start row of the context window; clamped to the valid range.
        context_len: Number of history rows fed to the model (must be > 0).
        pred_len: Number of future rows to predict (must be > 0).
        session_dir: Directory that receives ``prediction_result.csv``.
        model_size: Key of MODEL_CONFIGS; the model is switched if it differs
            from the one currently loaded.

    Returns:
        PredictionResult with the forecast figure, the input/reconstructed
        image figures, the CSV path, the number of valid start positions and
        the inferred frequency string.

    Raises:
        gr.Error: Missing 'date' column, non-positive lengths, unparsable
            dates, no numeric columns, or too few rows.
    """
    if 'date' not in df.columns:
        raise gr.Error("❌ Input CSV must contain a 'date' column.")
    if context_len <= 0 or pred_len <= 0:
        raise gr.Error("❌ Context length and prediction length must both be positive integers.")

    try:
        # assign() returns a new frame, so the caller's DataFrame stays untouched.
        df = df.assign(date=pd.to_datetime(df['date']))
        df = df.sort_values('date').set_index('date')
        inferred_freq = pd.infer_freq(df.index)
        if inferred_freq is None:
            # Fall back to the spacing of the first two timestamps.
            time_diff = df.index[1] - df.index[0]
            inferred_freq = pd.tseries.frequencies.to_offset(time_diff).freqstr
            gr.Warning(f"Could not reliably infer frequency. Using fallback based on first two timestamps: {inferred_freq}")
        print(f"Inferred frequency: {inferred_freq}")
    except Exception as e:
        raise gr.Error(f"❌ Date processing failed: {e}. Please check the date format (e.g., YYYY-MM-DD HH:MM:SS).") from e

    data = df.select_dtypes(include=np.number).values
    nvars = data.shape[1]
    if nvars == 0:
        raise gr.Error("❌ Input CSV must contain at least one numeric column.")

    total_samples = len(data) - context_len - pred_len + 1
    if total_samples <= 0:
        raise gr.Error(f"Data is too short. It needs at least context_len + pred_len = {context_len + pred_len} rows, but has {len(data)}.")
    index = max(0, min(index, total_samples - 1))

    # Normalization statistics come from the first 70% of the rows only.
    train_len = int(len(data) * 0.7)
    x_mean = data[:train_len].mean(axis=0, keepdims=True)
    x_std = data[:train_len].std(axis=0, keepdims=True) + 1e-8  # avoid division by zero
    data_norm = (data - x_mean) / x_std

    start_idx = index
    x_norm = data_norm[start_idx : start_idx + context_len]
    y_true_norm = data_norm[start_idx + context_len : start_idx + context_len + pred_len]
    x_tensor = torch.FloatTensor(x_norm).unsqueeze(0).to(DEVICE)

    periodicity_list = freq_to_seasonality_list(inferred_freq)
    periodicity = periodicity_list[0] if periodicity_list else 1
    # Variables cycle through the three colour channels.
    color_list = [i % 3 for i in range(nvars)]

    # --- Load the requested model if it's not the current one ---
    if CURRENT_MODEL_SIZE != model_size:
        print(f"Switching model from {CURRENT_MODEL_SIZE} to {model_size}")
        load_model_for_size(model_size)
    # --- Use the currently loaded model ---
    model = CURRENT_MODEL
    model.update_config(context_len=context_len, pred_len=pred_len, periodicity=periodicity,
                        num_patch_input=7, padding_mode='constant')

    with torch.no_grad():
        y_pred, input_image, reconstructed_image, _, _ = model.forward(
            x_tensor, export_image=True, color_list=color_list
        )
    # The first output is a (median, list-of-other-quantiles) pair.
    y_pred, y_pred_quantile_list = y_pred

    # Shallow copy is enough: the tensors are only read, never modified.
    all_y_pred_list = list(y_pred_quantile_list)
    all_y_pred_list.insert(len(QUANTILES)//2, y_pred)
    all_preds = dict(zip(QUANTILES, all_y_pred_list))
    # [0] selects the single batch element.
    pred_median_norm = all_preds.pop(0.5)[0]
    pred_quantiles_norm = [q[0] for q in all_preds.values()]

    # Undo the normalization so plots and CSV are in original units.
    y_true = y_true_norm * x_std + x_mean
    pred_median = pred_median_norm.cpu().numpy() * x_std + x_mean
    pred_quantiles = [q.cpu().numpy() * x_std + x_mean for q in pred_quantiles_norm]

    full_true_context = data[start_idx : start_idx + context_len]
    full_true_series = np.concatenate([full_true_context, y_true], axis=0)
    ts_fig = visual_ts_with_quantiles(
        true_data=full_true_series, pred_median=pred_median,
        pred_quantiles_list=pred_quantiles, model_quantiles=list(all_preds.keys()),
        context_len=context_len, pred_len=pred_len
    )
    # NOTE(review): [0, 0] presumably picks the first batch item and first
    # leading slice of the exported images - confirm against VisionTSpp.
    input_img_fig = show_image_tensor(input_image[0, 0], f'Input Image (Sample {index})', nvars, color_list)
    recon_img_fig = show_image_tensor(reconstructed_image[0, 0], 'Reconstructed Image', nvars, color_list)

    # Write true values and the median forecast for the horizon to the session folder.
    csv_path = Path(session_dir) / "prediction_result.csv"
    time_index = df.index[start_idx + context_len : start_idx + context_len + pred_len]
    result_data = {'date': time_index}
    for i in range(nvars):
        result_data[f'True_Var{i+1}'] = y_true[:, i]
        result_data[f'Pred_Median_Var{i+1}'] = pred_median[:, i]
    result_df = pd.DataFrame(result_data)
    result_df.to_csv(csv_path, index=False)

    return PredictionResult(ts_fig, input_img_fig, recon_img_fig, str(csv_path), total_samples, inferred_freq)
# ========================
# 5. Gradio Interface
# ========================
def get_session_dir(session_id: gr.State):
    """Return the storage directory for this user session, creating it if needed.

    Args:
        session_id: Path string of the session directory held in the Gradio
            state, or None for a session that has none yet.

    Returns:
        ``session_id`` unchanged when that path still exists; otherwise the
        path (as a string) of a freshly created directory named by a random
        UUID under SESSION_DIR_ROOT.

    When an existing directory is reused its modification time is refreshed.
    The cleanup job uses that mtime as "last activity", but overwriting a
    file that already exists inside the directory does not update it, so
    without the refresh an active session could be purged.
    """
    if session_id is not None and Path(session_id).exists():
        try:
            os.utime(session_id, None)  # mark the session as recently used
        except OSError:
            # Best effort only: the directory may be removed concurrently by
            # the cleanup thread; the caller still receives the same id.
            pass
        return session_id

    session_uuid = str(uuid.uuid4())
    session_dir = Path(SESSION_DIR_ROOT) / session_uuid
    session_dir.mkdir(exist_ok=True, parents=True)
    return str(session_dir)
def run_forecast(data_source, upload_file, index, context_len, pred_len, model_size, session_id: gr.State):
    """Gradio callback: load the chosen data, forecast, and return UI updates.

    Args:
        data_source: A PRESET_DATASETS key or the literal "Upload CSV".
        upload_file: Value of the file component; either an object exposing
            the temp path as ``.name`` or the path string itself.
        index, context_len, pred_len: Numeric widget values (cast to int).
        model_size: Key of MODEL_CONFIGS.
        session_id: Session directory path from the Gradio state, or None.

    Returns:
        A 7-tuple matching the ``outputs`` wiring: forecast figure, input
        image figure, reconstructed image figure, CSV path, slider update,
        frequency textbox update, session directory. On any failure the first
        element is a figure showing the error text and the plot/CSV slots are
        None.

    The error path now hands back the session directory that was just
    resolved rather than the incoming ``session_id``; previously a failure on
    a new session returned None, so every failed attempt left behind an
    unused directory and the next call created another one.
    """
    session_dir = get_session_dir(session_id)
    try:
        if data_source == "Upload CSV":
            if upload_file is None:
                raise gr.Error("Please upload a CSV file when 'Upload CSV' is selected.")
            # Accept both a file wrapper (.name) and a plain path string.
            source_path = getattr(upload_file, "name", upload_file)
            uploaded_file_path = Path(session_dir) / Path(source_path).name
            shutil.copy(source_path, uploaded_file_path)
            df = pd.read_csv(uploaded_file_path)
        else:
            df = load_preset_data(data_source)

        index, context_len, pred_len = int(index), int(context_len), int(pred_len)
        # --- Pass model_size to predict_at_index ---
        result = predict_at_index(df, index, context_len, pred_len, session_dir, model_size)
        # Mirror the clamping done inside predict_at_index for the slider value.
        final_index = min(index, result.total_samples - 1)
        return (
            result.ts_fig,
            result.input_img_fig,
            result.recon_img_fig,
            result.csv_path,
            gr.update(maximum=result.total_samples - 1, value=final_index),
            gr.update(value=result.inferred_freq),
            session_dir
        )
    except Exception as e:
        # Top-level UI boundary: show the problem in the plot area instead of crashing.
        print(f"Error during forecast: {e}")
        error_fig = plt.figure(figsize=(10, 5))
        plt.text(0.5, 0.5, f"An error occurred:\n{str(e)}", ha='center', va='center', wrap=True, color='red', fontsize=12)
        plt.axis('off')
        plt.close(error_fig)
        return error_fig, None, None, None, gr.update(), gr.update(value="Error"), session_dir
# Builds the Gradio UI. The emoji in the user-facing strings were mojibake
# (UTF-8 bytes decoded with the wrong codec) and are restored here. The
# multi-line Markdown bodies are kept flush-left so their text is unchanged.
with gr.Blocks(title="VisionTS++ Advanced Forecasting Platform", theme=gr.themes.Soft()) as demo:
    # Holds the session directory path between calls; None until the first run.
    session_id_state = gr.State(None)
    gr.Markdown("# 🕰️ VisionTS++: Multivariate Time Series Forecasting")
    gr.Markdown(
        """
An interactive platform to explore time series forecasting using the VisionTS++ model.
- ✅ **Select** from local preset datasets or **upload** your own.
- ✅ **Frequency is auto-detected** from the 'date' column.
- ✅ **Visualize** predictions with multiple **quantile uncertainty bands**.
- ✅ **Slide** through different samples of the dataset for real-time forecasting.
- ✅ **Download** the prediction results as a CSV file.
- ✅ **User Isolation**: Each user session has its own temporary storage to prevent file conflicts. Old files are automatically cleaned up.
- ✅ **Model Selection**: Choose between 'base' and 'large' VisionTS++ models.
"""
    )
    with gr.Row():
        # Left column: inputs and controls.
        with gr.Column(scale=1, min_width=300):
            gr.Markdown("### 1. Data & Model Configuration")
            # --- Add model selection dropdown ---
            model_size = gr.Dropdown(
                label="Select Model Size",
                choices=["base", "large"],
                value="base"  # Default to base
            )
            data_source = gr.Dropdown(
                label="Select Data Source",
                choices=list(PRESET_DATASETS.keys()) + ["Upload CSV"],
                value="ETTh1"
            )
            # Hidden until "Upload CSV" is chosen (see toggle_upload_visibility).
            upload_file = gr.File(label="Upload CSV File", file_types=['.csv'], visible=False)
            gr.Markdown(
                """
**Upload Rules:**
1. Must be a `.csv` file.
2. Must contain a time column named `date` with a consistent frequency.
"""
            )
            context_len = gr.Number(label="Context Length (History)", value=336)
            pred_len = gr.Number(label="Prediction Length (Future)", value=96)
            freq_display = gr.Textbox(label="Detected Frequency", interactive=False)
            run_btn = gr.Button("🚀 Run Forecast", variant="primary")
            gr.Markdown("### 2. Sample Selection")
            # The large initial maximum/value are placeholders: run_forecast
            # returns an update that sets the real maximum and clamps the value.
            sample_index = gr.Slider(
                label="Sample Index",
                minimum=0,
                maximum=1000000,
                step=1,
                value=1000000,
                info="Drag the slider to select different starting points from the dataset for prediction."
            )
        # Right column: results.
        with gr.Column(scale=3):
            gr.Markdown("### 3. Prediction Results")
            ts_plot = gr.Plot(label="Time Series Forecast with Quantile Bands")
            gr.Markdown(
                """
**Plot Explanation:**
- **⚫ Black Line:** Ground truth data. The left side is the input context, and the right side is the actual future value.
- **🔴 Red Line:** The model's median prediction for the future.
- **🔵 Blue Shaded Areas:** Represent the model's uncertainty. The darker the blue, the wider the prediction interval, indicating more uncertainty.
"""
            )
            with gr.Row():
                input_img_plot = gr.Plot(label="Input as Image")
                recon_img_plot = gr.Plot(label="Reconstructed Image")
            gr.Markdown(
                """
**Image Explanation:**
- **Input as Image:** The historical time series data (look-back window) transformed into an image format that the VisionTS++ model uses as input.
- **Reconstructed Image:** The model's internal reconstruction of the input image. This helps to visualize what features the model is focusing on.
"""
            )
            download_csv = gr.File(label="Download Prediction CSV")
            gr.Markdown(
                """
**Download Prediction CSV:**
- You can download the prediction results of VisionTS++ here!
"""
            )

    def toggle_upload_visibility(choice):
        """Show the upload widget only when the "Upload CSV" source is selected."""
        return gr.update(visible=(choice == "Upload CSV"))

    data_source.change(fn=toggle_upload_visibility, inputs=data_source, outputs=upload_file)
    # --- Include model_size in the inputs list ---
    # Order must match run_forecast's parameters / returned tuple.
    inputs = [data_source, upload_file, sample_index, context_len, pred_len, model_size, session_id_state]
    outputs = [ts_plot, input_img_plot, recon_img_plot, download_csv, sample_index, freq_display, session_id_state]
    # Forecast on button click and again whenever the slider is released.
    run_btn.click(fn=run_forecast, inputs=inputs, outputs=outputs, api_name="run_forecast")
    sample_index.release(fn=run_forecast, inputs=inputs, outputs=outputs, api_name="run_forecast_on_slide")
# ========================
# 6. Main Execution Block
# ========================
if __name__ == "__main__":
    # Purge stale session folders once, synchronously, before serving.
    cleanup_old_sessions()

    # Hand further cleanups to a background thread. It is marked as a daemon
    # so it never keeps the process alive once the main thread exits.
    cleanup_thread = threading.Thread(target=periodic_cleanup_task)
    cleanup_thread.daemon = True
    cleanup_thread.start()

    # Start the Gradio server (blocks until shutdown).
    demo.launch(debug=True)