# NOTE: "Spaces: Sleeping" page chrome from the Hugging Face Spaces listing was
# captured with this source; it is not part of the application code.
# Standard library
import json
import re
import tempfile
from typing import Any, Dict, List, Tuple

# Third-party
import gradio as gr
from transformers import pipeline
# ---------------------------
# Minimal sample (id, question, answer only)
# ---------------------------
SAMPLE_JSON_MIN = """[
{
"id": "ex-001",
"question": "question",
"answer": "answer"
},
{
"id": "ex-002",
"question": "question",
"answer": "answer"
},
{
"id": "ex-003",
"question": "question",
"answer": "answer"
}
]"""


def download_minimal_sample_json():
    """Write the minimal sample JSON to a temp file and return its path.

    The file is created with delete=False so Gradio can serve it after
    this function returns.
    """
    tmp = tempfile.NamedTemporaryFile(
        delete=False, suffix=".json", mode="w", encoding="utf-8"
    )
    try:
        tmp.write(SAMPLE_JSON_MIN)
    finally:
        tmp.close()
    return tmp.name
| # --------------------------- | |
| # Helpers (formatting & menu-path) | |
| # --------------------------- | |
| def _normalize_ws(s: str) -> str: | |
| return re.sub(r"\s+", " ", (s or "").strip()) | |
| def _sentence_case(s: str) -> str: | |
| s = _normalize_ws(s) | |
| if not s: | |
| return s | |
| # single lowercase 'i' -> 'I' | |
| s = re.sub(r"\bi\b", "I", s) | |
| if s[0].islower(): | |
| s = s[0].upper() + s[1:] | |
| if s[-1] not in ".!?": | |
| s += "." | |
| return s | |
| def _join_path(section: str | None, option: str | None) -> str | None: | |
| section = (section or "").strip() | |
| option = (option or "").strip() | |
| if section and option: | |
| return f"{section} > {option}" | |
| if option: | |
| return option | |
| if section: | |
| return section | |
| return None | |
# ---------------------------
# Model: reward/quality (Transformers-compatible)
# ---------------------------
# OpenAssistant reward model: scores a "Human: ... Assistant: ..." exchange.
MODEL_ID = "OpenAssistant/reward-model-deberta-v3-large-v2"

try:
    # Loaded once at import time; downloads weights on first run.
    quality_pipe = pipeline(
        task="text-classification",
        model=MODEL_ID,
        tokenizer=MODEL_ID,
        function_to_apply="none"  # regression score: raw logit, no softmax
    )
    MODEL_READY = True
    LOAD_ERR = ""
except Exception as e:
    # Keep the app usable without the model; score_pair falls back to a
    # heuristic and the UI surfaces LOAD_ERR as a warning.
    MODEL_READY = False
    LOAD_ERR = str(e)
# ---------------------------
# Scoring & labeling
# ---------------------------
def score_pair(question: str, answer: str) -> float:
    """Reward score (higher = better). If model not ready, use light heuristic."""
    if not MODEL_READY:
        # Heuristic fallback, range 0.3–0.7: reward a question mark, a
        # reasonably long answer, and terminal punctuation.
        base = 0.3
        if question.strip().endswith("?"):
            base += 0.1
        if len(answer.split()) >= 6:
            base += 0.2
        if answer.strip().endswith((".", "!", "?")):
            base += 0.1
        return base
    # Prompt format expected by the OpenAssistant reward model.
    text = f"Human: {question}\nAssistant: {answer}"
    # Scores may be negative (raw regression output, function_to_apply="none").
    out = quality_pipe(text, truncation=True)[0]  # top_k=1 default
    return float(out["score"])
def label_mapper_from_distribution(scores: List[float]):
    """
    Build a score->label function from the score distribution itself
    (reward scores may be negative, so fixed thresholds don't work):
      low   : below the 33rd percentile
      medium: 33rd–66th percentile
      high  : at or above the 66th percentile
    An empty distribution labels everything 'medium'.
    """
    if not scores:
        return lambda s: "medium"

    ordered = sorted(scores)
    last = len(ordered) - 1

    def _at_percentile(p: float) -> float:
        # Nearest-rank percentile via index rounding.
        if last == 0:
            return ordered[0]
        return ordered[int(round((p / 100) * last))]

    low_th = _at_percentile(33)
    high_th = _at_percentile(66)

    def mapper(score: float) -> str:
        if score >= high_th:
            return "high"
        if score >= low_th:
            return "medium"
        return "low"

    return mapper
| # --------------------------- | |
| # Smart rewrite helpers | |
| # --------------------------- | |
| def _extract_module(item: Dict[str, Any], q_text: str) -> str | None: | |
| """ | |
| Extract uppercase 3+ letter 'module-like' token from question/context, e.g., CUSTOMERS. | |
| """ | |
| ctx = f"{item.get('context','')} {q_text}" | |
| m = re.search(r"\b([A-Z]{3,})\b", ctx) | |
| return m.group(1) if m else None | |
| def _roles_from_answer(ans: str) -> List[str]: | |
| """ | |
| Pull a role list from the answer; Title Case; drop empties. | |
| """ | |
| parts = re.split(r",| and ", ans or "", flags=re.IGNORECASE) | |
| roles = [] | |
| for p in parts: | |
| t = p.strip(" .") | |
| if not t: | |
| continue | |
| t = " ".join(w.capitalize() for w in t.split()) | |
| roles.append(t) | |
| return [r for r in roles if r] | |
| # --------------------------- | |
| # Rewriter (WHO/WHERE/WHAT/HOW aware) | |
| # --------------------------- | |
| def improve_smart(item: Dict[str, Any]) -> Dict[str, Any]: | |
| """ | |
| LLM-free safe rewrite: | |
| - WHO + MODULE: use roles pattern. (Only here!) | |
| - WHERE: produce menu-path sentence (e.g., Settings > Inventory Parameters). | |
| - WHAT: definition/purpose or 'allows searching by …' sentence. | |
| - HOW: short procedural sentence. | |
| - Else: grammar/format normalization. | |
| """ | |
| q = _normalize_ws(item.get("question") or "") | |
| a = _normalize_ws(item.get("answer") or "") | |
| meta = item.get("metadata") or {} | |
| qtype = (meta.get("question_type") or "").lower() | |
| orig_q = _normalize_ws(item.get("original_question") or "") | |
| orig_a = _normalize_ws(item.get("original_answer") or "") | |
| base = orig_a or a # prefer original to keep semantics | |
| module = _extract_module(item, q) # e.g., CUSTOMERS | |
| # -- WHO: ONLY here we use roles pattern | |
| if qtype == "who" and module: | |
| roles = _roles_from_answer(base) | |
| new_q = f"Which roles are authorized to access the {module} module in DealerTIQ?" | |
| if roles: | |
| if len(roles) == 1: | |
| roles_str = roles[0] | |
| elif len(roles) == 2: | |
| roles_str = " and ".join(roles) | |
| else: | |
| roles_str = ", ".join(roles[:-1]) + f", and {roles[-1]}" | |
| new_a = f"Authorized roles include {roles_str}." | |
| else: | |
| new_a = _sentence_case(base) if base else _sentence_case(a) | |
| if item.get("context"): | |
| item["context"] = f"DealerTIQ — {module} module" | |
| item["question"] = _sentence_case(new_q[:-1] + "?") | |
| item["answer"] = _sentence_case(new_a) | |
| return item | |
| # -- WHERE: menu path sentence | |
| if qtype == "where": | |
| text = base or a | |
| # "under the Settings section" | |
| m_sec = re.search(r"under the\s+([A-Za-z ]+?)\s+section", text or "", flags=re.IGNORECASE) | |
| section = m_sec.group(1).strip().title() if m_sec else None | |
| # quoted option: "Inventory Parameters" | |
| quotes = re.findall(r'"([^"]+)"', text or "") | |
| option = quotes[0].strip() if quotes else None | |
| path = _join_path(section, option) | |
| target = option or (module and f"{module} module") or "page" | |
| new_q = f"Where is the {target} located in DealerTIQ?" | |
| new_a = f"It is located under {path} in the left navigation menu." if path else (text or "It is available in the left navigation menu.") | |
| item["question"] = _sentence_case(new_q[:-1] + "?") | |
| item["answer"] = _sentence_case(new_a) | |
| return item | |
| # -- WHAT: definition/purpose or 'allows searching by …' | |
| if qtype == "what": | |
| text = base or a | |
| if re.search(r"allows\s+search(ing)?\s+by", text or "", flags=re.IGNORECASE): | |
| m = re.search(r"such as\s+(.+)", text or "", flags=re.IGNORECASE) | |
| if m: | |
| feats = m.group(1).strip().rstrip(".") | |
| new_q = f"What can you search for in {module or 'this module'}?" | |
| new_a = f"It allows searching by criteria such as {feats}." | |
| else: | |
| new_q = orig_q or q or "What can you search for in this module?" | |
| new_a = text or "It allows searching by multiple criteria." | |
| else: | |
| if re.search(r"\b(configure|configuration|settings)\b", text or "", flags=re.IGNORECASE): | |
| target = module or "Inventory Parameters" | |
| new_q = f"What is configured in the {target}?" | |
| new_a = text or "It configures related settings and rules." | |
| else: | |
| new_q = orig_q or q or "What is the purpose of this module?" | |
| new_a = text or "It provides the core functionality for this area." | |
| item["question"] = _sentence_case(new_q[:-1] + "?") | |
| item["answer"] = _sentence_case(new_a) | |
| return item | |
| # -- HOW: short procedure | |
| if qtype == "how": | |
| text = base or a | |
| quotes = re.findall(r'"([^"]+)"', text or "") # "Add Channel", "Inventory Parameters" | |
| m_sec = re.search(r"under the\s+([A-Za-z ]+?)\s+section", text or "", flags=re.IGNORECASE) | |
| section = m_sec.group(1).strip().title() if m_sec else None | |
| path = _join_path(section, quotes[0] if quotes else None) | |
| new_q = orig_q or q or f"How do I perform this action in {module or 'the module'}?" | |
| steps = _normalize_ws(text or "") | |
| steps = re.sub(r"\bclick on\b", "select", steps, flags=re.IGNORECASE) | |
| if path and "left navigation" not in steps.lower(): | |
| steps = f"Go to {path} in the left navigation menu, then {steps[0].lower() + steps[1:]}" if steps else f"Go to {path} in the left navigation menu." | |
| item["question"] = _sentence_case(new_q[:-1] + "?") if not new_q.endswith("?") else _sentence_case(new_q) | |
| item["answer"] = _sentence_case(steps or "Follow the on-screen instructions to complete the action.") | |
| return item | |
| # -- Fallback: grammar/format normalization only | |
| if q and not q.endswith("?"): | |
| q += "?" | |
| item["question"] = _sentence_case(q) | |
| item["answer"] = _sentence_case(base or a) | |
| return item | |
| # --------------------------- | |
| # Pipeline | |
| # --------------------------- | |
| def process_json(file) -> Tuple[List[Dict[str, Any]], str, str, str]: | |
| """ | |
| Input: JSON (list or single object) | |
| Steps: | |
| 1) First scoring pass for all items | |
| 2) Label via distribution thresholds (low/medium/high) | |
| 3) Auto-rewrite items labeled 'low' (improve_smart) | |
| 4) Rescore & write quality_before / quality_after | |
| Output: | |
| - Summary (Dataframe) | |
| - Preview JSON (first 50) | |
| - Download JSON path | |
| - Warn/Info | |
| """ | |
| data = json.load(open(file.name)) | |
| items: List[Dict[str, Any]] = data if isinstance(data, list) else [data] | |
| # 1) First scoring pass | |
| first = [] | |
| scores = [] | |
| for raw in items: | |
| it = dict(raw) | |
| s = score_pair(it.get("question",""), it.get("answer","")) | |
| it["quality_before"] = {"score": round(s, 3)} | |
| first.append(it) | |
| scores.append(s) | |
| # 2) Dynamic label function | |
| to_label = label_mapper_from_distribution(scores) | |
| # 3) Label, rewrite if 'low', then rescore | |
| processed = [] | |
| for it in first: | |
| base_label = to_label(it["quality_before"]["score"]) | |
| it["quality_before"]["label"] = base_label | |
| if base_label == "low": | |
| it = improve_smart(it) | |
| s2 = score_pair(it.get("question",""), it.get("answer","")) | |
| it["quality_after"] = { | |
| "score": round(s2, 3), | |
| "label": to_label(s2) | |
| } | |
| processed.append(it) | |
| # Summary table | |
| summary = [] | |
| for idx, it in enumerate(processed): | |
| qb = it.get("quality_before", {}) | |
| qa = it.get("quality_after") | |
| summary.append({ | |
| "id": it.get("id", idx), | |
| "before_label": qb.get("label"), | |
| "before_score": qb.get("score"), | |
| "after_label": qa.get("label") if qa else None, | |
| "after_score": qa.get("score") if qa else None, | |
| "question_preview": (it.get("question") or "")[:120] | |
| }) | |
| # Downloadable JSON | |
| tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".json", mode="w", encoding="utf-8") | |
| json.dump(processed, tmp, indent=2, ensure_ascii=False) | |
| tmp.flush(); tmp.close() | |
| # Preview | |
| preview = json.dumps(processed[:50], indent=2, ensure_ascii=False) | |
| if len(processed) > 50: | |
| preview += "\n\n// NOTE: Showing first 50 items. Download full file below." | |
| warn = "" | |
| if not MODEL_READY: | |
| warn = f"Warning: model '{MODEL_ID}' could not be loaded; heuristic scoring used. Error: {LOAD_ERR}" | |
| return summary, preview, tmp.name, warn | |
# ---------------------------
# UI
# ---------------------------
with gr.Blocks(title="Q&A Quality Upgrader", theme=gr.themes.Soft()) as demo:
    gr.Markdown("## Q&A Quality Upgrader\nUpload your JSON. Low-quality items will be auto-rewritten and rescored.")

    # ---- Minimal sample accordion (show + download) ----
    with gr.Accordion("Minimal sample JSON (only id, question, answer)", open=False):
        gr.Markdown("Upload a JSON **array of objects** with the following schema:")
        gr.Code(value=SAMPLE_JSON_MIN, language="json", lines=18, label="Minimal JSON example")
        sample_btn = gr.Button("Download minimal sample.json")
        sample_file = gr.File(label="minimal-sample.json")
        # Writes the sample to a temp file and offers it for download.
        sample_btn.click(fn=download_minimal_sample_json, outputs=sample_file)

    # ---- Upload & Run ----
    inp = gr.File(file_types=[".json"], label="Upload JSON (list of objects)")
    run = gr.Button("Run")

    # Result tabs: summary table, JSON preview (first 50), full-file download.
    with gr.Tab("Summary"):
        tbl = gr.Dataframe(headers=["id","before_label","before_score","after_label","after_score","question_preview"])
    with gr.Tab("Preview JSON"):
        code = gr.Code(language="json", lines=34, label="Preview (first 50 items)")
    with gr.Tab("Download"):
        dfile = gr.File(label="Download full JSON")
    # Shows the model-load warning when heuristic scoring is in effect.
    warnbox = gr.Markdown("")

    # Wire the pipeline; output order must match process_json's return tuple.
    run.click(process_json, inputs=[inp], outputs=[tbl, code, dfile, warnbox])

if __name__ == "__main__":
    demo.launch()