|
|
from mcp.server.fastmcp import FastMCP |
|
|
from datetime import datetime |
|
|
import random |
|
|
import urllib.parse |
|
|
|
|
|
|
|
|
# Server instance for this module; the @mcp.tool() decorators further down
# register their functions on it and the __main__ guard calls mcp.run().
mcp = FastMCP("FlightsAgent")
|
|
|
|
|
|
|
|
# Carriers that mock search results are sampled from.
#   name - display name shown in the results
#   code - two-letter prefix used to build flight numbers and to resolve a
#          flight number back to an airline name
#   hub  - airport code; not read anywhere in the visible code
REAL_AIRLINES = [
    {"name": "Emirates", "code": "EK", "hub": "DXB"},
    {"name": "British Airways", "code": "BA", "hub": "LHR"},
    {"name": "Qatar Airways", "code": "QR", "hub": "DOH"},
    {"name": "Turkish Airlines", "code": "TK", "hub": "IST"},
    {"name": "Lufthansa", "code": "LH", "hub": "FRA"},
    {"name": "Air France", "code": "AF", "hub": "CDG"},
    {"name": "KLM", "code": "KL", "hub": "AMS"},
    {"name": "Etihad Airways", "code": "EY", "hub": "AUH"},
    {"name": "Swiss", "code": "LX", "hub": "ZRH"},
    {"name": "Singapore Airlines", "code": "SQ", "hub": "SIN"},
]
|
|
|
|
|
|
|
|
# Airport code -> city name. One entry is picked at random as the stopover
# of a one-stop itinerary; key order is kept as in the original literal
# because the pick is made over list(LAYOVER_CITIES.keys()).
LAYOVER_CITIES = {
    "DOH": "Doha",
    "IST": "Istanbul",
    "FRA": "Frankfurt",
    "CDG": "Paris",
    "AMS": "Amsterdam",
    "DXB": "Dubai",
    "AUH": "Abu Dhabi",
    "ZRH": "Zurich",
    "MUC": "Munich",
}
|
|
|
|
|
def generate_realistic_price(origin: str, destination: str, passengers: int, cabin_class: str = "economy") -> int:
    """Generate a randomised total fare for a route.

    The destination is matched case-insensitively, by substring, against two
    keyword lists to choose a price band; anything that matches neither list
    falls into the cheapest ("short") band. A per-passenger fare is drawn
    uniformly from that band, scaled by the cabin multiplier, truncated to an
    integer and multiplied by the passenger count.

    Args:
        origin: Departure city. Accepted for call-site symmetry; it does not
            influence the price.
        destination: Arrival city as free text (e.g. "Paris, France").
        passengers: Number of travellers the total should cover. The value is
            used as a plain multiplier and is not validated here.
        cabin_class: "economy", "business" or "first". Matching ignores case
            and surrounding whitespace; any other value (including None) is
            priced as economy.

    Returns:
        The total price for all passengers.
    """
    # (min, max) per-passenger economy fare for each distance band.
    base_prices = {
        "short": (150, 350),
        "medium": (350, 700),
        "long": (600, 1500),
    }
    long_haul_dests = ("dubai", "tokyo", "sydney", "new york", "los angeles", "singapore", "bangkok", "bali")
    medium_dests = ("paris", "rome", "barcelona", "amsterdam", "berlin", "madrid", "lisbon", "athens")
    # Economy is the implicit 1x case and has no entry.
    cabin_multipliers = {"business": 3.5, "first": 6}

    dest_lower = destination.lower()
    if any(keyword in dest_lower for keyword in long_haul_dests):
        band = "long"
    elif any(keyword in dest_lower for keyword in medium_dests):
        band = "medium"
    else:
        band = "short"

    min_p, max_p = base_prices[band]
    base = random.randint(min_p, max_p)

    # Normalise before the lookup so "Business" or " first " are not silently
    # priced as economy.
    multiplier = cabin_multipliers.get((cabin_class or "").strip().lower())
    if multiplier is not None:
        base = int(base * multiplier)

    return base * passengers
|
|
|
|
|
def _arrival_clock(dep_hour: int, dep_min: int, total_mins: int) -> tuple:
    """Return (hour, minute, next-day marker) for a departure time plus a duration in minutes."""
    minutes_per_day = 24 * 60
    arrival = dep_hour * 60 + dep_min + total_mins
    arr_hour, arr_min = divmod(arrival % minutes_per_day, 60)
    # Itineraries built by search_flights last under 14 hours, so at most one
    # midnight is crossed and a single "+1" marker is enough.
    marker = "⁺¹" if arrival >= minutes_per_day else ""
    return arr_hour, arr_min, marker


@mcp.tool()
def search_flights(origin: str, destination: str, date: str, passengers: int = 1, return_date: str = "") -> str:
    """Search for flights between two cities. Returns a list of available flight options with booking links.

    The options are randomly generated placeholders (airline, times, price,
    CO2); only the booking links point at real search pages.

    Args:
        origin: Departure city. Text after the first comma (e.g. a country)
            is dropped when building the links.
        destination: Arrival city, same format as origin.
        date: Outbound date in YYYY-MM-DD format.
        passengers: Number of travellers, at least 1.
        return_date: Return date in YYYY-MM-DD format. When empty, seven days
            after the outbound date is used.

    Returns:
        Markdown text listing the generated options, or a message starting
        with "Error:" when an argument is invalid.
    """
    # Kept function-local: the module header imports only `datetime`.
    from datetime import timedelta

    try:
        flight_date = datetime.strptime(date, "%Y-%m-%d")
    except ValueError:
        return "Error: Date must be in YYYY-MM-DD format."

    if return_date:
        # The return date ends up inside the booking URLs, so validate it
        # the same way as the outbound date instead of passing it through raw.
        try:
            return_dt = datetime.strptime(return_date, "%Y-%m-%d")
        except ValueError:
            return "Error: Return date must be in YYYY-MM-DD format."
        if return_dt < flight_date:
            return "Error: Return date cannot be before the departure date."
    else:
        return_dt = flight_date + timedelta(days=7)

    if passengers < 1:
        return "Error: Passengers must be at least 1."

    origin_clean = origin.split(",")[0].strip()
    dest_clean = destination.split(",")[0].strip()
    if not origin_clean or not dest_clean:
        return "Error: Origin and destination are required."

    formatted_date = flight_date.strftime("%B %d, %Y")
    # Canonical zero-padded dates for the URLs (strptime also accepts "2025-1-5").
    depart_iso = flight_date.date().isoformat()
    return_iso = return_dt.date().isoformat()

    origin_encoded = urllib.parse.quote(origin_clean)
    dest_encoded = urllib.parse.quote(dest_clean)
    # Skyscanner-style path slugs; quoted so non-ASCII city names stay valid.
    origin_slug = urllib.parse.quote(origin_clean.lower().replace(" ", "-"))
    dest_slug = urllib.parse.quote(dest_clean.lower().replace(" ", "-"))

    # The links do not depend on the individual option, so build them once.
    google_url = (
        f"https://www.google.com/travel/flights?q={origin_encoded}%20to%20{dest_encoded}"
        f"%20{depart_iso}%20to%20{return_iso}"
    )
    skyscanner_url = (
        f"https://www.skyscanner.com/transport/flights/{origin_slug}/{dest_slug}"
        f"/{depart_iso}/{return_iso}/?adults={passengers}&cabinclass=economy"
    )
    passenger_label = f"{passengers} passenger{'s' if passengers > 1 else ''}"

    results = [
        f"✈️ **Flight Search: {origin} → {destination}**",
        f"📅 {formatted_date} | 👥 {passenger_label}",
        "",
        "---",
    ]

    airlines = random.sample(REAL_AIRLINES, min(4, len(REAL_AIRLINES)))
    flight_times = [(6, 30), (9, 15), (14, 45), (19, 20), (22, 10)]
    random.shuffle(flight_times)

    # zip stops at the shorter sequence: one departure slot per sampled airline.
    for option_no, (airline, (dep_hour, dep_min)) in enumerate(zip(airlines, flight_times), start=1):
        is_direct = random.random() > 0.6

        if is_direct:
            duration_h = random.randint(6, 8)
            duration_m = random.choice([0, 15, 30, 45])
            total_mins = duration_h * 60 + duration_m
            stops_text = "Direct"
            layover_text = ""
        else:
            layover_code = random.choice(list(LAYOVER_CITIES.keys()))
            layover_city = LAYOVER_CITIES[layover_code]
            layover_duration = random.randint(1, 3)
            layover_mins = random.choice([0, 15, 30, 45])
            leg1_duration = random.randint(3, 5)
            leg2_duration = random.randint(3, 5)
            total_mins = (leg1_duration + layover_duration + leg2_duration) * 60 + layover_mins
            stops_text = "1 stop"
            layover_text = f" via {layover_city} ({layover_duration}h {layover_mins}m layover)"

        # Derive the arrival from total minutes so the minute carry reaches
        # both the hour and the next-day marker; this keeps the displayed
        # arrival consistent with the displayed duration.
        arr_hour, arr_min, next_day = _arrival_clock(dep_hour, dep_min, total_mins)

        price = generate_realistic_price(origin, destination, passengers)
        flight_num = f"{airline['code']}{random.randint(100, 999)}"
        co2 = random.randint(400, 900)

        results.extend([
            "",
            f"### ✈️ Option {option_no}: {airline['name']}",
            f"**{flight_num}** • {stops_text}{layover_text}",
            f"🕐 {dep_hour:02d}:{dep_min:02d} → {arr_hour:02d}:{arr_min:02d}{next_day} ({total_mins // 60}h {total_mins % 60}m)",
            f"💰 **${price:,}** total for {passenger_label}",
            f"🌱 {co2} kg CO₂",
            f"🔗 [Book on Google Flights]({google_url}) | [Book on Skyscanner]({skyscanner_url})",
            "",
        ])

    results.extend([
        "---",
        "",
        "💡 **Tip:** Prices shown are estimates. Click booking links for live prices and availability.",
    ])

    return "\n".join(results)
|
|
|
|
|
@mcp.tool()
def get_flight_details(flight_number: str) -> str:
    """Get detailed information about a specific flight.

    Only the airline name is derived from the input (via the two-letter
    prefix of the flight number); terminal and gate are random and the
    remaining fields are fixed placeholder text.

    Args:
        flight_number: Flight number such as "EK123". Case and surrounding
            whitespace are ignored.

    Returns:
        Markdown text describing the flight, or a message starting with
        "Error:" when the flight number is empty.
    """
    # Normalise first: with leading whitespace or lower-case input the
    # two-character prefix would never match an airline code.
    normalized = flight_number.strip().upper()
    if not normalized:
        return "Error: Flight number is required."

    airline_code = normalized[:2]
    airline_name = next(
        (airline["name"] for airline in REAL_AIRLINES if airline["code"] == airline_code),
        "Unknown Airline",
    )

    terminal = random.randint(1, 5)
    gate = f"{random.choice(['A', 'B', 'C', 'D'])}{random.randint(1, 30)}"

    lines = [
        f"✈️ **Flight {normalized} Details**",
        "",
        f"🛩️ **Airline:** {airline_name}",
        "✈️ **Aircraft:** Boeing 787-9 Dreamliner",
        f"🚪 **Terminal:** {terminal} | Gate: {gate}",
        "✅ **Status:** On Time",
        "",
        "**Included Amenities:**",
        "• 🧳 Checked baggage: 23kg",
        "• 🎒 Carry-on: 7kg",
        "• 🍽️ Meal service included",
        "• 📺 In-flight entertainment",
        "• 🔌 USB charging ports",
        "",
        "**Check-in:** Opens 24 hours before departure",
    ]
    return "\n".join(lines)
|
|
|
|
|
if __name__ == "__main__":
    # Start the server only when this file is executed directly, not when it
    # is imported.
    mcp.run()
|
|
|