import cv2
import mediapipe as mp
import numpy as np
import time
import gradio as gr
from ultralytics import YOLO
from PIL import Image
# ---------------- CONFIG ---------------- #
CONF_THRESHOLD = 0.6
COOLDOWN_TIME = 3       # seconds between alerts
MODEL_PATH = "best.pt"  # place your model in the same directory
FRAME_WIDTH = 320
FRAME_HEIGHT = 240
# ---------------------------------------- #
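# NOTE: "best.pt" is assumed to be a custom YOLO checkpoint trained to detect
# trash/litter; a stock COCO-trained model has no such class. If the file is
# missing, the app falls back to pose tracking only (no trash boxes are drawn
# and the state machine never fires).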

# ---------------- MediaPipe Setup ---------------- #
mp_pose = mp.solutions.pose
mp_drawing = mp.solutions.drawing_utils

# ---------------- Load YOLO Model ---------------- #
try:
    model = YOLO(MODEL_PATH)
except Exception as e:  # a bare except would also swallow KeyboardInterrupt
    print(f"Warning: could not load model ({e}). Using dummy detection.")
    model = None

# ---------------- Global State ---------------- #
class DetectionState:
    def __init__(self):
        self.last_alert_time = 0
        self.state = 'no_hold'
        self.alert_count = 0
        self.pose = mp_pose.Pose(
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5
        )

state_obj = DetectionState()
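# NOTE: state_obj is module-level, so the alert count and state machine are
# shared by every connected Gradio session; a multi-user deployment would
# need per-session state (e.g. gr.State) instead.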

# ---------------- Utility Functions ---------------- #
def distance(a, b):
    return np.sqrt((a[0] - b[0])**2 + (a[1] - b[1])**2)
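# Both the MediaPipe wrist landmark and the YOLO box centers computed below
# are expressed in normalized [0, 1] image coordinates, so distance() compares
# them directly and the 0.1 / 0.25 thresholds in the state machine are
# fractions of the frame size, independent of resolution.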

# ---------------- Littering Detection ---------------- #
def detect_littering(frame, pose_results):
    feedback = "SAFE"
    current_time = time.time()

    # 1️⃣ Get right-hand position from MediaPipe
    hand = None
    if pose_results.pose_landmarks:
        landmarks = pose_results.pose_landmarks.landmark
        wrist = landmarks[mp_pose.PoseLandmark.RIGHT_WRIST.value]
        hand = (wrist.x, wrist.y)

    # 2️⃣ Run YOLO detection
    trash_positions = []
    if model is not None:
        results = model.predict(frame, conf=CONF_THRESHOLD, verbose=False)
        for result in results:
            boxes = result.boxes.xyxy.cpu().numpy()
            confs = result.boxes.conf.cpu().numpy()
            for (x1, y1, x2, y2), conf in zip(boxes, confs):
                cx, cy = (x1 + x2) / 2 / frame.shape[1], (y1 + y2) / 2 / frame.shape[0]
                trash_positions.append((cx, cy))
                cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0), 2)
                cv2.putText(frame, f"Trash {conf:.2f}", (int(x1), int(y1) - 5),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
    # 3️⃣ State machine
    if hand and trash_positions:
        dists = [distance(hand, t) for t in trash_positions]
        min_dist = min(dists)
        if state_obj.state == 'no_hold' and min_dist < 0.1:
            state_obj.state = 'holding'
            feedback = "HOLDING TRASH"
        elif state_obj.state == 'holding':
            feedback = "HOLDING TRASH"
            if min_dist > 0.25:
                state_obj.state = 'throwing'
                feedback = "THROWING TRASH"
        elif state_obj.state == 'throwing':
            if min_dist > 0.25 and (current_time - state_obj.last_alert_time > COOLDOWN_TIME):
                feedback = "⚠️ LITTERING DETECTED!"
                state_obj.alert_count += 1
                state_obj.last_alert_time = current_time
                state_obj.state = 'no_hold'

    # Draw MediaPipe pose skeleton
    if pose_results.pose_landmarks:
        mp_drawing.draw_landmarks(frame, pose_results.pose_landmarks, mp_pose.POSE_CONNECTIONS)

    return frame, feedback
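# State-machine summary (transitions only fire on frames where both a wrist
# and at least one trash detection are present):
#   no_hold  --(min_dist < 0.1)--------------------------->  holding
#   holding  --(min_dist > 0.25)-------------------------->  throwing
#   throwing --(min_dist > 0.25 and cooldown elapsed)----->  alert, then no_hold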

# ---------------- Gradio Processing Function ---------------- #
def process_frame(frame):
    """Process a single frame from the webcam."""
    if frame is None:
        return None, "No frame", 0

    # Gradio delivers RGB frames; convert to BGR so the OpenCV drawing colors
    # below come out as intended
    frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
    frame = cv2.resize(frame, (FRAME_WIDTH, FRAME_HEIGHT))

    # MediaPipe expects RGB input
    pose_results = state_obj.pose.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

    # Detect littering
    output, feedback = detect_littering(frame, pose_results)

    # Add UI overlay (strip the emoji: cv2.putText cannot render it)
    overlay_text = feedback.replace("⚠️ ", "")
    cv2.rectangle(output, (0, 0), (250, 70), (50, 50, 50), -1)
    cv2.putText(output, f'ALERTS: {state_obj.alert_count}', (10, 40),
                cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
    color = (0, 0, 255) if "⚠️" in feedback else (0, 150, 0)
    cv2.rectangle(output, (250, 0), (FRAME_WIDTH, 70), color, -1)
    cv2.putText(output, overlay_text, (260, 45),
                cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)

    # Convert back to RGB for display in Gradio
    return cv2.cvtColor(output, cv2.COLOR_BGR2RGB), feedback, state_obj.alert_count

def reset_alerts():
    """Reset the alert counter and the state machine."""
    state_obj.alert_count = 0
    state_obj.state = 'no_hold'
    return 0

# ---------------- Gradio Interface ---------------- #
with gr.Blocks(title="Smart Garbage Patrol") as demo:
    gr.Markdown("""
    # 🗑️ Smart Garbage Patrol - Littering Detection System

    This system uses AI to detect littering behavior in real time:
    - **MediaPipe** tracks hand movements
    - **YOLOv8** detects trash objects
    - A **state machine** identifies throwing behavior

    **How it works:**
    1. Hold trash near your hand → the system shows "HOLDING TRASH"
    2. Throw or drop the trash so it separates from your hand → "THROWING TRASH"
    3. If the trash stays away and the alert cooldown has elapsed → "⚠️ LITTERING DETECTED!"
    """)

    with gr.Row():
        with gr.Column():
            webcam = gr.Image(sources=["webcam"], streaming=True, type="numpy")
            reset_btn = gr.Button("🔄 Reset Alert Count", variant="secondary")
        with gr.Column():
            output_frame = gr.Image(label="Detection Output")
            status_text = gr.Textbox(label="Current Status", interactive=False)
            alert_counter = gr.Number(label="Total Alerts", value=0, interactive=False)

    # Process the webcam stream
    webcam.stream(
        fn=process_frame,
        inputs=[webcam],
        outputs=[output_frame, status_text, alert_counter],
        show_progress=False
    )

    # Reset button
    reset_btn.click(
        fn=reset_alerts,
        outputs=[alert_counter]
    )

if __name__ == "__main__":
    demo.launch()
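
# Rough dependency sketch (package names assumed; pin versions as needed):
#   pip install gradio opencv-python mediapipe ultralytics numpy pillow
# Run locally (assuming this file is saved as app.py, the Spaces convention):
#   python app.py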