# Smart Garbage Patrol: webcam littering detection demo (Gradio + MediaPipe Pose + YOLO).
import cv2
import mediapipe as mp
import numpy as np
import time
import gradio as gr
from ultralytics import YOLO
from PIL import Image

# ---------------- CONFIG ---------------- #
CONF_THRESHOLD = 0.6  # passed as `conf` to YOLO predict() in detect_littering()
COOLDOWN_TIME = 3  # seconds between alerts
MODEL_PATH = "best.pt"  # Place your model in the same directory
FRAME_WIDTH = 320  # width in pixels that process_frame() resizes every frame to
FRAME_HEIGHT = 240  # height in pixels that process_frame() resizes every frame to
# ---------------------------------------- #

# ---------------- MediaPipe Setup ---------------- #
# Alias for the pose solution; Pose, PoseLandmark and POSE_CONNECTIONS are read from it below.
mp_pose = mp.solutions.pose
# Alias for the drawing helpers used to render the detected landmarks onto frames.
mp_drawing = mp.solutions.drawing_utils

# ---------------- Load YOLO Model ---------------- #
# Load the detector once at import time. A missing or unreadable weights file
# must not stop the app from starting: `model` is left as None, and
# detect_littering() skips the YOLO step when it is None.
try:
    model = YOLO(MODEL_PATH)
except Exception as exc:  # not a bare `except:` so Ctrl+C / SystemExit still propagate
    # Report the real cause; with no model there is simply no trash detection.
    print(f"Warning: could not load model {MODEL_PATH!r}: {exc}. Trash detection is disabled.")
    model = None

# ---------------- Global State ---------------- #
class DetectionState:
    """Mutable bookkeeping for the littering detector.

    Fields set by the constructor:
      * alert_count     - number of alerts raised so far, starts at 0
      * state           - state-machine label, starts as 'no_hold'
      * last_alert_time - time of the most recent alert, 0 until one fires
      * pose            - MediaPipe Pose object created with both confidence
                          thresholds at 0.5
    """

    def __init__(self):
        pose_options = dict(
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5,
        )
        self.alert_count = 0
        self.state = 'no_hold'
        self.last_alert_time = 0
        self.pose = mp_pose.Pose(**pose_options)

# Single module-level instance read and mutated by detect_littering(),
# process_frame() and reset_alerts().
state_obj = DetectionState()

# ---------------- Utility Functions ---------------- #
def distance(a, b):
    """Return the Euclidean distance between the 2-D points ``a`` and ``b``.

    Only components 0 and 1 of each point are read; any further components
    are ignored.
    """
    dx = a[0] - b[0]
    dy = a[1] - b[1]
    squared_length = dx**2 + dy**2
    return np.sqrt(squared_length)

# ---------------- Littering Detection ---------------- #
def _right_wrist_position(pose_results):
    """Return the right wrist as normalized ``(x, y)``, or None if no pose was found."""
    if not pose_results.pose_landmarks:
        return None
    landmarks = pose_results.pose_landmarks.landmark
    wrist = landmarks[mp_pose.PoseLandmark.RIGHT_WRIST.value]
    return (wrist.x, wrist.y)


def _detect_trash(frame):
    """Run YOLO on ``frame``, draw every detection on it, return normalized box centers."""
    centers = []
    if model is None:
        return centers

    height, width = frame.shape[0], frame.shape[1]
    # Ultralytics interprets numpy input as BGR, while the frames reaching this
    # module come from the Gradio webcam stream as RGB. Swap the channels for
    # inference only; drawing still happens on the original (RGB) frame.
    bgr_frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
    results = model.predict(bgr_frame, conf=CONF_THRESHOLD, verbose=False)

    for result in results:
        boxes = result.boxes.xyxy.cpu().numpy()
        confs = result.boxes.conf.cpu().numpy()
        for (x1, y1, x2, y2), conf in zip(boxes, confs):
            # Box center divided by frame size, i.e. the same normalized
            # coordinate space as the MediaPipe landmark x/y values.
            centers.append(((x1 + x2) / 2 / width, (y1 + y2) / 2 / height))
            cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0), 2)
            cv2.putText(frame, f"Trash {conf:.2f}", (int(x1), int(y1) - 5),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
    return centers


def _advance_state(min_dist, current_time, hold_dist, release_dist):
    """Advance the hold/throw state machine by one frame and return the status label."""
    if state_obj.state == 'no_hold':
        if min_dist < hold_dist:
            state_obj.state = 'holding'
            return "HOLDING TRASH"
        return "SAFE"

    if state_obj.state == 'holding':
        if min_dist > release_dist:
            state_obj.state = 'throwing'
            return "THROWING TRASH"
        return "HOLDING TRASH"

    if state_obj.state == 'throwing':
        if min_dist < hold_dist:
            # The item was grabbed again before an alert fired.
            state_obj.state = 'holding'
            return "HOLDING TRASH"
        cooled_down = current_time - state_obj.last_alert_time > COOLDOWN_TIME
        if min_dist > release_dist and cooled_down:
            state_obj.alert_count += 1
            state_obj.last_alert_time = current_time
            state_obj.state = 'no_hold'
            return "⚠️ LITTERING DETECTED!"
        # A throw is pending (e.g. waiting for the cooldown): do not report SAFE.
        return "THROWING TRASH"

    return "SAFE"


def detect_littering(frame, pose_results, hold_dist=0.1, release_dist=0.25):
    """Annotate ``frame`` and update the littering state machine for one frame.

    Args:
        frame: image array, annotated in place. Expected in RGB channel order,
            which is what the Gradio webcam stream delivers to process_frame().
        pose_results: result of ``Pose.process``; only ``pose_landmarks`` is read.
        hold_dist: normalized wrist-to-trash distance below which the trash
            counts as held (default 0.1).
        release_dist: normalized distance above which the trash counts as
            released (default 0.25).

    Returns:
        ``(frame, feedback)`` where ``feedback`` is one of "SAFE",
        "HOLDING TRASH", "THROWING TRASH" or "⚠️ LITTERING DETECTED!".

    Side effects:
        Mutates the module-level ``state_obj`` (state, alert_count,
        last_alert_time) and draws boxes / pose landmarks onto ``frame``.
    """
    current_time = time.time()

    hand = _right_wrist_position(pose_results)
    trash_positions = _detect_trash(frame)

    feedback = "SAFE"
    # NOTE(review): the state machine only advances on frames where both a wrist
    # and at least one trash box are available; if the trash leaves the view
    # after release the state stays as it is - confirm this is intended.
    if hand is not None and trash_positions:
        min_dist = min(distance(hand, center) for center in trash_positions)
        feedback = _advance_state(min_dist, current_time, hold_dist, release_dist)

    # Draw MediaPipe Pose
    if pose_results.pose_landmarks:
        mp_drawing.draw_landmarks(frame, pose_results.pose_landmarks, mp_pose.POSE_CONNECTIONS)

    return frame, feedback

# ---------------- Gradio Processing Function ---------------- #
def process_frame(frame):
    """Process a single frame from webcam.

    Args:
        frame: RGB numpy image from the Gradio webcam component, or None when
            no frame is available.

    Returns:
        ``(annotated_frame, status_text, alert_count)`` matching the three
        output components of the stream event. ``annotated_frame`` is None
        when ``frame`` is None.
    """
    if frame is None:
        # Report the real total so an empty frame does not zero the counter widget.
        return None, "No frame", state_obj.alert_count

    # Resize frame
    frame = cv2.resize(frame, (FRAME_WIDTH, FRAME_HEIGHT))

    # Gradio already delivers RGB, which is what MediaPipe expects, so the
    # frame is passed through without any channel swap.
    pose_results = state_obj.pose.process(frame)

    # Detect littering
    output, feedback = detect_littering(frame, pose_results)

    # UI overlay: two full-width rows so the text fits a FRAME_WIDTH-wide image.
    row_height = 35
    font = cv2.FONT_HERSHEY_SIMPLEX
    white = (255, 255, 255)

    cv2.rectangle(output, (0, 0), (FRAME_WIDTH, row_height), (50, 50, 50), -1)
    cv2.putText(output, f'ALERTS: {state_obj.alert_count}', (10, 25),
                font, 0.6, white, 2)

    # Colors are RGB because the frame is displayed by Gradio: red on alert.
    color = (255, 0, 0) if "⚠️" in feedback else (0, 150, 0)
    cv2.rectangle(output, (0, row_height), (FRAME_WIDTH, 2 * row_height), color, -1)
    # cv2.putText only renders ASCII; drop the emoji for the on-frame label.
    # The unmodified feedback string is still returned for the status textbox.
    label = feedback.encode("ascii", "ignore").decode().strip()
    cv2.putText(output, label, (10, 60), font, 0.6, white, 2)

    return output, feedback, state_obj.alert_count

def reset_alerts():
    """Zero the alert tally and put the state machine back in 'no_hold'.

    Returns the counter value after the reset (always 0).
    """
    state_obj.state = 'no_hold'
    state_obj.alert_count = 0
    return state_obj.alert_count

# ---------------- Gradio Interface ---------------- #
# Build the web UI. Left column: webcam input and reset button. Right column:
# annotated frame, status text and alert counter.
with gr.Blocks(title="Smart Garbage Patrol") as demo:
    gr.Markdown("""
    # 🗑️ Smart Garbage Patrol - Littering Detection System
    
    This system uses AI to detect littering behavior in real-time:
    - **MediaPipe** tracks hand movements
    - **YOLOv8** detects trash objects
    - **State Machine** identifies throwing behavior
    
    **How it works:**
    1. Hold trash near your hand → System detects "HOLDING TRASH"
    2. Move hand away quickly → System detects "THROWING TRASH"
    3. If trash is released → "⚠️ LITTERING DETECTED!"
    """)
    
    with gr.Row():
        with gr.Column():
            # Streaming webcam source; frames are handed to process_frame() as numpy arrays.
            webcam = gr.Image(sources=["webcam"], streaming=True, type="numpy")
            reset_btn = gr.Button("🔄 Reset Alert Count", variant="secondary")
        
        with gr.Column():
            # Read-only widgets filled from the three values process_frame() returns.
            output_frame = gr.Image(label="Detection Output")
            status_text = gr.Textbox(label="Current Status", interactive=False)
            alert_counter = gr.Number(label="Total Alerts", value=0, interactive=False)
    
    # Run process_frame() on each streamed webcam frame and route its
    # (frame, status, count) result to the right-hand widgets.
    webcam.stream(
        fn=process_frame,
        inputs=[webcam],
        outputs=[output_frame, status_text, alert_counter],
        show_progress=False
    )
    
    # Reset button: reset_alerts() takes no inputs; its return value updates the counter.
    reset_btn.click(
        fn=reset_alerts,
        outputs=[alert_counter]
    )

# Launch the Gradio app only when this file is run as a script, not when imported.
if __name__ == "__main__":
    demo.launch()