Files
hello-world/scripts/ai_code_review.py
T
john a791e4c15c
hello-world/pipeline/head This commit looks good
ai/code-review AI review reported findings
hello-world/pipeline/pr-master This commit looks good
Update
2026-05-03 09:41:47 +01:00

273 lines
7.5 KiB
Python

#!/usr/bin/env python3
import json
import os
import subprocess
import sys
from typing import Optional
from urllib import error, parse, request
# Upper bound on how many characters of the diff build_prompt() embeds in the
# review prompt; anything beyond this is cut off.
MAX_DIFF_CHARS = 12000
# Value sent as the "context" field of the commit status published to Gitea.
STATUS_CONTEXT = "ai/code-review"
def run_git_command(*args: str) -> str:
    """Run ``git`` with the given arguments and return its stdout.

    Args:
        *args: Arguments passed to the ``git`` executable, one per element.

    Returns:
        The captured standard output with leading and trailing whitespace
        removed.

    Raises:
        subprocess.CalledProcessError: If git exits with a non-zero status.
    """
    command = ["git"]
    command.extend(args)
    completed = subprocess.run(command, capture_output=True, text=True, check=True)
    output = completed.stdout
    return output.strip()
def try_git_command(*args: str) -> Optional[str]:
    """Best-effort wrapper around ``run_git_command``.

    Args:
        *args: Arguments forwarded to ``run_git_command``.

    Returns:
        The command's trimmed stdout, or ``None`` when git exits with a
        non-zero status.
    """
    try:
        output = run_git_command(*args)
    except subprocess.CalledProcessError:
        output = None
    return output
def resolve_base_commit() -> Optional[str]:
    """Determine the commit that HEAD should be diffed against.

    Candidates are tried in this order, and the first non-empty one wins:

    1. the ``GIT_PREVIOUS_SUCCESSFUL_COMMIT`` environment variable,
    2. the ``GIT_PREVIOUS_COMMIT`` environment variable,
    3. the merge base of HEAD and ``origin/<CHANGE_TARGET>`` when
       ``CHANGE_TARGET`` is set,
    4. the parent of HEAD (``HEAD~1``).

    Returns:
        A commit reference, or ``None`` if even ``HEAD~1`` cannot be resolved.
    """
    env_candidates = (
        os.getenv(name)
        for name in ("GIT_PREVIOUS_SUCCESSFUL_COMMIT", "GIT_PREVIOUS_COMMIT")
    )
    previous = next((candidate for candidate in env_candidates if candidate), None)
    if previous:
        return previous

    target_branch = os.getenv("CHANGE_TARGET")
    if target_branch:
        merge_base = try_git_command("merge-base", "HEAD", f"origin/{target_branch}")
        if merge_base:
            return merge_base

    return try_git_command("rev-parse", "HEAD~1")
def collect_diff() -> str:
    """Collect the diff text that should be reviewed.

    The committed range ``<base>..HEAD`` is preferred when a base commit can
    be resolved. If that yields nothing, the staged diff is used, and then the
    unstaged working-tree diff.

    Returns:
        The first non-empty diff found, or an empty string when there is
        nothing to review.
    """
    base = resolve_base_commit()
    if base:
        committed = try_git_command("diff", f"{base}..HEAD", "--")
        if committed:
            return committed

    # Fallbacks, in priority order: index first, then working tree.
    for fallback_args in (("diff", "--cached", "--"), ("diff", "--")):
        fallback = try_git_command(*fallback_args)
        if fallback:
            return fallback
    return ""
def get_current_commit() -> Optional[str]:
    """Return the commit being built.

    Uses the ``GIT_COMMIT`` environment variable when it is set and non-empty;
    otherwise asks git to resolve ``HEAD``.

    Returns:
        The commit reference, or ``None`` if neither source yields one.
    """
    from_env = os.getenv("GIT_COMMIT")
    if from_env:
        return from_env
    return try_git_command("rev-parse", "HEAD")
def parse_repo_from_remote() -> tuple[Optional[str], Optional[str]]:
    """Derive ``(owner, repo)`` from the URL of the ``origin`` remote.

    Handles URL-style remotes (``https://host/owner/repo.git``,
    ``ssh://git@host:22/owner/repo.git``) and scp-style remotes
    (``user@host:owner/repo.git``) for any SSH user name, not only ``git``.
    A trailing ``/`` and a trailing ``.git`` are ignored.

    Returns:
        The last two path components as ``(owner, repo)``, or
        ``(None, None)`` when the remote is missing or has fewer than two
        path components.
    """
    remote_url = try_git_command("remote", "get-url", "origin")
    if not remote_url:
        return None, None

    # Drop a trailing slash first so "repo.git/" still loses its suffix.
    cleaned = remote_url.strip().rstrip("/")
    if cleaned.endswith(".git"):
        cleaned = cleaned[:-4]

    if "://" in cleaned:
        cleaned = parse.urlparse(cleaned).path
    else:
        # scp-style "[user@]host:path": the part before the first colon is a
        # host spec only if it contains no slash (otherwise it is a path).
        host_part, colon, path_part = cleaned.partition(":")
        if colon and "/" not in host_part:
            cleaned = path_part

    parts = [part for part in cleaned.split("/") if part]
    if len(parts) < 2:
        return None, None
    return parts[-2], parts[-1]
def get_gitea_repo() -> tuple[Optional[str], Optional[str]]:
    """Return the ``(owner, repo)`` pair identifying the Gitea repository.

    The ``GITEA_REPO_OWNER`` and ``GITEA_REPO_NAME`` environment variables are
    used when both are non-empty; otherwise the pair is parsed from the
    ``origin`` remote.
    """
    env_owner, env_name = (
        os.getenv(key) for key in ("GITEA_REPO_OWNER", "GITEA_REPO_NAME")
    )
    if not (env_owner and env_name):
        return parse_repo_from_remote()
    return env_owner, env_name
def post_gitea_json(api_path: str, payload: dict, timeout: float = 30.0) -> None:
    """POST *payload* as JSON to the Gitea API.

    Does nothing when ``GITEA_URL`` or ``GITEA_TOKEN`` is unset or empty.

    Args:
        api_path: Path appended to ``GITEA_URL`` (trailing slashes on the base
            URL are removed first), e.g. ``/api/v1/repos/...``.
        payload: JSON-serialisable request body.
        timeout: Seconds to wait on the connection before giving up, so a
            stalled server cannot hang the CI job indefinitely.

    Raises:
        RuntimeError: If the server answers with an HTTP error, cannot be
            reached, or does not respond within *timeout*.
    """
    # Local import: only needed to recognise read timeouts on Python < 3.10,
    # where socket.timeout is not yet an alias of TimeoutError.
    import socket

    base_url = os.getenv("GITEA_URL")
    token = os.getenv("GITEA_TOKEN")
    if not base_url or not token:
        return

    url = f"{base_url.rstrip('/')}{api_path}"
    data = json.dumps(payload).encode("utf-8")
    req = request.Request(
        url,
        data=data,
        headers={
            "Authorization": f"token {token}",
            "Content-Type": "application/json",
            "Accept": "application/json",
        },
        method="POST",
    )
    try:
        with request.urlopen(req, timeout=timeout) as response:
            # Drain the body so the request completes; the content is unused.
            response.read()
    except error.HTTPError as exc:
        details = exc.read().decode("utf-8", errors="replace")
        raise RuntimeError(f"Gitea API request failed: HTTP {exc.code} {details}") from exc
    except error.URLError as exc:
        raise RuntimeError(f"Gitea API request failed: {exc.reason}") from exc
    except socket.timeout as exc:
        # Timeouts while reading the response are not wrapped in URLError.
        raise RuntimeError(f"Gitea API request failed: timed out after {timeout}s") from exc
def publish_commit_status(state: str, description: str) -> None:
    """Publish a commit status for the current commit to Gitea.

    Silently does nothing when ``GITEA_URL``/``GITEA_TOKEN`` are not
    configured, or when the repository owner, repository name or current
    commit cannot be determined.

    Args:
        state: Value sent as the status ``state`` field.
        description: Status text; only the first 255 characters are sent.
    """
    if not (os.getenv("GITEA_URL") and os.getenv("GITEA_TOKEN")):
        return

    owner, repo = get_gitea_repo()
    commit = get_current_commit()
    if not (owner and repo and commit):
        return

    status = {
        "state": state,
        "context": STATUS_CONTEXT,
        "description": description[:255],
    }
    # Link the status back to the CI build when its URL is available.
    build_url = os.getenv("BUILD_URL")
    if build_url:
        status["target_url"] = build_url

    statuses_path = f"/api/v1/repos/{owner}/{repo}/statuses/{commit}"
    post_gitea_json(statuses_path, status)
def publish_pr_comment(body: str) -> None:
    """Post *body* as a comment on the pull request being built.

    The pull request number is read from ``GITEA_PR_NUMBER``, falling back to
    ``CHANGE_ID``. Silently does nothing when ``GITEA_URL``/``GITEA_TOKEN``
    are not configured, or when the repository or pull request number cannot
    be determined.

    Args:
        body: Comment text.
    """
    if not (os.getenv("GITEA_URL") and os.getenv("GITEA_TOKEN")):
        return

    owner, repo = get_gitea_repo()
    pr_number = os.getenv("GITEA_PR_NUMBER") or os.getenv("CHANGE_ID")
    if not (owner and repo and pr_number):
        return

    comments_path = f"/api/v1/repos/{owner}/{repo}/issues/{pr_number}/comments"
    post_gitea_json(comments_path, {"body": body})
def build_prompt(diff_text: str) -> str:
    """Build the review prompt sent to the model for *diff_text*.

    The diff is cut to at most ``MAX_DIFF_CHARS`` characters; when that
    removes anything, a note saying so is appended after the diff.

    Args:
        diff_text: Full diff to review.

    Returns:
        The review instructions followed by the (possibly truncated) diff.
    """
    was_truncated = len(diff_text) > MAX_DIFF_CHARS
    diff_excerpt = diff_text[:MAX_DIFF_CHARS]
    if was_truncated:
        truncation_note = "\n\nDiff was truncated to fit the token budget."
    else:
        truncation_note = ""

    instructions = (
        "Review the following git diff. Focus on correctness, regressions, missing validation, "
        "build/test issues, and security concerns. "
        "Return either 'No issues found.' or a short flat list where each item includes severity, file, and issue.\n\n"
    )
    return f"{instructions}{diff_excerpt}{truncation_note}"
def request_review(diff_text: str) -> str:
    """Ask the OpenAI model to review *diff_text* and return its answer.

    The model name comes from ``CODEX_MODEL``, then ``OPENAI_MODEL``, and
    defaults to ``gpt-4.1``.

    Args:
        diff_text: Diff to review; it is embedded in the prompt built by
            ``build_prompt``.

    Returns:
        The response's ``output_text`` with surrounding whitespace removed.

    Raises:
        RuntimeError: If the ``openai`` package is not installed or
            ``OPENAI_API_KEY`` is not set.
    """
    # Imported lazily so a missing package yields a clear error message.
    try:
        from openai import OpenAI
    except ImportError as exc:
        raise RuntimeError(
            "The 'openai' package is not installed. Install it with 'pip install openai'."
        ) from exc

    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        raise RuntimeError("OPENAI_API_KEY is not set.")

    def as_message(role: str, text: str) -> dict:
        """Wrap *text* as a single-part text message for the given role."""
        return {"role": role, "content": [{"type": "input_text", "text": text}]}

    model = os.getenv("CODEX_MODEL") or os.getenv("OPENAI_MODEL") or "gpt-4.1"
    client = OpenAI(api_key=api_key)
    system_text = (
        "You are a strict CI code reviewer. Be concise, concrete, and prioritize real defects."
    )
    messages = [
        as_message("system", system_text),
        as_message("user", build_prompt(diff_text)),
    ]
    response = client.responses.create(model=model, input=messages)
    return response.output_text.strip()
def classify_review(review: str) -> tuple[str, str]:
    """Map the model's review text to a ``(state, description)`` pair.

    A review counts as clean when it is empty or when it consists solely of
    the phrase "No issues found", compared case-insensitively and ignoring
    surrounding whitespace, quotes, backticks, asterisks and periods. This
    tolerates a verdict echoed with the quotes used in the prompt, formatted
    as bold, or missing its final period.

    Args:
        review: Review text returned by the model; may be empty or ``None``.

    Returns:
        ``("success", "No issues found")`` for a clean review, otherwise
        ``("failure", "AI review reported findings")``.
    """
    text = (review or "").strip()
    if not text:
        # An empty answer carries no findings.
        return "success", "No issues found"

    # Peel off decoration that models commonly wrap around the verdict.
    verdict = text.strip("\"'`*. \t\r\n").lower()
    if verdict == "no issues found":
        return "success", "No issues found"
    return "failure", "AI review reported findings"
def main() -> int:
    """Run the AI review for the current change and report the outcome.

    Collects the diff, requests a review, prints it, and publishes a commit
    status plus a pull request comment to Gitea. Gitea reporting is
    best-effort on every path: a failure there is logged to stderr and never
    changes the exit code, and a failed status does not prevent the comment.

    Returns:
        ``0`` when there is nothing to review, when the review is clean, or
        when findings exist but ``AI_REVIEW_FAIL_ON_FINDINGS`` is not
        ``"true"``. ``1`` when requesting the review fails, or when findings
        exist and ``AI_REVIEW_FAIL_ON_FINDINGS`` is ``"true"``.
    """
    diff_text = collect_diff()
    if not diff_text:
        print("No git changes detected. Skipping AI review.")
        _report_to_gitea(publish_commit_status, "success", "No changes to review")
        return 0

    try:
        review = request_review(diff_text)
    except Exception as exc:
        _report_to_gitea(publish_commit_status, "error", "AI review failed")
        print(f"AI review failed: {exc}", file=sys.stderr)
        return 1

    review_text = review or "No issues found."
    print("AI review result:\n")
    print(review_text)

    state, description = classify_review(review)
    # Reported separately so a failed status does not suppress the comment.
    _report_to_gitea(publish_commit_status, state, description)
    _report_to_gitea(publish_pr_comment, f"AI review result:\n\n{review_text}")

    fail_on_findings = os.getenv("AI_REVIEW_FAIL_ON_FINDINGS", "false").lower() == "true"
    if fail_on_findings and state != "success":
        return 1
    return 0


def _report_to_gitea(publish, *args: str) -> None:
    """Call a Gitea publishing function, logging instead of raising on failure."""
    try:
        publish(*args)
    except Exception as exc:  # reporting must never break the review run
        print(f"Gitea reporting failed: {exc}", file=sys.stderr)
# Script entry point: the process exit status is whatever main() returns.
if __name__ == "__main__":
    raise SystemExit(main())