277 lines
10 KiB
Python
277 lines
10 KiB
Python
# pyright: reportUnknownMemberType=false
|
|
import os
|
|
import unittest
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional, Union
|
|
|
|
import yaml
|
|
|
|
from ci.util.paths import PROJECT_ROOT
|
|
|
|
|
|
@dataclass
|
|
class WorkflowStep:
|
|
name: Optional[str] = None
|
|
uses: Optional[str] = None
|
|
with_config: Optional[Dict[str, Any]] = None
|
|
run: Optional[str] = None
|
|
|
|
|
|
@dataclass
|
|
class WorkflowJob:
|
|
runs_on: Optional[Union[str, List[str]]] = None
|
|
steps: Optional[List[WorkflowStep]] = None
|
|
permissions: Optional[Dict[str, str]] = None
|
|
|
|
def __post_init__(self) -> None:
|
|
if self.steps is None:
|
|
self.steps = []
|
|
|
|
|
|
@dataclass
|
|
class WorkflowTrigger:
|
|
branches: Optional[List[str]] = None
|
|
paths: Optional[List[str]] = None
|
|
|
|
|
|
@dataclass
|
|
class GitHubWorkflow:
|
|
name: Optional[str] = None
|
|
on_config: Optional[Dict[str, Any]] = None
|
|
jobs: Optional[Dict[str, WorkflowJob]] = None
|
|
permissions: Optional[Dict[str, str]] = None
|
|
|
|
def __post_init__(self) -> None:
|
|
if self.jobs is None:
|
|
self.jobs = {}
|
|
if self.on_config is None:
|
|
self.on_config = {}
|
|
|
|
|
|
class TestGitHubActionsSecurityTest(unittest.TestCase):
|
|
"""
|
|
Security tests for GitHub Actions workflows to prevent known vulnerabilities.
|
|
|
|
This test ensures that workflows using pull_request_target have proper
|
|
permissions restrictions to prevent the "pwn request" vulnerability:
|
|
https://securitylab.github.com/resources/github-actions-preventing-pwn-requests/
|
|
"""
|
|
|
|
def setUp(self):
|
|
self.workflows_dir = PROJECT_ROOT / ".github" / "workflows"
|
|
self.workflow_files = list(self.workflows_dir.glob("*.yml")) + list(
|
|
self.workflows_dir.glob("*.yaml")
|
|
)
|
|
|
|
def _load_workflow(self, workflow_path: Path) -> GitHubWorkflow:
|
|
"""Load and parse a GitHub Actions workflow file into a dataclass."""
|
|
try:
|
|
with open(workflow_path, "r", encoding="utf-8") as f:
|
|
raw_content: Any = yaml.safe_load(f)
|
|
|
|
content: Dict[str, Any] = raw_content or {}
|
|
|
|
# Handle the case where 'on' is parsed as boolean True instead of string 'on'
|
|
# This happens because 'on' is a YAML boolean keyword
|
|
if True in content and "on" not in content:
|
|
on_data = content.pop(True) # pyright: ignore[reportArgumentType]
|
|
content["on"] = on_data
|
|
|
|
# Parse jobs into dataclass
|
|
jobs_dict: Dict[str, WorkflowJob] = {}
|
|
raw_jobs: Dict[str, Any] = content.get("jobs", {})
|
|
for job_name, job_data in raw_jobs.items():
|
|
if not isinstance(job_data, dict):
|
|
continue
|
|
|
|
# Parse steps
|
|
steps: List[WorkflowStep] = []
|
|
raw_steps: List[Any] = job_data.get("steps", []) # pyright: ignore[reportUnknownVariableType]
|
|
for step_data in raw_steps: # pyright: ignore[reportUnknownVariableType]
|
|
if isinstance(step_data, dict):
|
|
step_dict: Dict[str, Any] = step_data # pyright: ignore[reportUnknownVariableType]
|
|
step = WorkflowStep(
|
|
name=step_dict.get("name"),
|
|
uses=step_dict.get("uses"),
|
|
with_config=step_dict.get("with"),
|
|
run=step_dict.get("run"),
|
|
)
|
|
steps.append(step)
|
|
|
|
job_dict: Dict[str, Any] = job_data # pyright: ignore[reportUnknownVariableType]
|
|
job = WorkflowJob(
|
|
runs_on=job_dict.get("runs-on"),
|
|
steps=steps,
|
|
permissions=job_dict.get("permissions"),
|
|
)
|
|
jobs_dict[job_name] = job
|
|
|
|
return GitHubWorkflow(
|
|
name=content.get("name"),
|
|
on_config=content.get("on", {}),
|
|
jobs=jobs_dict,
|
|
permissions=content.get("permissions"),
|
|
)
|
|
except Exception as e:
|
|
self.fail(f"Failed to parse workflow {workflow_path}: {e}")
|
|
|
|
def _has_pull_request_target(self, workflow: GitHubWorkflow) -> bool:
|
|
"""Check if workflow uses pull_request_target trigger."""
|
|
if isinstance(workflow.on_config, list):
|
|
return "pull_request_target" in workflow.on_config
|
|
elif isinstance(workflow.on_config, dict):
|
|
return "pull_request_target" in workflow.on_config
|
|
return False
|
|
|
|
def _has_untrusted_code_checkout(self, workflow: GitHubWorkflow) -> bool:
|
|
"""Check if workflow checks out PR code that could be untrusted."""
|
|
if workflow.jobs is None:
|
|
return False
|
|
|
|
for job in workflow.jobs.values():
|
|
if job.steps is None:
|
|
continue
|
|
|
|
for step in job.steps:
|
|
# Check for actions/checkout with PR head reference
|
|
if step.uses and step.uses.startswith("actions/checkout"):
|
|
if step.with_config:
|
|
ref = step.with_config.get("ref", "")
|
|
if "pull_request.head" in ref:
|
|
return True
|
|
return False
|
|
|
|
def _has_explicit_permissions(self, workflow: GitHubWorkflow) -> bool:
|
|
"""Check if workflow has explicit permissions set."""
|
|
# Check workflow-level permissions
|
|
if workflow.permissions:
|
|
return True
|
|
|
|
# Check job-level permissions
|
|
if workflow.jobs is not None:
|
|
for job in workflow.jobs.values():
|
|
if job.permissions:
|
|
return True
|
|
return False
|
|
|
|
def _get_permissions(self, workflow: GitHubWorkflow) -> Dict[str, Any]:
|
|
"""Get the permissions configuration from workflow."""
|
|
permissions: Dict[str, Any] = {}
|
|
|
|
# Workflow-level permissions
|
|
if workflow.permissions:
|
|
permissions.update(workflow.permissions)
|
|
|
|
# Job-level permissions (overwrites workflow-level)
|
|
if workflow.jobs is not None:
|
|
for job in workflow.jobs.values():
|
|
if job.permissions:
|
|
permissions.update(job.permissions)
|
|
|
|
return permissions
|
|
|
|
def _is_safe_permissions(self, permissions: Dict[str, Any]) -> bool:
|
|
"""Check if permissions are safe (no dangerous write access)."""
|
|
# List of dangerous write permissions
|
|
dangerous_write_permissions = [
|
|
"contents", # Can modify repository contents
|
|
"metadata", # Can modify repository metadata
|
|
"packages", # Can publish packages
|
|
"pages", # Can deploy to GitHub Pages
|
|
"deployments", # Can create deployments
|
|
"security-events", # Can create security events
|
|
]
|
|
|
|
for perm, value in permissions.items():
|
|
if perm in dangerous_write_permissions and value == "write":
|
|
return False
|
|
|
|
return True
|
|
|
|
def test_pull_request_target_workflows_have_safe_permissions(self) -> None:
|
|
"""
|
|
Test that all workflows using pull_request_target have explicit
|
|
safe permissions to prevent pwn request vulnerabilities.
|
|
"""
|
|
vulnerable_workflows: List[str] = []
|
|
unsafe_permission_workflows: List[str] = []
|
|
|
|
for workflow_path in self.workflow_files:
|
|
workflow = self._load_workflow(workflow_path)
|
|
|
|
if not self._has_pull_request_target(workflow):
|
|
continue
|
|
|
|
workflow_name = workflow_path.name
|
|
|
|
# Check if it has untrusted code checkout (potential vulnerability)
|
|
if self._has_untrusted_code_checkout(workflow):
|
|
if not self._has_explicit_permissions(workflow):
|
|
vulnerable_workflows.append(workflow_name)
|
|
else:
|
|
permissions = self._get_permissions(workflow)
|
|
if not self._is_safe_permissions(permissions):
|
|
unsafe_permission_workflows.append(
|
|
f"{workflow_name}: {permissions}"
|
|
)
|
|
|
|
# Report findings
|
|
error_messages: List[str] = []
|
|
|
|
if vulnerable_workflows:
|
|
error_messages.append(
|
|
f"CRITICAL: Found {len(vulnerable_workflows)} workflows with pull_request_target "
|
|
f"that checkout untrusted code without explicit permissions:\n"
|
|
+ "\n".join(f" - {w}" for w in vulnerable_workflows)
|
|
+ "\n\nThis is a critical security vulnerability! These workflows can be exploited "
|
|
"to gain repository write access through malicious PRs."
|
|
)
|
|
|
|
if unsafe_permission_workflows:
|
|
error_messages.append(
|
|
f"UNSAFE: Found {len(unsafe_permission_workflows)} workflows with pull_request_target "
|
|
f"that have dangerous write permissions:\n"
|
|
+ "\n".join(f" - {w}" for w in unsafe_permission_workflows)
|
|
+ "\n\nThese workflows should use minimal read-only permissions."
|
|
)
|
|
|
|
if error_messages:
|
|
self.fail(
|
|
"\n\n".join(error_messages)
|
|
+ "\n\nRecommended fix: Add explicit minimal permissions to these workflows:\n"
|
|
"permissions:\n"
|
|
" contents: read\n"
|
|
" actions: read\n"
|
|
" id-token: write\n"
|
|
" pull-requests: read"
|
|
)
|
|
|
|
def test_no_workflow_uses_excessive_permissions(self) -> None:
|
|
"""
|
|
Test that no workflow uses overly broad permissions like 'write-all'.
|
|
"""
|
|
excessive_permission_workflows: List[str] = []
|
|
|
|
for workflow_path in self.workflow_files:
|
|
workflow = self._load_workflow(workflow_path)
|
|
permissions = self._get_permissions(workflow)
|
|
|
|
# Check for dangerous permission patterns
|
|
for perm, value in permissions.items():
|
|
if value in ["write-all", "admin"]:
|
|
excessive_permission_workflows.append(
|
|
f"{workflow_path.name}: {perm}={value}"
|
|
)
|
|
|
|
if excessive_permission_workflows:
|
|
self.fail(
|
|
f"Found workflows with excessive permissions:\n"
|
|
+ "\n".join(f" - {w}" for w in excessive_permission_workflows)
|
|
+ "\n\nUse minimal required permissions instead."
|
|
)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|