#!/usr/bin/env python3
"""Run an AI-assisted review of the current git changes and report it to Gitea.

The script collects a git diff, sends it to an OpenAI model, prints the
review, and (when Gitea credentials are configured) publishes a commit
status and a pull-request comment.

Environment variables read by this script:

* ``GIT_PREVIOUS_SUCCESSFUL_COMMIT``, ``GIT_PREVIOUS_COMMIT``,
  ``CHANGE_TARGET`` -- candidates for the base of the diff.
* ``GIT_COMMIT`` -- commit that receives the status (defaults to ``HEAD``).
* ``GITEA_URL``, ``GITEA_TOKEN`` -- enable Gitea reporting when both are set.
* ``GITEA_REPO_OWNER``, ``GITEA_REPO_NAME`` -- override the repository that
  is otherwise derived from the ``origin`` remote.
* ``GITEA_PR_NUMBER`` or ``CHANGE_ID`` -- pull request to comment on.
* ``BUILD_URL`` -- optional link attached to the commit status.
* ``OPENAI_API_KEY`` -- required to request the review.
* ``CODEX_MODEL`` or ``OPENAI_MODEL`` -- model name (default ``gpt-4.1``).
* ``AI_REVIEW_FAIL_ON_FINDINGS`` -- when truthy, findings make the script
  exit with status 1.
"""

import json
import os
import subprocess
import sys
from typing import Callable, Optional
from urllib import error, parse, request


MAX_DIFF_CHARS = 12000
STATUS_CONTEXT = "ai/code-review"

# Upper bound for one Gitea API call so a stalled server cannot hang the job.
GITEA_TIMEOUT_SECONDS = 30

# Text shown (and classified) when the model returns an empty review.
NO_ISSUES_TEXT = "No issues found."

# Values of a boolean environment variable that count as "enabled".
_TRUTHY_VALUES = frozenset({"1", "true", "yes", "on"})


def run_git_command(*args: str) -> str:
    """Run ``git`` with *args* and return its stripped standard output.

    Output is decoded as UTF-8 with undecodable bytes replaced, so a diff
    that touches binary or legacy-encoded files cannot raise a decode error.

    Raises:
        subprocess.CalledProcessError: if git exits with a non-zero status.
        OSError: if the git executable cannot be started.
    """
    result = subprocess.run(
        ["git", *args],
        check=True,
        capture_output=True,
        encoding="utf-8",
        errors="replace",
    )
    return result.stdout.strip()


def try_git_command(*args: str) -> Optional[str]:
    """Like :func:`run_git_command`, but return ``None`` when git fails.

    Both a non-zero exit status and a failure to launch git at all (for
    example when it is not installed) are reported as ``None``.
    """
    try:
        return run_git_command(*args)
    except (subprocess.CalledProcessError, OSError):
        return None


def resolve_base_commit() -> Optional[str]:
    """Return the commit the review diff should start from, or ``None``.

    Candidates are tried in order: the previous-build commits exported in the
    environment, the merge base with ``origin/<CHANGE_TARGET>``, and finally
    ``HEAD~1``.
    """
    for env_name in ("GIT_PREVIOUS_SUCCESSFUL_COMMIT", "GIT_PREVIOUS_COMMIT"):
        value = os.getenv(env_name)
        # Only trust the variable when the commit actually exists locally;
        # otherwise fall through to the remaining candidates instead of
        # handing an unusable revision to ``git diff``.
        if value and try_git_command(
            "rev-parse", "--verify", "--quiet", f"{value}^{{commit}}"
        ):
            return value

    change_target = os.getenv("CHANGE_TARGET")
    if change_target:
        base = try_git_command("merge-base", "HEAD", f"origin/{change_target}")
        if base:
            return base

    return try_git_command("rev-parse", "HEAD~1")


def collect_diff() -> str:
    """Return the first non-empty diff among base..HEAD, staged and unstaged.

    Returns an empty string when none of the candidates yields any output.
    """
    candidates = []
    base_commit = resolve_base_commit()
    if base_commit:
        candidates.append(("diff", f"{base_commit}..HEAD", "--"))
    candidates.append(("diff", "--cached", "--"))
    candidates.append(("diff", "--"))

    for git_args in candidates:
        diff = try_git_command(*git_args)
        if diff:
            return diff

    return ""


def get_current_commit() -> Optional[str]:
    """Return ``GIT_COMMIT`` if set, otherwise the hash of ``HEAD`` (or ``None``)."""
    return os.getenv("GIT_COMMIT") or try_git_command("rev-parse", "HEAD")


def split_owner_repo(remote_url: str) -> tuple[Optional[str], Optional[str]]:
    """Extract ``(owner, repo)`` from a git remote URL.

    Handles URLs with a scheme (``https://host/owner/repo.git``,
    ``ssh://git@host:2222/owner/repo``) and scp-style remotes with any user
    name (``user@host:owner/repo.git``). A trailing ``/`` or ``.git`` is
    ignored. Returns ``(None, None)`` when fewer than two path segments remain.
    """
    cleaned = remote_url.strip().rstrip("/").removesuffix(".git")

    if "://" in cleaned:
        cleaned = parse.urlparse(cleaned).path
    elif ":" in cleaned:
        # scp-style "[user@]host:path": everything after the first colon is
        # the repository path.
        cleaned = cleaned.split(":", 1)[1]

    parts = [part for part in cleaned.split("/") if part]
    if len(parts) < 2:
        return None, None

    return parts[-2], parts[-1]


def parse_repo_from_remote() -> tuple[Optional[str], Optional[str]]:
    """Return ``(owner, repo)`` derived from the ``origin`` remote URL.

    Returns ``(None, None)`` when the remote is missing or cannot be parsed.
    """
    remote_url = try_git_command("remote", "get-url", "origin")
    if not remote_url:
        return None, None

    return split_owner_repo(remote_url)


def get_gitea_repo() -> tuple[Optional[str], Optional[str]]:
    """Return ``(owner, repo)`` from the environment, else from the remote."""
    owner = os.getenv("GITEA_REPO_OWNER")
    name = os.getenv("GITEA_REPO_NAME")
    if owner and name:
        return owner, name

    return parse_repo_from_remote()


def post_gitea_json(api_path: str, payload: dict) -> None:
    """POST *payload* as JSON to ``GITEA_URL`` + *api_path*.

    Does nothing when ``GITEA_URL`` or ``GITEA_TOKEN`` is unset.

    Raises:
        RuntimeError: if the request fails, times out, or Gitea answers with
            an HTTP error status.
    """
    base_url = os.getenv("GITEA_URL")
    token = os.getenv("GITEA_TOKEN")
    if not base_url or not token:
        return

    url = f"{base_url.rstrip('/')}{api_path}"
    data = json.dumps(payload).encode("utf-8")
    req = request.Request(
        url,
        data=data,
        headers={
            "Authorization": f"token {token}",
            "Content-Type": "application/json",
            "Accept": "application/json",
        },
        method="POST",
    )

    try:
        with request.urlopen(req, timeout=GITEA_TIMEOUT_SECONDS) as response:
            response.read()
    except error.HTTPError as exc:
        details = exc.read().decode("utf-8", errors="replace")
        raise RuntimeError(
            f"Gitea API request failed: HTTP {exc.code} {details}"
        ) from exc
    except OSError as exc:
        # URLError is an OSError subclass; socket-level timeouts can surface
        # as plain OSError/TimeoutError and have no ``reason`` attribute.
        reason = getattr(exc, "reason", exc)
        raise RuntimeError(f"Gitea API request failed: {reason}") from exc


def publish_commit_status(state: str, description: str) -> None:
    """Publish a commit status for the current commit under ``STATUS_CONTEXT``.

    Silently does nothing when Gitea credentials, the repository or the
    commit cannot be determined. *description* is cut to 255 characters.

    Raises:
        RuntimeError: propagated from :func:`post_gitea_json`.
    """
    base_url = os.getenv("GITEA_URL")
    token = os.getenv("GITEA_TOKEN")
    if not base_url or not token:
        return

    owner, repo = get_gitea_repo()
    commit = get_current_commit()
    if not owner or not repo or not commit:
        return

    payload = {
        "state": state,
        "context": STATUS_CONTEXT,
        "description": description[:255],
    }
    build_url = os.getenv("BUILD_URL")
    if build_url:
        payload["target_url"] = build_url

    # Percent-encode each segment so unusual characters cannot alter the path.
    owner_q, repo_q, commit_q = (
        parse.quote(segment, safe="") for segment in (owner, repo, commit)
    )
    post_gitea_json(f"/api/v1/repos/{owner_q}/{repo_q}/statuses/{commit_q}", payload)


def publish_pr_comment(body: str) -> None:
    """Post *body* as a comment on the pull request named in the environment.

    The PR number comes from ``GITEA_PR_NUMBER`` or ``CHANGE_ID``. Silently
    does nothing when credentials, repository or PR number are unavailable.

    Raises:
        RuntimeError: propagated from :func:`post_gitea_json`.
    """
    base_url = os.getenv("GITEA_URL")
    token = os.getenv("GITEA_TOKEN")
    if not base_url or not token:
        return

    owner, repo = get_gitea_repo()
    pr_number = os.getenv("GITEA_PR_NUMBER") or os.getenv("CHANGE_ID")
    if not owner or not repo or not pr_number:
        return

    owner_q, repo_q, pr_q = (
        parse.quote(segment, safe="") for segment in (owner, repo, pr_number)
    )
    post_gitea_json(
        f"/api/v1/repos/{owner_q}/{repo_q}/issues/{pr_q}/comments",
        {"body": body},
    )


def build_prompt(diff_text: str) -> str:
    """Build the user prompt: review instructions followed by the diff.

    The diff is cut to ``MAX_DIFF_CHARS`` characters; when that happens a
    note saying so is appended after the diff.
    """
    truncated_diff = diff_text[:MAX_DIFF_CHARS]
    suffix = ""
    if len(diff_text) > MAX_DIFF_CHARS:
        suffix = "\n\nDiff was truncated to fit the token budget."

    return (
        "Review the following git diff. Focus on correctness, regressions, missing validation, "
        "build/test issues, and security concerns. "
        "No need to comment on removed code unless it seems like it would cause a problem. "
        "Do not review the scripts in the scripts directory, as they are not part of the main codebase. "
        "Return either 'No issues found.' or a short flat list where each item includes severity, file, and issue.\n\n"
        f"{truncated_diff}{suffix}"
    )


def request_review(diff_text: str) -> str:
    """Send *diff_text* to the configured OpenAI model and return its review.

    The model is taken from ``CODEX_MODEL``, then ``OPENAI_MODEL``, and
    defaults to ``gpt-4.1``. The returned text is stripped and may be empty.

    Raises:
        RuntimeError: if the ``openai`` package is missing or
            ``OPENAI_API_KEY`` is not set.
    """
    # Imported lazily so the rest of the script works without the package.
    try:
        from openai import OpenAI
    except ImportError as exc:
        raise RuntimeError(
            "The 'openai' package is not installed. Install it with 'pip install openai'."
        ) from exc

    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise RuntimeError("OPENAI_API_KEY is not set.")

    model = os.getenv("CODEX_MODEL") or os.getenv("OPENAI_MODEL") or "gpt-4.1"
    client = OpenAI(api_key=api_key)

    response = client.responses.create(
        model=model,
        input=[
            {
                "role": "system",
                "content": [
                    {
                        "type": "input_text",
                        "text": (
                            "You are a strict CI code reviewer. Be concise, concrete, and prioritize real defects."
                        ),
                    }
                ],
            },
            {
                "role": "user",
                "content": [
                    {
                        "type": "input_text",
                        "text": build_prompt(diff_text),
                    }
                ],
            },
        ],
    )

    # Guard against a missing text payload instead of failing on ``None``.
    return (response.output_text or "").strip()


def classify_review(review: str) -> tuple[str, str]:
    """Map review text to a ``(state, description)`` pair for a commit status.

    The review counts as clean when, ignoring case, surrounding whitespace,
    quotes, backticks, asterisks and a trailing period, it reads
    "no issues found". Anything else, including an empty review, is
    reported as ``"failure"``.
    """
    normalized = (review or "").strip().strip("'\"`*").strip()
    normalized = normalized.rstrip(".").strip().lower()
    if normalized == "no issues found":
        return "success", "No issues found"
    return "failure", "AI review reported findings"


def _env_flag(name: str) -> bool:
    """Return True when environment variable *name* holds a truthy value."""
    return os.getenv(name, "false").strip().lower() in _TRUTHY_VALUES


def _report_to_gitea(action: Callable[..., None], *args: str) -> None:
    """Call a Gitea publishing function; log any failure instead of raising."""
    try:
        action(*args)
    except Exception as exc:  # reporting is best-effort at this boundary
        print(f"Gitea reporting failed: {exc}", file=sys.stderr)


def main() -> int:
    """Run the review and return the process exit code.

    Returns 0 when there is nothing to review or the review completed, and 1
    when requesting the review failed or when findings were reported while
    ``AI_REVIEW_FAIL_ON_FINDINGS`` is enabled. Gitea reporting problems are
    logged to stderr and never change the exit code.
    """
    diff_text = collect_diff()
    if not diff_text:
        print("No git changes detected. Skipping AI review.")
        _report_to_gitea(publish_commit_status, "success", "No changes to review")
        return 0

    try:
        review = request_review(diff_text)
    except Exception as exc:
        _report_to_gitea(publish_commit_status, "error", "AI review failed")
        print(f"AI review failed: {exc}", file=sys.stderr)
        return 1

    # Normalise an empty response once so the printed text, the commit status
    # and the PR comment all agree.
    review = review or NO_ISSUES_TEXT

    print("AI review result:\n")
    print(review)

    state, description = classify_review(review)

    # Published independently: a failed status must not suppress the comment.
    _report_to_gitea(publish_commit_status, state, description)
    _report_to_gitea(publish_pr_comment, f"AI review result:\n\n{review}")

    if _env_flag("AI_REVIEW_FAIL_ON_FINDINGS") and state != "success":
        return 1

    return 0


if __name__ == "__main__":
    sys.exit(main())