"""Adapter for camera stage.""" import time from typing import Any from engine.pipeline.core import DataType, PipelineContext, Stage class CameraStage(Stage): """Adapter wrapping Camera as a Stage.""" def __init__(self, camera, name: str = "vertical"): self._camera = camera self.name = name self.category = "camera" self.optional = True self._last_frame_time: float | None = None @property def stage_type(self) -> str: return "camera" @property def capabilities(self) -> set[str]: return {"camera"} @property def dependencies(self) -> set[str]: return {"render.output"} @property def inlet_types(self) -> set: return {DataType.TEXT_BUFFER} @property def outlet_types(self) -> set: return {DataType.TEXT_BUFFER} def process(self, data: Any, ctx: PipelineContext) -> Any: """Apply camera transformation to items.""" if data is None: return data current_time = time.perf_counter() dt = 0.0 if self._last_frame_time is not None: dt = current_time - self._last_frame_time self._camera.update(dt) self._last_frame_time = current_time ctx.set_state("camera_y", self._camera.y) ctx.set_state("camera_x", self._camera.x) if hasattr(self._camera, "apply"): viewport_width = ctx.params.viewport_width if ctx.params else 80 viewport_height = ctx.params.viewport_height if ctx.params else 24 return self._camera.apply(data, viewport_width, viewport_height) return data