Skip to content
2 changes: 1 addition & 1 deletion backend/.env.example
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Supabase Configuration
SUPABASE_URL=https://your-project.supabase.co
SUPABASE_SERVICE_KEY=your-service-key
SUPABASE_ANON_KEY=your-anon-key

# Startup Mode
# Set ALLOW_DEGRADED_STARTUP=1 to allow backend startup even if duplicate/RAG models fail to load
Expand Down
155 changes: 98 additions & 57 deletions backend/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@
env_path = Path(__file__).parent / '.env'
load_dotenv(dotenv_path=env_path)

# Initialize Supabase Client (Service Role for backend bypass)
# Initialize Supabase Client (Anon Key to respect RLS)
try:
from supabase import create_client, Client
url = os.environ.get("SUPABASE_URL")
key = os.environ.get("SUPABASE_SERVICE_KEY")
key = os.environ.get("SUPABASE_ANON_KEY")
if not url or not key:
print("[ERROR] SUPABASE_URL or SUPABASE_SERVICE_KEY not set in backend/.env")
print("[ERROR] SUPABASE_URL or SUPABASE_ANON_KEY not set in backend/.env")
supabase = None
else:
supabase = create_client(url, key)
Expand Down Expand Up @@ -176,8 +176,7 @@ class TicketRecord(BaseModel):
timeline: dict = {} # Milestones: created, analyzed, triaged, routed, in_progress, resolved


# --- In-Memory Database (to be replaced with SQL later) ---
TICKETS_DB: list[TicketRecord] = []
# --- In-Memory Database Removed ---


class HealthResponse(BaseModel):
Expand Down Expand Up @@ -283,8 +282,8 @@ async def lifespan(app: FastAPI):
"http://localhost:3000",
],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
allow_methods=["GET", "POST", "PATCH", "OPTIONS"],
allow_headers=["Content-Type", "Authorization", "Accept"],
)


Expand Down Expand Up @@ -473,6 +472,7 @@ async def analyze_bug(request: BugReportAnalysisRequest):
# Admin Correction Logging endpoint
# ---------------------------------------------------------------------------
CORRECTIONS_LOG_PATH = Path(__file__).parent / "data" / "corrections_log.json"
corrections_log_lock = asyncio.Lock()

@app.post("/ai/log_correction")
async def log_correction(raw_request: Request):
Expand Down Expand Up @@ -513,16 +513,17 @@ async def log_correction(raw_request: Request):
}

try:
if CORRECTIONS_LOG_PATH.exists() and CORRECTIONS_LOG_PATH.stat().st_size > 2:
with open(CORRECTIONS_LOG_PATH, "r", encoding="utf-8") as f:
logs = json.load(f)
else:
logs = []
async with corrections_log_lock:
if CORRECTIONS_LOG_PATH.exists() and CORRECTIONS_LOG_PATH.stat().st_size > 2:
with open(CORRECTIONS_LOG_PATH, "r", encoding="utf-8") as f:
logs = json.load(f)
else:
logs = []

logs.append(entry)
logs.append(entry)

with open(CORRECTIONS_LOG_PATH, "w", encoding="utf-8") as f:
json.dump(logs, f, indent=2)
with open(CORRECTIONS_LOG_PATH, "w", encoding="utf-8") as f:
json.dump(logs, f, indent=2)

print(f"[CORRECTION SAVED] Ticket ID: {ticket_id} | Changed: {changed_fields}")
return {"status": "saved", "changed_fields": changed_fields}
Expand All @@ -536,20 +537,34 @@ async def log_correction(raw_request: Request):
# Ticket operations (Now via Supabase)
# ---------------------------------------------------------------------------
@app.get("/tickets")
async def get_tickets(company_id: str | None = None):
@limiter.limit("60/minute")
async def get_tickets(
request: Request,
company_id: str | None = None,
limit: int = 50,
offset: int = 0
):
"""Fetch persistent tickets from Supabase."""
if not supabase:
raise HTTPException(status_code=500, detail="Database connection not initialized")

# Prevent excessive limit values
if limit > 100:
limit = 100

query = supabase.table("tickets").select("*").order("created_at", desc=True)
if company_id:
query = query.eq("company_id", company_id)

# Apply pagination
query = query.range(offset, offset + limit - 1)

res = query.execute()
return res.data

@app.post("/tickets/save")
async def save_ticket(request_body: TicketSaveRequest):
@limiter.limit("30/minute")
async def save_ticket(request_body: TicketSaveRequest, request: Request):
"""
OFFICIAL PERSISTENCE: Saves the analyzed ticket to Supabase.
This is called AFTER the user confirms the analysis results.
Expand All @@ -558,6 +573,30 @@ async def save_ticket(request_body: TicketSaveRequest):
raise HTTPException(status_code=500, detail="Supabase connection not initialized.")

logger = logging.getLogger(__name__)

# --- 1000% Perfect Tenant Auth & Spoofing Protection ---
auth_header = request.headers.get("Authorization")
if not auth_header or not auth_header.startswith("Bearer "):
raise HTTPException(status_code=401, detail="Missing or invalid Authorization header")

token = auth_header.split(" ")[1]
try:
user_response = supabase.auth.get_user(token)
if not user_response or not getattr(user_response, "user", None):
raise HTTPException(status_code=401, detail="Invalid token")
trusted_user_id = user_response.user.id
except Exception as e:
logger.error(f"Authentication failed: {e}")
raise HTTPException(status_code=401, detail="Authentication failed")

# Enforce trusted user ID from token
if request_body.user_id and request_body.user_id != trusted_user_id:
logger.warning(f"ID Spoofing Attempt: Token user {trusted_user_id} tried to act as {request_body.user_id}")
raise HTTPException(status_code=403, detail="User ID mismatch. Spoofing detected.")

request_body.user_id = trusted_user_id
# --------------------------------------------------------

try:
final_data = request_body.dict()

Expand Down Expand Up @@ -638,7 +677,7 @@ async def save_ticket(request_body: TicketSaveRequest):
"ticket_id": ticket_id,
"sender_id": "00000000-0000-0000-0000-000000000000", # System ID
"sender_name": "AI Assistant",
"sender_role": "admin",
"sender_role": "system",
"message": msg
}).execute()

Expand All @@ -652,7 +691,8 @@ async def save_ticket(request_body: TicketSaveRequest):
raise HTTPException(status_code=500, detail=str(e))

@app.get("/tickets/{ticket_id}")
async def get_ticket_by_id(ticket_id: str):
@limiter.limit("60/minute")
async def get_ticket_by_id(ticket_id: str, request: Request):
"""Fetch single persistent ticket."""
if not supabase:
raise HTTPException(status_code=500, detail="Database connection not initialized")
Expand All @@ -663,32 +703,33 @@ async def get_ticket_by_id(ticket_id: str):
return res.data


@app.post("/tickets", response_model=TicketRecord)
async def create_ticket(ticket: TicketRecord):
"""Save a new ticket into the system."""
# Check for duplicates before adding
existing = next((t for t in TICKETS_DB if t.ticket_id == ticket.ticket_id), None)
if existing:
return existing

TICKETS_DB.append(ticket)
print(f"[DB] Ticket #{ticket.ticket_id} created for user {ticket.owner_id}")
return ticket


@app.patch("/tickets/{ticket_id}", response_model=TicketRecord)
async def update_ticket(ticket_id: str, updates: dict):
"""Partially update a ticket's fields (e.g., status, viewed_at)."""
for i, ticket in enumerate(TICKETS_DB):
if str(ticket.ticket_id) == str(ticket_id):
# Convert to dict, update, then back to model
ticket_dict = ticket.dict()
ticket_dict.update(updates)
updated_ticket = TicketRecord(**ticket_dict)
TICKETS_DB[i] = updated_ticket
return updated_ticket
@app.post("/tickets")
@limiter.limit("30/minute")
async def create_ticket(ticket: dict, request: Request):
"""Save a new ticket into the system (persisted to Supabase)."""
if not supabase:
raise HTTPException(status_code=500, detail="Database connection not initialized")

res = supabase.table("tickets").insert(ticket).execute()
if not res.data:
raise HTTPException(status_code=400, detail="Failed to create ticket")

print(f"[DB] Ticket created: {res.data[0].get('id')}")
return res.data[0]


@app.patch("/tickets/{ticket_id}")
@limiter.limit("60/minute")
async def update_ticket(ticket_id: str, updates: dict, request: Request):
"""Partially update a ticket's fields (e.g., status, viewed_at) via Supabase."""
if not supabase:
raise HTTPException(status_code=500, detail="Database connection not initialized")

raise HTTPException(status_code=404, detail="Ticket not found")
res = supabase.table("tickets").update(updates).eq("id", ticket_id).execute()
if not res.data:
raise HTTPException(status_code=404, detail="Ticket not found")

return res.data[0]


# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -766,7 +807,7 @@ def get_now_ist():
if request_body.image_base64 and not gemini_analysis["ocr_text"]:
try:
print("[AI] Detecting visual context via Gemini...")
vision_result = gemini_service.analyze_image(request_body.image_base64, text)
vision_result = await asyncio.to_thread(gemini_service.analyze_image, request_body.image_base64, text)
gemini_analysis.update(vision_result)
except Exception as e:
print(f"[VISION ERROR] {e}")
Expand All @@ -775,10 +816,10 @@ def get_now_ist():

# --- Classification ---
try:
classification_v3_res = classifier_v3.predict(text)
classification_v3_res = await asyncio.to_thread(classifier_v3.predict, text)
if "error" in classification_v3_res:
# Fallback to V1
classification = classifier_service.predict(text)
classification = await asyncio.to_thread(classifier_service.predict, text)
else:
# Parse V3 output
cat = classification_v3_res.get("Category", {}).get("prediction", "Unknown")
Expand Down Expand Up @@ -810,22 +851,22 @@ def get_now_ist():

# --- NER ---
try:
entities = ner_service.extract_entities(text)
entities = await asyncio.to_thread(ner_service.extract_entities, text)
except Exception:
entities = []

timeline["metadata_harvested"] = get_now_ist()

# --- Duplicate detection ---
try:
dup_result = duplicate_service.check_duplicate(text, threshold=duplicate_sensitivity)
dup_result = await asyncio.to_thread(duplicate_service.check_duplicate, text, threshold=duplicate_sensitivity)
except Exception:
dup_result = {"is_duplicate": False, "duplicate_ticket_id": None, "similarity": 0.0}

# --- RAG Knowledge Base Check ---
rag_match = None
try:
rag_match = rag_service.search_knowledge_base(text, threshold=0.85)
rag_match = await asyncio.to_thread(rag_service.search_knowledge_base, text, threshold=0.85)
if rag_match:
classification["auto_resolve"] = True
classification["assigned_team"] = "Auto-Resolve AI"
Expand Down Expand Up @@ -920,7 +961,7 @@ async def event_generator():
gemini_analysis = {"ocr_text": request_body.image_text or "", "image_description": ""}
if request_body.image_base64 and not gemini_analysis["ocr_text"]:
try:
vision_result = gemini_service.analyze_image(request_body.image_base64, text)
vision_result = await asyncio.to_thread(gemini_service.analyze_image, request_body.image_base64, text)
gemini_analysis.update(vision_result)
except Exception as e:
pass
Expand All @@ -931,7 +972,7 @@ async def event_generator():
yield f"data: {json.dumps({'step': 'Extracting technical entities', 'status': 'in_progress'})}\n\n"
await asyncio.sleep(0.2)
try:
entities = ner_service.extract_entities(text)
entities = await asyncio.to_thread(ner_service.extract_entities, text)
except Exception:
entities = []
timeline["metadata_harvested"] = get_now_ist()
Expand All @@ -940,9 +981,9 @@ async def event_generator():
yield f"data: {json.dumps({'step': 'Detecting category and priority', 'status': 'in_progress'})}\n\n"
await asyncio.sleep(0.2)
try:
classification_v3_res = classifier_v3.predict(text)
classification_v3_res = await asyncio.to_thread(classifier_v3.predict, text)
if "error" in classification_v3_res:
classification = classifier_service.predict(text)
classification = await asyncio.to_thread(classifier_service.predict, text)
else:
cat = classification_v3_res.get("Category", {}).get("prediction", "Unknown")
sub = classification_v3_res.get("Subcategory", {}).get("prediction", "Unknown")
Expand Down Expand Up @@ -973,7 +1014,7 @@ async def event_generator():
yield f"data: {json.dumps({'step': 'Checking duplicate issues', 'status': 'in_progress'})}\n\n"
await asyncio.sleep(0.2)
try:
dup_result = duplicate_service.check_duplicate(text, threshold=duplicate_sensitivity)
dup_result = await asyncio.to_thread(duplicate_service.check_duplicate, text, threshold=duplicate_sensitivity)
except Exception:
dup_result = {"is_duplicate": False, "duplicate_ticket_id": None, "similarity": 0.0}

Expand All @@ -982,7 +1023,7 @@ async def event_generator():
await asyncio.sleep(0.2)
rag_match = None
try:
rag_match = rag_service.search_knowledge_base(text, threshold=0.85)
rag_match = await asyncio.to_thread(rag_service.search_knowledge_base, text, threshold=0.85)
if rag_match:
classification["auto_resolve"] = True
classification["assigned_team"] = "Auto-Resolve AI"
Expand Down Expand Up @@ -1055,7 +1096,7 @@ async def legacy_analyze_and_save(request_body: TicketRequest):
async def analyze_ticket_v2(request: TicketRequest):
text = request.text
try:
prediction = classifier_v2.predict(text)
prediction = await asyncio.to_thread(classifier_v2.predict, text)
return {
"status": "success",
"category": prediction["category"]["prediction"],
Expand Down
Loading