import cv2
import mediapipe as mp
import numpy as np
import time
import gradio as gr
from ultralytics import YOLO
from PIL import Image

# ---------------- CONFIG ---------------- #
CONF_THRESHOLD = 0.6
COOLDOWN_TIME = 3        # seconds between alerts
MODEL_PATH = "best.pt"   # Place your model in the same directory
FRAME_WIDTH = 320
FRAME_HEIGHT = 240
# ---------------------------------------- #

# ---------------- MediaPipe Setup ---------------- #
mp_pose = mp.solutions.pose
mp_drawing = mp.solutions.drawing_utils

# ---------------- Load YOLO Model ---------------- #
try:
    model = YOLO(MODEL_PATH)
except Exception:
    print("Warning: Model not found. Using dummy detection.")
    model = None

# ---------------- Global State ---------------- #
class DetectionState:
    def __init__(self):
        self.last_alert_time = 0
        self.state = 'no_hold'
        self.alert_count = 0
        self.pose = mp_pose.Pose(
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5
        )

state_obj = DetectionState()

# ---------------- Utility Functions ---------------- #
def distance(a, b):
    """Euclidean distance between two (x, y) points in normalized coordinates."""
    return np.sqrt((a[0] - b[0])**2 + (a[1] - b[1])**2)

# ---------------- Littering Detection ---------------- #
def detect_littering(frame, pose_results):
    feedback = "SAFE"
    current_time = time.time()

    # 1️⃣ Get Right Hand Position from MediaPipe
    hand = None
    if pose_results.pose_landmarks:
        landmarks = pose_results.pose_landmarks.landmark
        wrist = landmarks[mp_pose.PoseLandmark.RIGHT_WRIST.value]
        hand = (wrist.x, wrist.y)

    # 2️⃣ Run YOLO Detection
    trash_positions = []
    if model is not None:
        results = model.predict(frame, conf=CONF_THRESHOLD, verbose=False)
        for result in results:
            boxes = result.boxes.xyxy.cpu().numpy()
            confs = result.boxes.conf.cpu().numpy()
            for (x1, y1, x2, y2), conf in zip(boxes, confs):
                # Normalize the box centre so it is comparable to MediaPipe coordinates
                cx, cy = (x1 + x2) / 2 / frame.shape[1], (y1 + y2) / 2 / frame.shape[0]
                trash_positions.append((cx, cy))
                cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0), 2)
                cv2.putText(frame, f"Trash {conf:.2f}", (int(x1), int(y1) - 5),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)

    # 3️⃣ State Machine: no_hold -> holding -> throwing -> alert
    if hand and trash_positions:
        dists = [distance(hand, t) for t in trash_positions]
        min_dist = min(dists)

        if state_obj.state == 'no_hold' and min_dist < 0.1:
            state_obj.state = 'holding'
            feedback = "HOLDING TRASH"
        elif state_obj.state == 'holding':
            feedback = "HOLDING TRASH"
            if min_dist > 0.25:
                state_obj.state = 'throwing'
                feedback = "THROWING TRASH"
        elif state_obj.state == 'throwing':
            if min_dist > 0.25 and (current_time - state_obj.last_alert_time > COOLDOWN_TIME):
                feedback = "⚠️ LITTERING DETECTED!"
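                # Record the alert, start the cooldown window, and reset the
                # state machine so a new hold/throw cycle can be detected.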
                state_obj.alert_count += 1
                state_obj.last_alert_time = current_time
                state_obj.state = 'no_hold'

    # Draw MediaPipe Pose
    if pose_results.pose_landmarks:
        mp_drawing.draw_landmarks(frame, pose_results.pose_landmarks, mp_pose.POSE_CONNECTIONS)

    return frame, feedback

# ---------------- Gradio Processing Function ---------------- #
def process_frame(frame):
    """Process a single frame from webcam"""
    if frame is None:
        return None, "No frame", 0

    # Gradio delivers webcam frames as RGB; convert to BGR so the OpenCV
    # drawing calls and the BGR2RGB conversion below behave as intended.
    frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)

    # Resize frame
    frame = cv2.resize(frame, (FRAME_WIDTH, FRAME_HEIGHT))

    # Process with MediaPipe
    pose_results = state_obj.pose.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

    # Detect littering
    output, feedback = detect_littering(frame, pose_results)

    # Add UI Overlay
    cv2.rectangle(output, (0, 0), (250, 70), (50, 50, 50), -1)
    cv2.putText(output, f'ALERTS: {state_obj.alert_count}', (10, 40),
                cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)

    color = (0, 0, 255) if "⚠️" in feedback else (0, 150, 0)
    cv2.rectangle(output, (250, 0), (FRAME_WIDTH, 70), color, -1)
    cv2.putText(output, feedback, (260, 45),
                cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)

    # Convert back to RGB for display in the Gradio image component
    output = cv2.cvtColor(output, cv2.COLOR_BGR2RGB)

    return output, feedback, state_obj.alert_count

def reset_alerts():
    """Reset the alert counter"""
    state_obj.alert_count = 0
    state_obj.state = 'no_hold'
    return 0

# ---------------- Gradio Interface ---------------- #
with gr.Blocks(title="Smart Garbage Patrol") as demo:
    gr.Markdown("""
    # 🗑️ Smart Garbage Patrol - Littering Detection System

    This system uses AI to detect littering behavior in real time:
    - **MediaPipe** tracks hand movements
    - **YOLOv8** detects trash objects
    - **State Machine** identifies throwing behavior

    **How it works:**
    1. Hold trash near your hand → System detects "HOLDING TRASH"
    2. Move hand away quickly → System detects "THROWING TRASH"
    3. If trash is released → "⚠️ LITTERING DETECTED!"
    """)

    with gr.Row():
        with gr.Column():
            webcam = gr.Image(sources=["webcam"], streaming=True, type="numpy")
            reset_btn = gr.Button("🔄 Reset Alert Count", variant="secondary")
        with gr.Column():
            output_frame = gr.Image(label="Detection Output")
            status_text = gr.Textbox(label="Current Status", interactive=False)
            alert_counter = gr.Number(label="Total Alerts", value=0, interactive=False)

    # Process webcam stream
    webcam.stream(
        fn=process_frame,
        inputs=[webcam],
        outputs=[output_frame, status_text, alert_counter],
        show_progress=False
    )

    # Reset button
    reset_btn.click(
        fn=reset_alerts,
        outputs=[alert_counter]
    )

if __name__ == "__main__":
    demo.launch()
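
# ---------------- Optional: offline sanity check ---------------- #
# A minimal sketch for exercising process_frame on a still image when this
# file is imported as a module (e.g. from a notebook), since demo.launch()
# only runs under __main__. The path "sample.jpg" is an illustrative
# placeholder; point it at any photo of a person holding an object.
def run_offline_check(image_path="sample.jpg"):
    """Run one detection pass on a static image and save the annotated result."""
    img = cv2.imread(image_path)
    if img is None:
        print(f"Could not read {image_path}")
        return
    # process_frame expects RGB input, matching the Gradio webcam component.
    rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    annotated, feedback, alerts = process_frame(rgb)
    print(f"Status: {feedback} | Total alerts: {alerts}")
    cv2.imwrite("offline_output.jpg", cv2.cvtColor(annotated, cv2.COLOR_RGB2BGR))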