feature/capability-based-deps (#53)

Reviewed-on: #53
Co-authored-by: David Gwilliam <dhgwilliam@gmail.com>
Co-committed-by: David Gwilliam <dhgwilliam@gmail.com>
This commit was merged in pull request #53.
This commit is contained in:
2026-03-31 01:55:21 +00:00
committed by david
parent 2650f7245e
commit 2d28e92594
190 changed files with 37860 additions and 2026 deletions

201
scripts/capture_output.py Normal file
View File

@@ -0,0 +1,201 @@
#!/usr/bin/env python3
"""
Capture output utility for Mainline.
This script captures the output of a Mainline pipeline using NullDisplay
and saves it to a JSON file for comparison with other branches.
"""
import argparse
import json
import time
from pathlib import Path
from engine.display import DisplayRegistry
from engine.pipeline import Pipeline, PipelineConfig, PipelineContext
from engine.pipeline.adapters import create_stage_from_display
from engine.pipeline.presets import get_preset
def capture_pipeline_output(
preset_name: str,
output_file: str,
frames: int = 60,
width: int = 80,
height: int = 24,
):
"""Capture pipeline output for a given preset.
Args:
preset_name: Name of preset to use
output_file: Path to save captured output
frames: Number of frames to capture
width: Terminal width
height: Terminal height
"""
print(f"Capturing output for preset '{preset_name}'...")
# Get preset
preset = get_preset(preset_name)
if not preset:
print(f"Error: Preset '{preset_name}' not found")
return False
# Create NullDisplay with recording
display = DisplayRegistry.create("null")
display.init(width, height)
display.start_recording()
# Build pipeline
config = PipelineConfig(
source=preset.source,
display="null", # Use null display
camera=preset.camera,
effects=preset.effects,
enable_metrics=False,
)
# Create pipeline context with params
from engine.pipeline.params import PipelineParams
params = PipelineParams(
source=preset.source,
display="null",
camera_mode=preset.camera,
effect_order=preset.effects,
viewport_width=preset.viewport_width,
viewport_height=preset.viewport_height,
camera_speed=preset.camera_speed,
)
ctx = PipelineContext()
ctx.params = params
pipeline = Pipeline(config=config, context=ctx)
# Add stages based on preset
from engine.data_sources.sources import HeadlinesDataSource
from engine.pipeline.adapters import DataSourceStage
# Add source stage
source = HeadlinesDataSource()
pipeline.add_stage("source", DataSourceStage(source, name="headlines"))
# Add message overlay if enabled
if getattr(preset, "enable_message_overlay", False):
from engine import config as engine_config
from engine.pipeline.adapters import MessageOverlayConfig, MessageOverlayStage
overlay_config = MessageOverlayConfig(
enabled=True,
display_secs=getattr(engine_config, "MESSAGE_DISPLAY_SECS", 30),
topic_url=getattr(engine_config, "NTFY_TOPIC", None),
)
pipeline.add_stage(
"message_overlay", MessageOverlayStage(config=overlay_config)
)
# Add display stage
pipeline.add_stage("display", create_stage_from_display(display, "null"))
# Build and initialize
pipeline.build()
if not pipeline.initialize():
print("Error: Failed to initialize pipeline")
return False
# Capture frames
print(f"Capturing {frames} frames...")
start_time = time.time()
for frame in range(frames):
try:
pipeline.execute([])
if frame % 10 == 0:
print(f" Frame {frame}/{frames}")
except Exception as e:
print(f"Error on frame {frame}: {e}")
break
elapsed = time.time() - start_time
print(f"Captured {frame + 1} frames in {elapsed:.2f}s")
# Get captured frames
captured_frames = display.get_frames()
print(f"Retrieved {len(captured_frames)} frames from display")
# Save to JSON
output_path = Path(output_file)
output_path.parent.mkdir(parents=True, exist_ok=True)
recording_data = {
"version": 1,
"preset": preset_name,
"display": "null",
"width": width,
"height": height,
"frame_count": len(captured_frames),
"frames": [
{
"frame_number": i,
"buffer": frame,
"width": width,
"height": height,
}
for i, frame in enumerate(captured_frames)
],
}
with open(output_path, "w") as f:
json.dump(recording_data, f, indent=2)
print(f"Saved recording to {output_path}")
return True
def main():
parser = argparse.ArgumentParser(description="Capture Mainline pipeline output")
parser.add_argument(
"--preset",
default="demo",
help="Preset name to use (default: demo)",
)
parser.add_argument(
"--output",
default="output/capture.json",
help="Output file path (default: output/capture.json)",
)
parser.add_argument(
"--frames",
type=int,
default=60,
help="Number of frames to capture (default: 60)",
)
parser.add_argument(
"--width",
type=int,
default=80,
help="Terminal width (default: 80)",
)
parser.add_argument(
"--height",
type=int,
default=24,
help="Terminal height (default: 24)",
)
args = parser.parse_args()
success = capture_pipeline_output(
preset_name=args.preset,
output_file=args.output,
frames=args.frames,
width=args.width,
height=args.height,
)
return 0 if success else 1
if __name__ == "__main__":
exit(main())

186
scripts/capture_upstream.py Normal file
View File

@@ -0,0 +1,186 @@
#!/usr/bin/env python3
"""
Capture output from upstream/main branch.
This script captures the output of upstream/main Mainline using NullDisplay
and saves it to a JSON file for comparison with sideline branch.
"""
import argparse
import json
import sys
from pathlib import Path
# Add upstream/main to path
sys.path.insert(0, "/tmp/upstream_mainline")
def capture_upstream_output(
output_file: str,
frames: int = 60,
width: int = 80,
height: int = 24,
):
"""Capture upstream/main output.
Args:
output_file: Path to save captured output
frames: Number of frames to capture
width: Terminal width
height: Terminal height
"""
print(f"Capturing upstream/main output...")
try:
# Import upstream modules
from engine import config, themes
from engine.display import NullDisplay
from engine.fetch import fetch_all, load_cache
from engine.scroll import stream
from engine.ntfy import NtfyPoller
from engine.mic import MicMonitor
except ImportError as e:
print(f"Error importing upstream modules: {e}")
print("Make sure upstream/main is in the Python path")
return False
# Create a custom NullDisplay that captures frames
class CapturingNullDisplay:
def __init__(self, width, height, max_frames):
self.width = width
self.height = height
self.max_frames = max_frames
self.frame_count = 0
self.frames = []
def init(self, width: int, height: int) -> None:
self.width = width
self.height = height
def show(self, buffer: list[str], border: bool = False) -> None:
if self.frame_count < self.max_frames:
self.frames.append(list(buffer))
self.frame_count += 1
if self.frame_count >= self.max_frames:
raise StopIteration("Frame limit reached")
def clear(self) -> None:
pass
def cleanup(self) -> None:
pass
def get_frames(self):
return self.frames
display = CapturingNullDisplay(width, height, frames)
# Load items (use cached headlines)
items = load_cache()
if not items:
print("No cached items found, fetching...")
result = fetch_all()
if isinstance(result, tuple):
items, linked, failed = result
else:
items = result
if not items:
print("Error: No items available")
return False
print(f"Loaded {len(items)} items")
# Create ntfy poller and mic monitor (upstream uses these)
ntfy_poller = NtfyPoller(config.NTFY_TOPIC, reconnect_delay=5, display_secs=30)
mic_monitor = MicMonitor()
# Run stream for specified number of frames
print(f"Capturing {frames} frames...")
try:
# Run the stream
stream(
items=items,
ntfy_poller=ntfy_poller,
mic_monitor=mic_monitor,
display=display,
)
except StopIteration:
print("Frame limit reached")
except Exception as e:
print(f"Error during capture: {e}")
# Continue to save what we have
# Get captured frames
captured_frames = display.get_frames()
print(f"Retrieved {len(captured_frames)} frames from display")
# Save to JSON
output_path = Path(output_file)
output_path.parent.mkdir(parents=True, exist_ok=True)
recording_data = {
"version": 1,
"preset": "upstream_demo",
"display": "null",
"width": width,
"height": height,
"frame_count": len(captured_frames),
"frames": [
{
"frame_number": i,
"buffer": frame,
"width": width,
"height": height,
}
for i, frame in enumerate(captured_frames)
],
}
with open(output_path, "w") as f:
json.dump(recording_data, f, indent=2)
print(f"Saved recording to {output_path}")
return True
def main():
parser = argparse.ArgumentParser(description="Capture upstream/main output")
parser.add_argument(
"--output",
default="output/upstream_demo.json",
help="Output file path (default: output/upstream_demo.json)",
)
parser.add_argument(
"--frames",
type=int,
default=60,
help="Number of frames to capture (default: 60)",
)
parser.add_argument(
"--width",
type=int,
default=80,
help="Terminal width (default: 80)",
)
parser.add_argument(
"--height",
type=int,
default=24,
help="Terminal height (default: 24)",
)
args = parser.parse_args()
success = capture_upstream_output(
output_file=args.output,
frames=args.frames,
width=args.width,
height=args.height,
)
return 0 if success else 1
if __name__ == "__main__":
exit(main())

View File

@@ -0,0 +1,144 @@
"""Capture frames from upstream Mainline for comparison testing.
This script should be run on the upstream/main branch to capture frames
that will later be compared with sideline branch output.
Usage:
# On upstream/main branch
python scripts/capture_upstream_comparison.py --preset demo
# This will create tests/comparison_output/demo_upstream.json
"""
import argparse
import json
import sys
from pathlib import Path
# Add project root to path
sys.path.insert(0, str(Path(__file__).parent.parent))
def load_preset(preset_name: str) -> dict:
"""Load a preset from presets.toml."""
import tomli
# Try user presets first
user_presets = Path.home() / ".config" / "mainline" / "presets.toml"
local_presets = Path("presets.toml")
built_in_presets = Path(__file__).parent.parent / "presets.toml"
for preset_file in [user_presets, local_presets, built_in_presets]:
if preset_file.exists():
with open(preset_file, "rb") as f:
config = tomli.load(f)
if "presets" in config and preset_name in config["presets"]:
return config["presets"][preset_name]
raise ValueError(f"Preset '{preset_name}' not found")
def capture_upstream_frames(
preset_name: str,
frame_count: int = 30,
output_dir: Path = Path("tests/comparison_output"),
) -> Path:
"""Capture frames from upstream pipeline.
Note: This is a simplified version that mimics upstream behavior.
For actual upstream comparison, you may need to:
1. Checkout upstream/main branch
2. Run this script
3. Copy the output file
4. Checkout your branch
5. Run comparison
"""
output_dir.mkdir(parents=True, exist_ok=True)
# Load preset
preset = load_preset(preset_name)
# For upstream, we need to use the old monolithic rendering approach
# This is a simplified placeholder - actual implementation depends on
# the specific upstream architecture
print(f"Capturing {frame_count} frames from upstream preset '{preset_name}'")
print("Note: This script should be run on upstream/main branch")
print(f" for accurate comparison with sideline branch")
# Placeholder: In a real implementation, this would:
# 1. Import upstream-specific modules
# 2. Create pipeline using upstream architecture
# 3. Capture frames
# 4. Save to JSON
# For now, create a placeholder file with instructions
placeholder_data = {
"preset": preset_name,
"config": preset,
"note": "This is a placeholder file.",
"instructions": [
"1. Checkout upstream/main branch: git checkout main",
"2. Run frame capture: python scripts/capture_upstream_comparison.py --preset <name>",
"3. Copy output file to sideline branch",
"4. Checkout sideline branch: git checkout feature/capability-based-deps",
"5. Run comparison: python tests/run_comparison.py --preset <name>",
],
"frames": [], # Empty until properly captured
}
output_file = output_dir / f"{preset_name}_upstream.json"
with open(output_file, "w") as f:
json.dump(placeholder_data, f, indent=2)
print(f"\nPlaceholder file created: {output_file}")
print("\nTo capture actual upstream frames:")
print("1. Ensure you are on upstream/main branch")
print("2. This script needs to be adapted to use upstream-specific rendering")
print("3. The captured frames will be used for comparison with sideline")
return output_file
def main():
"""Main entry point."""
parser = argparse.ArgumentParser(
description="Capture frames from upstream Mainline for comparison"
)
parser.add_argument(
"--preset",
"-p",
required=True,
help="Preset name to capture",
)
parser.add_argument(
"--frames",
"-f",
type=int,
default=30,
help="Number of frames to capture",
)
parser.add_argument(
"--output-dir",
"-o",
type=Path,
default=Path("tests/comparison_output"),
help="Output directory",
)
args = parser.parse_args()
try:
output_file = capture_upstream_frames(
preset_name=args.preset,
frame_count=args.frames,
output_dir=args.output_dir,
)
print(f"\nCapture complete: {output_file}")
except Exception as e:
print(f"Error: {e}")
sys.exit(1)
if __name__ == "__main__":
main()

220
scripts/compare_outputs.py Normal file
View File

@@ -0,0 +1,220 @@
#!/usr/bin/env python3
"""
Compare captured outputs from different branches or configurations.
This script loads two captured recordings and compares them frame-by-frame,
reporting any differences found.
"""
import argparse
import difflib
import json
from pathlib import Path
def load_recording(file_path: str) -> dict:
"""Load a recording from a JSON file."""
with open(file_path, "r") as f:
return json.load(f)
def compare_frame_buffers(buf1: list[str], buf2: list[str]) -> tuple[int, list[str]]:
"""Compare two frame buffers and return differences.
Returns:
tuple: (difference_count, list of difference descriptions)
"""
differences = []
# Check dimensions
if len(buf1) != len(buf2):
differences.append(f"Height mismatch: {len(buf1)} vs {len(buf2)}")
# Check each line
max_lines = max(len(buf1), len(buf2))
for i in range(max_lines):
if i >= len(buf1):
differences.append(f"Line {i}: Missing in first buffer")
continue
if i >= len(buf2):
differences.append(f"Line {i}: Missing in second buffer")
continue
line1 = buf1[i]
line2 = buf2[i]
if line1 != line2:
# Find the specific differences in the line
if len(line1) != len(line2):
differences.append(
f"Line {i}: Length mismatch ({len(line1)} vs {len(line2)})"
)
# Show a snippet of the difference
max_len = max(len(line1), len(line2))
snippet1 = line1[:50] + "..." if len(line1) > 50 else line1
snippet2 = line2[:50] + "..." if len(line2) > 50 else line2
differences.append(f"Line {i}: '{snippet1}' != '{snippet2}'")
return len(differences), differences
def compare_recordings(
recording1: dict, recording2: dict, max_frames: int = None
) -> dict:
"""Compare two recordings frame-by-frame.
Returns:
dict: Comparison results with summary and detailed differences
"""
results = {
"summary": {},
"frames": [],
"total_differences": 0,
"frames_with_differences": 0,
}
# Compare metadata
results["summary"]["recording1"] = {
"preset": recording1.get("preset", "unknown"),
"frame_count": recording1.get("frame_count", 0),
"width": recording1.get("width", 0),
"height": recording1.get("height", 0),
}
results["summary"]["recording2"] = {
"preset": recording2.get("preset", "unknown"),
"frame_count": recording2.get("frame_count", 0),
"width": recording2.get("width", 0),
"height": recording2.get("height", 0),
}
# Compare frames
frames1 = recording1.get("frames", [])
frames2 = recording2.get("frames", [])
num_frames = min(len(frames1), len(frames2))
if max_frames:
num_frames = min(num_frames, max_frames)
print(f"Comparing {num_frames} frames...")
for frame_idx in range(num_frames):
frame1 = frames1[frame_idx]
frame2 = frames2[frame_idx]
buf1 = frame1.get("buffer", [])
buf2 = frame2.get("buffer", [])
diff_count, differences = compare_frame_buffers(buf1, buf2)
if diff_count > 0:
results["total_differences"] += diff_count
results["frames_with_differences"] += 1
results["frames"].append(
{
"frame_number": frame_idx,
"differences": differences,
"diff_count": diff_count,
}
)
if frame_idx < 5: # Only print first 5 frames with differences
print(f"\nFrame {frame_idx} ({diff_count} differences):")
for diff in differences[:5]: # Limit to 5 differences per frame
print(f" - {diff}")
# Summary
results["summary"]["total_frames_compared"] = num_frames
results["summary"]["frames_with_differences"] = results["frames_with_differences"]
results["summary"]["total_differences"] = results["total_differences"]
results["summary"]["match_percentage"] = (
(1 - results["frames_with_differences"] / num_frames) * 100
if num_frames > 0
else 0
)
return results
def print_comparison_summary(results: dict):
"""Print a summary of the comparison results."""
print("\n" + "=" * 80)
print("COMPARISON SUMMARY")
print("=" * 80)
r1 = results["summary"]["recording1"]
r2 = results["summary"]["recording2"]
print(f"\nRecording 1: {r1['preset']}")
print(
f" Frames: {r1['frame_count']}, Width: {r1['width']}, Height: {r1['height']}"
)
print(f"\nRecording 2: {r2['preset']}")
print(
f" Frames: {r2['frame_count']}, Width: {r2['width']}, Height: {r2['height']}"
)
print(f"\nComparison:")
print(f" Frames compared: {results['summary']['total_frames_compared']}")
print(f" Frames with differences: {results['summary']['frames_with_differences']}")
print(f" Total differences: {results['summary']['total_differences']}")
print(f" Match percentage: {results['summary']['match_percentage']:.2f}%")
if results["summary"]["match_percentage"] == 100:
print("\n✓ Recordings match perfectly!")
else:
print("\n⚠ Recordings have differences.")
def main():
parser = argparse.ArgumentParser(
description="Compare captured outputs from different branches"
)
parser.add_argument(
"recording1",
help="First recording file (JSON)",
)
parser.add_argument(
"recording2",
help="Second recording file (JSON)",
)
parser.add_argument(
"--max-frames",
type=int,
help="Maximum number of frames to compare",
)
parser.add_argument(
"--output",
"-o",
help="Output file for detailed comparison results (JSON)",
)
args = parser.parse_args()
# Load recordings
print(f"Loading {args.recording1}...")
recording1 = load_recording(args.recording1)
print(f"Loading {args.recording2}...")
recording2 = load_recording(args.recording2)
# Compare
results = compare_recordings(recording1, recording2, args.max_frames)
# Print summary
print_comparison_summary(results)
# Save detailed results if requested
if args.output:
output_path = Path(args.output)
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, "w") as f:
json.dump(results, f, indent=2)
print(f"\nDetailed results saved to {args.output}")
return 0
if __name__ == "__main__":
exit(main())

151
scripts/demo-lfo-effects.py Normal file
View File

@@ -0,0 +1,151 @@
#!/usr/bin/env python3
"""
Pygame Demo: Effects with LFO Modulation
This demo shows how to use LFO (Low Frequency Oscillator) to modulate
effect intensities over time, creating smooth animated changes.
Effects modulated:
- noise: Random noise intensity
- fade: Fade effect intensity
- tint: Color tint intensity
- glitch: Glitch effect intensity
The LFO uses a sine wave to oscillate intensity between 0.0 and 1.0.
"""
import sys
import time
from dataclasses import dataclass
from typing import Any
from engine import config
from engine.display import DisplayRegistry
from engine.effects import get_registry
from engine.pipeline import Pipeline, PipelineConfig, PipelineContext, list_presets
from engine.pipeline.params import PipelineParams
from engine.pipeline.preset_loader import load_presets
from engine.sensors.oscillator import OscillatorSensor
from engine.sources import FEEDS
@dataclass
class LFOEffectConfig:
"""Configuration for LFO-modulated effect."""
name: str
frequency: float # LFO frequency in Hz
phase_offset: float # Phase offset (0.0 to 1.0)
min_intensity: float = 0.0
max_intensity: float = 1.0
class LFOEffectDemo:
"""Demo controller that modulates effect intensities using LFO."""
def __init__(self, pipeline: Pipeline):
self.pipeline = pipeline
self.effects = [
LFOEffectConfig("noise", frequency=0.5, phase_offset=0.0),
LFOEffectConfig("fade", frequency=0.3, phase_offset=0.33),
LFOEffectConfig("tint", frequency=0.4, phase_offset=0.66),
LFOEffectConfig("glitch", frequency=0.6, phase_offset=0.9),
]
self.start_time = time.time()
self.frame_count = 0
def update(self):
"""Update effect intensities based on LFO."""
elapsed = time.time() - self.start_time
self.frame_count += 1
for effect_cfg in self.effects:
# Calculate LFO value using sine wave
angle = (
(elapsed * effect_cfg.frequency + effect_cfg.phase_offset) * 2 * 3.14159
)
lfo_value = 0.5 + 0.5 * (angle.__sin__())
# Scale to intensity range
intensity = effect_cfg.min_intensity + lfo_value * (
effect_cfg.max_intensity - effect_cfg.min_intensity
)
# Update effect intensity in pipeline
self.pipeline.set_effect_intensity(effect_cfg.name, intensity)
def run(self, duration: float = 30.0):
"""Run the demo for specified duration."""
print(f"\n{'=' * 60}")
print("LFO EFFECT MODULATION DEMO")
print(f"{'=' * 60}")
print("\nEffects being modulated:")
for effect in self.effects:
print(f" - {effect.name}: {effect.frequency}Hz")
print(f"\nPress Ctrl+C to stop")
print(f"{'=' * 60}\n")
start = time.time()
try:
while time.time() - start < duration:
self.update()
time.sleep(0.016) # ~60 FPS
except KeyboardInterrupt:
print("\n\nDemo stopped by user")
finally:
print(f"\nTotal frames rendered: {self.frame_count}")
def main():
"""Main entry point for the LFO demo."""
# Configuration
effect_names = ["noise", "fade", "tint", "glitch"]
# Get pipeline config from preset
preset_name = "demo-pygame"
presets = load_presets()
preset = presets["presets"].get(preset_name)
if not preset:
print(f"Error: Preset '{preset_name}' not found")
print(f"Available presets: {list(presets['presets'].keys())}")
sys.exit(1)
# Create pipeline context
ctx = PipelineContext()
ctx.terminal_width = preset.get("viewport_width", 80)
ctx.terminal_height = preset.get("viewport_height", 24)
# Create params
params = PipelineParams(
source=preset.get("source", "headlines"),
display="pygame", # Force pygame display
camera_mode=preset.get("camera", "feed"),
effect_order=effect_names, # Enable our effects
viewport_width=preset.get("viewport_width", 80),
viewport_height=preset.get("viewport_height", 24),
)
ctx.params = params
# Create pipeline config
pipeline_config = PipelineConfig(
source=preset.get("source", "headlines"),
display="pygame",
camera=preset.get("camera", "feed"),
effects=effect_names,
)
# Create pipeline
pipeline = Pipeline(config=pipeline_config, context=ctx)
# Build pipeline
pipeline.build()
# Create demo controller
demo = LFOEffectDemo(pipeline)
# Run demo
demo.run(duration=30.0)
if __name__ == "__main__":
main()

222
scripts/demo_hot_rebuild.py Normal file
View File

@@ -0,0 +1,222 @@
#!/usr/bin/env python3
"""
Demo script for testing pipeline hot-rebuild and state preservation.
Usage:
python scripts/demo_hot_rebuild.py
python scripts/demo_hot_rebuild.py --viewport 40x15
This script:
1. Creates a small viewport (40x15) for easier capture
2. Uses NullDisplay with recording enabled
3. Runs the pipeline for N frames (capturing initial state)
4. Triggers a "hot-rebuild" (e.g., toggling an effect stage)
5. Runs the pipeline for M more frames
6. Verifies state preservation by comparing frames before/after rebuild
7. Prints visual comparison to stdout
"""
import sys
import time
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from engine.display import DisplayRegistry
from engine.effects import get_registry
from engine.fetch import load_cache
from engine.pipeline import Pipeline, PipelineConfig, PipelineContext
from engine.pipeline.adapters import (
EffectPluginStage,
FontStage,
SourceItemsToBufferStage,
ViewportFilterStage,
create_stage_from_display,
create_stage_from_effect,
)
from engine.pipeline.params import PipelineParams
def run_demo(viewport_width: int = 40, viewport_height: int = 15):
"""Run the hot-rebuild demo."""
print(f"\n{'=' * 60}")
print(f"Pipeline Hot-Rebuild Demo")
print(f"Viewport: {viewport_width}x{viewport_height}")
print(f"{'=' * 60}\n")
import engine.effects.plugins as effects_plugins
effects_plugins.discover_plugins()
print("[1/6] Loading source items...")
items = load_cache()
if not items:
print(" ERROR: No fixture cache available")
sys.exit(1)
print(f" Loaded {len(items)} items")
print("[2/6] Creating NullDisplay with recording...")
display = DisplayRegistry.create("null")
display.init(viewport_width, viewport_height)
display.start_recording()
print(" Recording started")
print("[3/6] Building pipeline...")
params = PipelineParams()
params.viewport_width = viewport_width
params.viewport_height = viewport_height
config = PipelineConfig(
source="fixture",
display="null",
camera="scroll",
effects=["noise", "fade"],
)
pipeline = Pipeline(config=config, context=PipelineContext())
from engine.data_sources.sources import ListDataSource
from engine.pipeline.adapters import DataSourceStage
list_source = ListDataSource(items, name="fixture")
pipeline.add_stage("source", DataSourceStage(list_source, name="fixture"))
pipeline.add_stage("viewport_filter", ViewportFilterStage(name="viewport-filter"))
pipeline.add_stage("font", FontStage(name="font"))
effect_registry = get_registry()
for effect_name in config.effects:
effect = effect_registry.get(effect_name)
if effect:
pipeline.add_stage(
f"effect_{effect_name}",
create_stage_from_effect(effect, effect_name),
)
pipeline.add_stage("display", create_stage_from_display(display, "null"))
pipeline.build()
if not pipeline.initialize():
print(" ERROR: Failed to initialize pipeline")
sys.exit(1)
print(" Pipeline built and initialized")
ctx = pipeline.context
ctx.params = params
ctx.set("display", display)
ctx.set("items", items)
ctx.set("pipeline", pipeline)
ctx.set("pipeline_order", pipeline.execution_order)
ctx.set("camera_y", 0)
print("[4/6] Running pipeline for 10 frames (before rebuild)...")
frames_before = []
for frame in range(10):
params.frame_number = frame
ctx.params = params
result = pipeline.execute(items)
if result.success:
frames_before.append(display._last_buffer)
print(f" Captured {len(frames_before)} frames")
print("[5/6] Triggering hot-rebuild (toggling 'fade' effect)...")
fade_stage = pipeline.get_stage("effect_fade")
if fade_stage and isinstance(fade_stage, EffectPluginStage):
new_enabled = not fade_stage.is_enabled()
fade_stage.set_enabled(new_enabled)
fade_stage._effect.config.enabled = new_enabled
print(f" Fade effect enabled: {new_enabled}")
else:
print(" WARNING: Could not find fade effect stage")
print("[6/6] Running pipeline for 10 more frames (after rebuild)...")
frames_after = []
for frame in range(10, 20):
params.frame_number = frame
ctx.params = params
result = pipeline.execute(items)
if result.success:
frames_after.append(display._last_buffer)
print(f" Captured {len(frames_after)} frames")
display.stop_recording()
print("\n" + "=" * 60)
print("RESULTS")
print("=" * 60)
print("\n[State Preservation Check]")
if frames_before and frames_after:
last_before = frames_before[-1]
first_after = frames_after[0]
if last_before == first_after:
print(" PASS: Buffer state preserved across rebuild")
else:
print(" INFO: Buffer changed after rebuild (expected - effect toggled)")
print("\n[Frame Continuity Check]")
recorded_frames = display.get_frames()
print(f" Total recorded frames: {len(recorded_frames)}")
print(f" Frames before rebuild: {len(frames_before)}")
print(f" Frames after rebuild: {len(frames_after)}")
if len(recorded_frames) == 20:
print(" PASS: All frames recorded")
else:
print(" WARNING: Frame count mismatch")
print("\n[Visual Comparison - First frame before vs after rebuild]")
print("\n--- Before rebuild (frame 9) ---")
for i, line in enumerate(frames_before[0][:viewport_height]):
print(f"{i:2}: {line}")
print("\n--- After rebuild (frame 10) ---")
for i, line in enumerate(frames_after[0][:viewport_height]):
print(f"{i:2}: {line}")
print("\n[Recording Save/Load Test]")
test_file = Path("/tmp/test_recording.json")
display.save_recording(test_file)
print(f" Saved recording to: {test_file}")
display2 = DisplayRegistry.create("null")
display2.init(viewport_width, viewport_height)
display2.load_recording(test_file)
loaded_frames = display2.get_frames()
print(f" Loaded {len(loaded_frames)} frames from file")
if len(loaded_frames) == len(recorded_frames):
print(" PASS: Recording save/load works correctly")
else:
print(" WARNING: Frame count mismatch after load")
test_file.unlink(missing_ok=True)
pipeline.cleanup()
display.cleanup()
print("\n" + "=" * 60)
print("Demo complete!")
print("=" * 60 + "\n")
def main():
viewport_width = 40
viewport_height = 15
if "--viewport" in sys.argv:
idx = sys.argv.index("--viewport")
if idx + 1 < len(sys.argv):
vp = sys.argv[idx + 1]
try:
viewport_width, viewport_height = map(int, vp.split("x"))
except ValueError:
print("Error: Invalid viewport format. Use WxH (e.g., 40x15)")
sys.exit(1)
run_demo(viewport_width, viewport_height)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,378 @@
#!/usr/bin/env python3
"""
Oscilloscope with Image Data Source Integration
This demo:
1. Uses pygame to render oscillator waveforms
2. Converts to PIL Image (8-bit grayscale with transparency)
3. Renders to ANSI using image data source patterns
4. Features LFO modulation chain
Usage:
uv run python scripts/demo_image_oscilloscope.py --lfo --modulate
"""
import argparse
import sys
import time
from pathlib import Path
# Add mainline to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from engine.data_sources.sources import DataSource, ImageItem
from engine.sensors.oscillator import OscillatorSensor, register_oscillator_sensor
class ModulatedOscillator:
"""Oscillator with frequency modulation from another oscillator."""
def __init__(
self,
name: str,
waveform: str = "sine",
base_frequency: float = 1.0,
modulator: "OscillatorSensor | None" = None,
modulation_depth: float = 0.5,
):
self.name = name
self.waveform = waveform
self.base_frequency = base_frequency
self.modulator = modulator
self.modulation_depth = modulation_depth
register_oscillator_sensor(
name=name, waveform=waveform, frequency=base_frequency
)
self.osc = OscillatorSensor(
name=name, waveform=waveform, frequency=base_frequency
)
self.osc.start()
def read(self):
if self.modulator:
mod_reading = self.modulator.read()
if mod_reading:
mod_offset = (mod_reading.value - 0.5) * 2 * self.modulation_depth
effective_freq = self.base_frequency + mod_offset
effective_freq = max(0.1, min(effective_freq, 20.0))
self.osc._frequency = effective_freq
return self.osc.read()
def get_phase(self):
return self.osc._phase
def get_effective_frequency(self):
if self.modulator and self.modulator.read():
mod_reading = self.modulator.read()
mod_offset = (mod_reading.value - 0.5) * 2 * self.modulation_depth
return max(0.1, min(self.base_frequency + mod_offset, 20.0))
return self.base_frequency
def stop(self):
self.osc.stop()
class OscilloscopeDataSource(DataSource):
"""Dynamic data source that generates oscilloscope images from oscillators."""
def __init__(
self,
modulator: OscillatorSensor,
modulated: ModulatedOscillator,
width: int = 200,
height: int = 100,
):
self.modulator = modulator
self.modulated = modulated
self.width = width
self.height = height
self.frame = 0
# Check if pygame and PIL are available
import importlib.util
self.pygame_available = importlib.util.find_spec("pygame") is not None
self.pil_available = importlib.util.find_spec("PIL") is not None
@property
def name(self) -> str:
return "oscilloscope_image"
@property
def is_dynamic(self) -> bool:
return True
def fetch(self) -> list[ImageItem]:
"""Generate oscilloscope image from oscillators."""
if not self.pygame_available or not self.pil_available:
# Fallback to text-based source
return []
import pygame
from PIL import Image
# Create Pygame surface
surface = pygame.Surface((self.width, self.height))
surface.fill((10, 10, 20)) # Dark background
# Get readings
mod_reading = self.modulator.read()
mod_val = mod_reading.value if mod_reading else 0.5
modulated_reading = self.modulated.read()
modulated_val = modulated_reading.value if modulated_reading else 0.5
# Draw modulator waveform (top half)
top_height = self.height // 2
waveform_fn = self.modulator.WAVEFORMS[self.modulator.waveform]
mod_time_offset = self.modulator._phase * self.modulator.frequency * 0.3
prev_x, prev_y = 0, 0
for x in range(self.width):
col_fraction = x / self.width
time_pos = mod_time_offset + col_fraction
sample = waveform_fn(time_pos * self.modulator.frequency * 2)
y = int(top_height - (sample * (top_height - 10)) - 5)
if x > 0:
pygame.draw.line(surface, (100, 200, 255), (prev_x, prev_y), (x, y), 1)
prev_x, prev_y = x, y
# Draw separator
pygame.draw.line(
surface, (80, 80, 100), (0, top_height), (self.width, top_height), 1
)
# Draw modulated waveform (bottom half)
bottom_start = top_height + 1
bottom_height = self.height - bottom_start - 1
waveform_fn = self.modulated.osc.WAVEFORMS[self.modulated.waveform]
modulated_time_offset = (
self.modulated.get_phase() * self.modulated.get_effective_frequency() * 0.3
)
prev_x, prev_y = 0, 0
for x in range(self.width):
col_fraction = x / self.width
time_pos = modulated_time_offset + col_fraction
sample = waveform_fn(
time_pos * self.modulated.get_effective_frequency() * 2
)
y = int(
bottom_start + (bottom_height - (sample * (bottom_height - 10))) - 5
)
if x > 0:
pygame.draw.line(surface, (255, 150, 100), (prev_x, prev_y), (x, y), 1)
prev_x, prev_y = x, y
# Convert Pygame surface to PIL Image (8-bit grayscale with alpha)
img_str = pygame.image.tostring(surface, "RGB")
pil_rgb = Image.frombytes("RGB", (self.width, self.height), img_str)
# Convert to 8-bit grayscale
pil_gray = pil_rgb.convert("L")
# Create alpha channel (full opacity for now)
alpha = Image.new("L", (self.width, self.height), 255)
# Combine into RGBA
pil_rgba = Image.merge("RGBA", (pil_gray, pil_gray, pil_gray, alpha))
# Create ImageItem
item = ImageItem(
image=pil_rgba,
source="oscilloscope_image",
timestamp=str(time.time()),
path=None,
metadata={
"frame": self.frame,
"mod_value": mod_val,
"modulated_value": modulated_val,
},
)
self.frame += 1
return [item]
def render_pil_to_ansi(
pil_image, terminal_width: int = 80, terminal_height: int = 30
) -> str:
"""Convert PIL image (8-bit grayscale with transparency) to ANSI."""
# Resize for terminal display
resized = pil_image.resize((terminal_width * 2, terminal_height * 2))
# Extract grayscale and alpha channels
gray = resized.convert("L")
alpha = resized.split()[3] if len(resized.split()) > 3 else None
# ANSI character ramp (dark to light)
chars = " .:-=+*#%@"
lines = []
for y in range(0, resized.height, 2): # Sample every 2nd row for aspect ratio
line = ""
for x in range(0, resized.width, 2):
pixel = gray.getpixel((x, y))
# Check alpha if available
if alpha:
a = alpha.getpixel((x, y))
if a < 128: # Transparent
line += " "
continue
char_index = int((pixel / 255) * (len(chars) - 1))
line += chars[char_index]
lines.append(line)
return "\n".join(lines)
def demo_image_oscilloscope(
waveform: str = "sine",
base_freq: float = 0.5,
modulate: bool = False,
mod_waveform: str = "sine",
mod_freq: float = 0.5,
mod_depth: float = 0.5,
frames: int = 0,
):
"""Run oscilloscope with image data source integration."""
frame_interval = 1.0 / 15.0 # 15 FPS
print("Oscilloscope with Image Data Source Integration")
print("Frame rate: 15 FPS")
print()
# Create oscillators
modulator = OscillatorSensor(
name="modulator", waveform=mod_waveform, frequency=mod_freq
)
modulator.start()
modulated = ModulatedOscillator(
name="modulated",
waveform=waveform,
base_frequency=base_freq,
modulator=modulator if modulate else None,
modulation_depth=mod_depth,
)
# Create image data source
image_source = OscilloscopeDataSource(
modulator=modulator,
modulated=modulated,
width=200,
height=100,
)
# Run demo loop
try:
frame = 0
last_time = time.time()
while frames == 0 or frame < frames:
# Fetch image from data source
images = image_source.fetch()
if images:
# Convert to ANSI
visualization = render_pil_to_ansi(
images[0].image, terminal_width=80, terminal_height=30
)
else:
# Fallback to text message
visualization = (
"Pygame or PIL not available\n\n[Image rendering disabled]"
)
# Add header
header = f"IMAGE SOURCE MODE | Frame: {frame}"
header_line = "" * 80
visualization = f"{header}\n{header_line}\n" + visualization
# Display
print("\033[H" + visualization)
# Frame timing
elapsed = time.time() - last_time
sleep_time = max(0, frame_interval - elapsed)
time.sleep(sleep_time)
last_time = time.time()
frame += 1
except KeyboardInterrupt:
print("\n\nDemo stopped by user")
finally:
modulator.stop()
modulated.stop()
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Oscilloscope with image data source integration"
)
parser.add_argument(
"--waveform",
choices=["sine", "square", "sawtooth", "triangle", "noise"],
default="sine",
help="Main waveform type",
)
parser.add_argument(
"--frequency",
type=float,
default=0.5,
help="Main oscillator frequency",
)
parser.add_argument(
"--lfo",
action="store_true",
help="Use slow LFO frequency (0.5Hz)",
)
parser.add_argument(
"--modulate",
action="store_true",
help="Enable LFO modulation chain",
)
parser.add_argument(
"--mod-waveform",
choices=["sine", "square", "sawtooth", "triangle", "noise"],
default="sine",
help="Modulator waveform type",
)
parser.add_argument(
"--mod-freq",
type=float,
default=0.5,
help="Modulator frequency in Hz",
)
parser.add_argument(
"--mod-depth",
type=float,
default=0.5,
help="Modulation depth",
)
parser.add_argument(
"--frames",
type=int,
default=0,
help="Number of frames to render",
)
args = parser.parse_args()
base_freq = args.frequency
if args.lfo:
base_freq = 0.5
demo_image_oscilloscope(
waveform=args.waveform,
base_freq=base_freq,
modulate=args.modulate,
mod_waveform=args.mod_waveform,
mod_freq=args.mod_freq,
mod_depth=args.mod_depth,
frames=args.frames,
)

View File

@@ -0,0 +1,137 @@
#!/usr/bin/env python3
"""
Simple Oscillator Sensor Demo
This script demonstrates the oscillator sensor by:
1. Creating an oscillator sensor with various waveforms
2. Printing the waveform data in real-time
Usage:
uv run python scripts/demo_oscillator_simple.py --waveform sine --frequency 1.0
uv run python scripts/demo_oscillator_simple.py --waveform square --frequency 2.0
"""
import argparse
import math
import time
import sys
from pathlib import Path
# Add mainline to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from engine.sensors.oscillator import OscillatorSensor, register_oscillator_sensor
def render_waveform(width: int, height: int, osc: OscillatorSensor, frame: int) -> str:
"""Render a waveform visualization."""
# Get current reading
current_reading = osc.read()
current_value = current_reading.value if current_reading else 0.0
# Generate waveform data - sample the waveform function directly
# This shows what the waveform looks like, not the live reading
samples = []
waveform_fn = osc.WAVEFORMS[osc._waveform]
for i in range(width):
# Sample across one complete cycle (0 to 1)
phase = i / width
value = waveform_fn(phase)
samples.append(value)
# Build visualization
lines = []
# Header with sensor info
header = (
f"Oscillator: {osc.name} | Waveform: {osc.waveform} | Freq: {osc.frequency}Hz"
)
lines.append(header)
lines.append("" * width)
# Waveform plot (scaled to fit height)
num_rows = height - 3 # Header, separator, footer
for row in range(num_rows):
# Calculate the sample value that corresponds to this row
# 0.0 is bottom, 1.0 is top
row_value = 1.0 - (row / (num_rows - 1)) if num_rows > 1 else 0.5
line_chars = []
for x, sample in enumerate(samples):
# Determine if this sample should be drawn in this row
# Map sample (0.0-1.0) to row (0 to num_rows-1)
# 0.0 -> row 0 (bottom), 1.0 -> row num_rows-1 (top)
sample_row = int(sample * (num_rows - 1))
if sample_row == row:
# Use different characters for waveform vs current position marker
# Check if this is the current reading position
if abs(x / width - (osc._phase % 1.0)) < 0.02:
line_chars.append("") # Current position marker
else:
line_chars.append("")
else:
line_chars.append(" ")
lines.append("".join(line_chars))
# Footer with current value and phase info
footer = f"Value: {current_value:.3f} | Frame: {frame} | Phase: {osc._phase:.2f}"
lines.append(footer)
return "\n".join(lines)
def demo_oscillator(waveform: str = "sine", frequency: float = 1.0, frames: int = 0):
"""Run oscillator demo."""
print(f"Starting oscillator demo: {waveform} wave at {frequency}Hz")
if frames > 0:
print(f"Running for {frames} frames")
else:
print("Press Ctrl+C to stop")
print()
# Create oscillator sensor
register_oscillator_sensor(name="demo_osc", waveform=waveform, frequency=frequency)
osc = OscillatorSensor(name="demo_osc", waveform=waveform, frequency=frequency)
osc.start()
# Run demo loop
try:
frame = 0
while frames == 0 or frame < frames:
# Render waveform
visualization = render_waveform(80, 20, osc, frame)
# Print with ANSI escape codes to clear screen and move cursor
print("\033[H\033[J" + visualization)
time.sleep(0.05) # 20 FPS
frame += 1
except KeyboardInterrupt:
print("\n\nDemo stopped by user")
finally:
osc.stop()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Oscillator sensor demo")
parser.add_argument(
"--waveform",
choices=["sine", "square", "sawtooth", "triangle", "noise"],
default="sine",
help="Waveform type",
)
parser.add_argument(
"--frequency", type=float, default=1.0, help="Oscillator frequency in Hz"
)
parser.add_argument(
"--frames",
type=int,
default=0,
help="Number of frames to render (0 = infinite until Ctrl+C)",
)
args = parser.parse_args()
demo_oscillator(args.waveform, args.frequency, args.frames)

View File

@@ -0,0 +1,204 @@
#!/usr/bin/env python3
"""
Oscilloscope Demo - Real-time waveform visualization
This demonstrates a real oscilloscope-style display where:
1. A complete waveform is drawn on the canvas
2. The camera scrolls horizontally (time axis)
3. The "pen" traces the waveform vertically at the center
Think of it as:
- Canvas: Contains the waveform pattern (like a stamp)
- Camera: Moves left-to-right, revealing different parts of the waveform
- Pen: Always at center X, moves vertically with the signal value
Usage:
uv run python scripts/demo_oscilloscope.py --frequency 1.0 --speed 10
"""
import argparse
import math
import time
import sys
from pathlib import Path
# Add mainline to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from engine.sensors.oscillator import OscillatorSensor, register_oscillator_sensor
def render_oscilloscope(
width: int,
height: int,
osc: OscillatorSensor,
frame: int,
) -> str:
"""Render an oscilloscope-style display."""
# Get current reading (0.0 to 1.0)
reading = osc.read()
current_value = reading.value if reading else 0.5
phase = osc._phase
frequency = osc.frequency
# Build visualization
lines = []
# Header with sensor info
header = (
f"Oscilloscope: {osc.name} | Wave: {osc.waveform} | "
f"Freq: {osc.frequency}Hz | Phase: {phase:.2f}"
)
lines.append(header)
lines.append("" * width)
# Center line (zero reference)
center_row = height // 2
# Draw oscilloscope trace
waveform_fn = osc.WAVEFORMS[osc._waveform]
# Calculate time offset for scrolling
# The trace scrolls based on phase - this creates the time axis movement
# At frequency 1.0, the trace completes one full sweep per frequency cycle
time_offset = phase * frequency * 2.0
# Pre-calculate all sample values for this frame
# Each column represents a time point on the X axis
samples = []
for col in range(width):
# Time position for this column (0.0 to 1.0 across width)
col_fraction = col / width
# Combine with time offset for scrolling effect
time_pos = time_offset + col_fraction
# Sample the waveform at this time point
# Multiply by frequency to get correct number of cycles shown
sample_value = waveform_fn(time_pos * frequency * 2)
samples.append(sample_value)
# Draw the trace
# For each row, check which columns have their sample value in this row
for row in range(height - 3): # Reserve 3 lines for header/footer
# Calculate vertical position (0.0 at bottom, 1.0 at top)
row_pos = 1.0 - (row / (height - 4))
line_chars = []
for col in range(width):
sample = samples[col]
# Check if this sample falls in this row
tolerance = 1.0 / (height - 4)
if abs(sample - row_pos) < tolerance:
line_chars.append("")
else:
line_chars.append(" ")
lines.append("".join(line_chars))
# Draw center indicator line
center_line = list(" " * width)
# Position the indicator based on current value
indicator_x = int((current_value) * (width - 1))
if 0 <= indicator_x < width:
center_line[indicator_x] = ""
lines.append("".join(center_line))
# Footer with current value
footer = f"Value: {current_value:.3f} | Frame: {frame} | Phase: {phase:.2f}"
lines.append(footer)
return "\n".join(lines)
def demo_oscilloscope(
waveform: str = "sine",
frequency: float = 1.0,
frames: int = 0,
):
"""Run oscilloscope demo."""
# Determine if this is LFO range
is_lfo = frequency <= 20.0 and frequency >= 0.1
freq_type = "LFO" if is_lfo else "Audio"
print(f"Oscilloscope demo: {waveform} wave")
print(f"Frequency: {frequency}Hz ({freq_type} range)")
if frames > 0:
print(f"Running for {frames} frames")
else:
print("Press Ctrl+C to stop")
print()
# Create oscillator sensor
register_oscillator_sensor(
name="oscilloscope_osc", waveform=waveform, frequency=frequency
)
osc = OscillatorSensor(
name="oscilloscope_osc", waveform=waveform, frequency=frequency
)
osc.start()
# Run demo loop
try:
frame = 0
while frames == 0 or frame < frames:
# Render oscilloscope display
visualization = render_oscilloscope(80, 22, osc, frame)
# Print with ANSI escape codes to clear screen and move cursor
print("\033[H\033[J" + visualization)
time.sleep(1.0 / 60.0) # 60 FPS
frame += 1
except KeyboardInterrupt:
print("\n\nDemo stopped by user")
finally:
osc.stop()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Oscilloscope demo")
parser.add_argument(
"--waveform",
choices=["sine", "square", "sawtooth", "triangle", "noise"],
default="sine",
help="Waveform type",
)
parser.add_argument(
"--frequency",
type=float,
default=1.0,
help="Oscillator frequency in Hz (LFO: 0.1-20Hz, Audio: >20Hz)",
)
parser.add_argument(
"--lfo",
action="store_true",
help="Use LFO frequency (0.5Hz - slow modulation)",
)
parser.add_argument(
"--fast-lfo",
action="store_true",
help="Use fast LFO frequency (5Hz - rhythmic modulation)",
)
parser.add_argument(
"--frames",
type=int,
default=0,
help="Number of frames to render (0 = infinite until Ctrl+C)",
)
args = parser.parse_args()
# Determine frequency based on mode
frequency = args.frequency
if args.lfo:
frequency = 0.5 # Slow LFO for modulation
elif args.fast_lfo:
frequency = 5.0 # Fast LFO for rhythmic modulation
demo_oscilloscope(
waveform=args.waveform,
frequency=frequency,
frames=args.frames,
)

View File

@@ -0,0 +1,380 @@
#!/usr/bin/env python3
"""
Enhanced Oscilloscope with LFO Modulation Chain
This demo features:
1. Slower frame rate (15 FPS) for human appreciation
2. Reduced flicker using cursor positioning
3. LFO modulation chain: LFO1 modulates LFO2 frequency
4. Multiple visualization modes
Usage:
# Simple LFO
uv run python scripts/demo_oscilloscope_mod.py --lfo
# LFO modulation chain: LFO1 modulates LFO2 frequency
uv run python scripts/demo_oscilloscope_mod.py --modulate --lfo
# Custom modulation depth and rate
uv run python scripts/demo_oscilloscope_mod.py --modulate --lfo --mod-depth 0.5 --mod-rate 0.25
"""
import argparse
import sys
import time
from pathlib import Path
# Add mainline to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from engine.sensors.oscillator import OscillatorSensor, register_oscillator_sensor
class ModulatedOscillator:
"""
Oscillator with frequency modulation from another oscillator.
Frequency = base_frequency + (modulator_value * modulation_depth)
"""
def __init__(
self,
name: str,
waveform: str = "sine",
base_frequency: float = 1.0,
modulator: "OscillatorSensor | None" = None,
modulation_depth: float = 0.5,
):
self.name = name
self.waveform = waveform
self.base_frequency = base_frequency
self.modulator = modulator
self.modulation_depth = modulation_depth
# Create the oscillator sensor
register_oscillator_sensor(
name=name, waveform=waveform, frequency=base_frequency
)
self.osc = OscillatorSensor(
name=name, waveform=waveform, frequency=base_frequency
)
self.osc.start()
def read(self):
"""Read current value, applying modulation if present."""
# Update frequency based on modulator
if self.modulator:
mod_reading = self.modulator.read()
if mod_reading:
# Modulator value (0-1) affects frequency
# Map 0-1 to -modulation_depth to +modulation_depth
mod_offset = (mod_reading.value - 0.5) * 2 * self.modulation_depth
effective_freq = self.base_frequency + mod_offset
# Clamp to reasonable range
effective_freq = max(0.1, min(effective_freq, 20.0))
self.osc._frequency = effective_freq
return self.osc.read()
def get_phase(self):
"""Get current phase."""
return self.osc._phase
def get_effective_frequency(self):
"""Get current effective frequency (after modulation)."""
if self.modulator and self.modulator.read():
mod_reading = self.modulator.read()
mod_offset = (mod_reading.value - 0.5) * 2 * self.modulation_depth
return max(0.1, min(self.base_frequency + mod_offset, 20.0))
return self.base_frequency
def stop(self):
"""Stop the oscillator."""
self.osc.stop()
def render_dual_waveform(
width: int,
height: int,
modulator: OscillatorSensor,
modulated: ModulatedOscillator,
frame: int,
) -> str:
"""Render both modulator and modulated waveforms."""
# Get readings
mod_reading = modulator.read()
mod_val = mod_reading.value if mod_reading else 0.5
modulated_reading = modulated.read()
modulated_val = modulated_reading.value if modulated_reading else 0.5
# Build visualization
lines = []
# Header with sensor info
header1 = f"MODULATOR: {modulator.name} | Wave: {modulator.waveform} | Freq: {modulator.frequency:.2f}Hz"
header2 = f"MODULATED: {modulated.name} | Wave: {modulated.waveform} | Base: {modulated.base_frequency:.2f}Hz | Eff: {modulated.get_effective_frequency():.2f}Hz"
lines.append(header1)
lines.append(header2)
lines.append("" * width)
# Render modulator waveform (top half)
top_height = (height - 5) // 2
waveform_fn = modulator.WAVEFORMS[modulator.waveform]
# Calculate time offset for scrolling
mod_time_offset = modulator._phase * modulator.frequency * 0.3
for row in range(top_height):
row_pos = 1.0 - (row / (top_height - 1))
line_chars = []
for col in range(width):
col_fraction = col / width
time_pos = mod_time_offset + col_fraction
sample = waveform_fn(time_pos * modulator.frequency * 2)
tolerance = 1.0 / (top_height - 1)
if abs(sample - row_pos) < tolerance:
line_chars.append("")
else:
line_chars.append(" ")
lines.append("".join(line_chars))
# Separator line with modulation info
lines.append(
f"─ MODULATION: depth={modulated.modulation_depth:.2f} | mod_value={mod_val:.2f}"
)
# Render modulated waveform (bottom half)
bottom_height = height - top_height - 5
waveform_fn = modulated.osc.WAVEFORMS[modulated.waveform]
# Calculate time offset for scrolling
modulated_time_offset = (
modulated.get_phase() * modulated.get_effective_frequency() * 0.3
)
for row in range(bottom_height):
row_pos = 1.0 - (row / (bottom_height - 1))
line_chars = []
for col in range(width):
col_fraction = col / width
time_pos = modulated_time_offset + col_fraction
sample = waveform_fn(time_pos * modulated.get_effective_frequency() * 2)
tolerance = 1.0 / (bottom_height - 1)
if abs(sample - row_pos) < tolerance:
line_chars.append("")
else:
line_chars.append(" ")
lines.append("".join(line_chars))
# Footer with current values
footer = f"Mod Value: {mod_val:.3f} | Modulated Value: {modulated_val:.3f} | Frame: {frame}"
lines.append(footer)
return "\n".join(lines)
def render_single_waveform(
width: int,
height: int,
osc: OscillatorSensor,
frame: int,
) -> str:
"""Render a single waveform (for non-modulated mode)."""
reading = osc.read()
current_value = reading.value if reading else 0.5
phase = osc._phase
frequency = osc.frequency
# Build visualization
lines = []
# Header with sensor info
header = (
f"Oscilloscope: {osc.name} | Wave: {osc.waveform} | "
f"Freq: {frequency:.2f}Hz | Phase: {phase:.2f}"
)
lines.append(header)
lines.append("" * width)
# Draw oscilloscope trace
waveform_fn = osc.WAVEFORMS[osc.waveform]
time_offset = phase * frequency * 0.3
for row in range(height - 3):
row_pos = 1.0 - (row / (height - 4))
line_chars = []
for col in range(width):
col_fraction = col / width
time_pos = time_offset + col_fraction
sample = waveform_fn(time_pos * frequency * 2)
tolerance = 1.0 / (height - 4)
if abs(sample - row_pos) < tolerance:
line_chars.append("")
else:
line_chars.append(" ")
lines.append("".join(line_chars))
# Footer
footer = f"Value: {current_value:.3f} | Frame: {frame} | Phase: {phase:.2f}"
lines.append(footer)
return "\n".join(lines)
def demo_oscilloscope_mod(
waveform: str = "sine",
base_freq: float = 1.0,
modulate: bool = False,
mod_waveform: str = "sine",
mod_freq: float = 0.5,
mod_depth: float = 0.5,
frames: int = 0,
):
"""Run enhanced oscilloscope demo with modulation support."""
# Frame timing for smooth 15 FPS
frame_interval = 1.0 / 15.0 # 66.67ms per frame
print("Enhanced Oscilloscope Demo")
print("Frame rate: 15 FPS (66ms per frame)")
if modulate:
print(
f"Modulation: {mod_waveform} @ {mod_freq}Hz -> {waveform} @ {base_freq}Hz"
)
print(f"Modulation depth: {mod_depth}")
else:
print(f"Waveform: {waveform} @ {base_freq}Hz")
if frames > 0:
print(f"Running for {frames} frames")
else:
print("Press Ctrl+C to stop")
print()
# Create oscillators
if modulate:
# Create modulation chain: modulator -> modulated
modulator = OscillatorSensor(
name="modulator", waveform=mod_waveform, frequency=mod_freq
)
modulator.start()
modulated = ModulatedOscillator(
name="modulated",
waveform=waveform,
base_frequency=base_freq,
modulator=modulator,
modulation_depth=mod_depth,
)
else:
# Single oscillator
register_oscillator_sensor(
name="oscilloscope", waveform=waveform, frequency=base_freq
)
osc = OscillatorSensor(
name="oscilloscope", waveform=waveform, frequency=base_freq
)
osc.start()
# Run demo loop with consistent timing
try:
frame = 0
last_time = time.time()
while frames == 0 or frame < frames:
# Render based on mode
if modulate:
visualization = render_dual_waveform(
80, 30, modulator, modulated, frame
)
else:
visualization = render_single_waveform(80, 22, osc, frame)
# Use cursor positioning instead of full clear to reduce flicker
print("\033[H" + visualization)
# Calculate sleep time for consistent 15 FPS
elapsed = time.time() - last_time
sleep_time = max(0, frame_interval - elapsed)
time.sleep(sleep_time)
last_time = time.time()
frame += 1
except KeyboardInterrupt:
print("\n\nDemo stopped by user")
finally:
if modulate:
modulator.stop()
modulated.stop()
else:
osc.stop()
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Enhanced oscilloscope with LFO modulation chain"
)
parser.add_argument(
"--waveform",
choices=["sine", "square", "sawtooth", "triangle", "noise"],
default="sine",
help="Main waveform type",
)
parser.add_argument(
"--frequency",
type=float,
default=1.0,
help="Main oscillator frequency (LFO range: 0.1-20Hz)",
)
parser.add_argument(
"--lfo",
action="store_true",
help="Use slow LFO frequency (0.5Hz) for main oscillator",
)
parser.add_argument(
"--modulate",
action="store_true",
help="Enable LFO modulation chain (modulator modulates main oscillator)",
)
parser.add_argument(
"--mod-waveform",
choices=["sine", "square", "sawtooth", "triangle", "noise"],
default="sine",
help="Modulator waveform type",
)
parser.add_argument(
"--mod-freq",
type=float,
default=0.5,
help="Modulator frequency in Hz",
)
parser.add_argument(
"--mod-depth",
type=float,
default=0.5,
help="Modulation depth (0.0-1.0, higher = more frequency variation)",
)
parser.add_argument(
"--frames",
type=int,
default=0,
help="Number of frames to render (0 = infinite until Ctrl+C)",
)
args = parser.parse_args()
# Set frequency based on LFO flag
base_freq = args.frequency
if args.lfo:
base_freq = 0.5
demo_oscilloscope_mod(
waveform=args.waveform,
base_freq=base_freq,
modulate=args.modulate,
mod_waveform=args.mod_waveform,
mod_freq=args.mod_freq,
mod_depth=args.mod_depth,
frames=args.frames,
)

View File

@@ -0,0 +1,411 @@
#!/usr/bin/env python3
"""
Enhanced Oscilloscope with Pipeline Switching
This demo features:
1. Text-based oscilloscope (first 15 seconds)
2. Pygame renderer with PIL to ANSI conversion (next 15 seconds)
3. Continuous looping between the two modes
Usage:
uv run python scripts/demo_oscilloscope_pipeline.py --lfo --modulate
"""
import argparse
import sys
import time
from pathlib import Path
# Add mainline to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from engine.sensors.oscillator import OscillatorSensor, register_oscillator_sensor
class ModulatedOscillator:
"""Oscillator with frequency modulation from another oscillator."""
def __init__(
self,
name: str,
waveform: str = "sine",
base_frequency: float = 1.0,
modulator: "OscillatorSensor | None" = None,
modulation_depth: float = 0.5,
):
self.name = name
self.waveform = waveform
self.base_frequency = base_frequency
self.modulator = modulator
self.modulation_depth = modulation_depth
register_oscillator_sensor(
name=name, waveform=waveform, frequency=base_frequency
)
self.osc = OscillatorSensor(
name=name, waveform=waveform, frequency=base_frequency
)
self.osc.start()
def read(self):
"""Read current value, applying modulation if present."""
if self.modulator:
mod_reading = self.modulator.read()
if mod_reading:
mod_offset = (mod_reading.value - 0.5) * 2 * self.modulation_depth
effective_freq = self.base_frequency + mod_offset
effective_freq = max(0.1, min(effective_freq, 20.0))
self.osc._frequency = effective_freq
return self.osc.read()
def get_phase(self):
return self.osc._phase
def get_effective_frequency(self):
if self.modulator:
mod_reading = self.modulator.read()
if mod_reading:
mod_offset = (mod_reading.value - 0.5) * 2 * self.modulation_depth
return max(0.1, min(self.base_frequency + mod_offset, 20.0))
return self.base_frequency
def stop(self):
self.osc.stop()
def render_text_mode(
width: int,
height: int,
modulator: OscillatorSensor,
modulated: ModulatedOscillator,
frame: int,
) -> str:
"""Render dual waveforms in text mode."""
mod_reading = modulator.read()
mod_val = mod_reading.value if mod_reading else 0.5
modulated_reading = modulated.read()
modulated_val = modulated_reading.value if modulated_reading else 0.5
lines = []
header1 = (
f"TEXT MODE | MODULATOR: {modulator.waveform} @ {modulator.frequency:.2f}Hz"
)
header2 = (
f"MODULATED: {modulated.waveform} @ {modulated.get_effective_frequency():.2f}Hz"
)
lines.append(header1)
lines.append(header2)
lines.append("" * width)
# Modulator waveform (top half)
top_height = (height - 5) // 2
waveform_fn = modulator.WAVEFORMS[modulator.waveform]
mod_time_offset = modulator._phase * modulator.frequency * 0.3
for row in range(top_height):
row_pos = 1.0 - (row / (top_height - 1))
line_chars = []
for col in range(width):
col_fraction = col / width
time_pos = mod_time_offset + col_fraction
sample = waveform_fn(time_pos * modulator.frequency * 2)
tolerance = 1.0 / (top_height - 1)
if abs(sample - row_pos) < tolerance:
line_chars.append("")
else:
line_chars.append(" ")
lines.append("".join(line_chars))
lines.append(
f"─ MODULATION: depth={modulated.modulation_depth:.2f} | mod_value={mod_val:.2f}"
)
# Modulated waveform (bottom half)
bottom_height = height - top_height - 5
waveform_fn = modulated.osc.WAVEFORMS[modulated.waveform]
modulated_time_offset = (
modulated.get_phase() * modulated.get_effective_frequency() * 0.3
)
for row in range(bottom_height):
row_pos = 1.0 - (row / (bottom_height - 1))
line_chars = []
for col in range(width):
col_fraction = col / width
time_pos = modulated_time_offset + col_fraction
sample = waveform_fn(time_pos * modulated.get_effective_frequency() * 2)
tolerance = 1.0 / (bottom_height - 1)
if abs(sample - row_pos) < tolerance:
line_chars.append("")
else:
line_chars.append(" ")
lines.append("".join(line_chars))
footer = (
f"Mod Value: {mod_val:.3f} | Modulated: {modulated_val:.3f} | Frame: {frame}"
)
lines.append(footer)
return "\n".join(lines)
def render_pygame_to_ansi(
width: int,
height: int,
modulator: OscillatorSensor,
modulated: ModulatedOscillator,
frame: int,
font_path: str | None,
) -> str:
"""Render waveforms using Pygame, convert to ANSI with PIL."""
try:
import pygame
from PIL import Image
except ImportError:
return "Pygame or PIL not available\n\n" + render_text_mode(
width, height, modulator, modulated, frame
)
# Initialize Pygame surface (smaller for ANSI conversion)
pygame_width = width * 2 # Double for better quality
pygame_height = height * 4
surface = pygame.Surface((pygame_width, pygame_height))
surface.fill((10, 10, 20)) # Dark background
# Get readings
mod_reading = modulator.read()
mod_val = mod_reading.value if mod_reading else 0.5
modulated_reading = modulated.read()
modulated_val = modulated_reading.value if modulated_reading else 0.5
# Draw modulator waveform (top half)
top_height = pygame_height // 2
waveform_fn = modulator.WAVEFORMS[modulator.waveform]
mod_time_offset = modulator._phase * modulator.frequency * 0.3
prev_x, prev_y = 0, 0
for x in range(pygame_width):
col_fraction = x / pygame_width
time_pos = mod_time_offset + col_fraction
sample = waveform_fn(time_pos * modulator.frequency * 2)
y = int(top_height - (sample * (top_height - 20)) - 10)
if x > 0:
pygame.draw.line(surface, (100, 200, 255), (prev_x, prev_y), (x, y), 2)
prev_x, prev_y = x, y
# Draw separator
pygame.draw.line(
surface, (80, 80, 100), (0, top_height), (pygame_width, top_height), 1
)
# Draw modulated waveform (bottom half)
bottom_start = top_height + 10
bottom_height = pygame_height - bottom_start - 20
waveform_fn = modulated.osc.WAVEFORMS[modulated.waveform]
modulated_time_offset = (
modulated.get_phase() * modulated.get_effective_frequency() * 0.3
)
prev_x, prev_y = 0, 0
for x in range(pygame_width):
col_fraction = x / pygame_width
time_pos = modulated_time_offset + col_fraction
sample = waveform_fn(time_pos * modulated.get_effective_frequency() * 2)
y = int(bottom_start + (bottom_height - (sample * (bottom_height - 20))) - 10)
if x > 0:
pygame.draw.line(surface, (255, 150, 100), (prev_x, prev_y), (x, y), 2)
prev_x, prev_y = x, y
# Draw info text on pygame surface
try:
if font_path:
font = pygame.font.Font(font_path, 16)
info_text = f"PYGAME MODE | Mod: {mod_val:.2f} | Out: {modulated_val:.2f} | Frame: {frame}"
text_surface = font.render(info_text, True, (200, 200, 200))
surface.blit(text_surface, (10, 10))
except Exception:
pass
# Convert Pygame surface to PIL Image
img_str = pygame.image.tostring(surface, "RGB")
pil_image = Image.frombytes("RGB", (pygame_width, pygame_height), img_str)
# Convert to ANSI
return pil_to_ansi(pil_image)
def pil_to_ansi(image) -> str:
"""Convert PIL image to ANSI escape codes."""
# Resize for terminal display
terminal_width = 80
terminal_height = 30
image = image.resize((terminal_width * 2, terminal_height * 2))
# Convert to grayscale
image = image.convert("L")
# ANSI character ramp (dark to light)
chars = " .:-=+*#%@"
lines = []
for y in range(0, image.height, 2): # Sample every 2nd row for aspect ratio
line = ""
for x in range(0, image.width, 2):
pixel = image.getpixel((x, y))
char_index = int((pixel / 255) * (len(chars) - 1))
line += chars[char_index]
lines.append(line)
# Add header info
header = "PYGAME → ANSI RENDER MODE"
header_line = "" * terminal_width
return f"{header}\n{header_line}\n" + "\n".join(lines)
def demo_with_pipeline_switching(
waveform: str = "sine",
base_freq: float = 0.5,
modulate: bool = False,
mod_waveform: str = "sine",
mod_freq: float = 0.5,
mod_depth: float = 0.5,
frames: int = 0,
):
"""Run demo with pipeline switching every 15 seconds."""
frame_interval = 1.0 / 15.0 # 15 FPS
mode_duration = 15.0 # 15 seconds per mode
print("Enhanced Oscilloscope with Pipeline Switching")
print(f"Mode duration: {mode_duration} seconds")
print("Frame rate: 15 FPS")
print()
# Create oscillators
modulator = OscillatorSensor(
name="modulator", waveform=mod_waveform, frequency=mod_freq
)
modulator.start()
modulated = ModulatedOscillator(
name="modulated",
waveform=waveform,
base_frequency=base_freq,
modulator=modulator if modulate else None,
modulation_depth=mod_depth,
)
# Find font path
font_path = Path("fonts/Pixel_Sparta.otf")
if not font_path.exists():
font_path = Path("fonts/Pixel Sparta.otf")
font_path = str(font_path) if font_path.exists() else None
# Run demo loop
try:
frame = 0
mode_start_time = time.time()
mode_index = 0 # 0 = text, 1 = pygame
while frames == 0 or frame < frames:
elapsed = time.time() - mode_start_time
# Switch mode every 15 seconds
if elapsed >= mode_duration:
mode_index = (mode_index + 1) % 2
mode_start_time = time.time()
print(f"\n{'=' * 60}")
print(
f"SWITCHING TO {'PYGAME+ANSI' if mode_index == 1 else 'TEXT'} MODE"
)
print(f"{'=' * 60}\n")
time.sleep(1.0) # Brief pause to show mode switch
# Render based on mode
if mode_index == 0:
# Text mode
visualization = render_text_mode(80, 30, modulator, modulated, frame)
else:
# Pygame + PIL to ANSI mode
visualization = render_pygame_to_ansi(
80, 30, modulator, modulated, frame, font_path
)
# Display with cursor positioning
print("\033[H" + visualization)
# Frame timing
time.sleep(frame_interval)
frame += 1
except KeyboardInterrupt:
print("\n\nDemo stopped by user")
finally:
modulator.stop()
modulated.stop()
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Enhanced oscilloscope with pipeline switching"
)
parser.add_argument(
"--waveform",
choices=["sine", "square", "sawtooth", "triangle", "noise"],
default="sine",
help="Main waveform type",
)
parser.add_argument(
"--frequency",
type=float,
default=0.5,
help="Main oscillator frequency (LFO range)",
)
parser.add_argument(
"--lfo",
action="store_true",
help="Use slow LFO frequency (0.5Hz)",
)
parser.add_argument(
"--modulate",
action="store_true",
help="Enable LFO modulation chain",
)
parser.add_argument(
"--mod-waveform",
choices=["sine", "square", "sawtooth", "triangle", "noise"],
default="sine",
help="Modulator waveform type",
)
parser.add_argument(
"--mod-freq",
type=float,
default=0.5,
help="Modulator frequency in Hz",
)
parser.add_argument(
"--mod-depth",
type=float,
default=0.5,
help="Modulation depth",
)
parser.add_argument(
"--frames",
type=int,
default=0,
help="Number of frames to render (0 = infinite)",
)
args = parser.parse_args()
base_freq = args.frequency
if args.lfo:
base_freq = 0.5
demo_with_pipeline_switching(
waveform=args.waveform,
base_freq=base_freq,
modulate=args.modulate,
mod_waveform=args.mod_waveform,
mod_freq=args.mod_freq,
mod_depth=args.mod_depth,
)

View File

@@ -0,0 +1,111 @@
#!/usr/bin/env python3
"""
Oscillator Data Export
Exports oscillator sensor data in JSON format for external use.
Usage:
uv run python scripts/oscillator_data_export.py --waveform sine --frequency 1.0 --duration 5.0
"""
import argparse
import json
import time
import sys
from pathlib import Path
from datetime import datetime
# Add mainline to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from engine.sensors.oscillator import OscillatorSensor, register_oscillator_sensor
def export_oscillator_data(
waveform: str = "sine",
frequency: float = 1.0,
duration: float = 5.0,
sample_rate: float = 60.0,
output_file: str | None = None,
):
"""Export oscillator data to JSON."""
print(f"Exporting oscillator data: {waveform} wave at {frequency}Hz")
print(f"Duration: {duration}s, Sample rate: {sample_rate}Hz")
# Create oscillator sensor
register_oscillator_sensor(
name="export_osc", waveform=waveform, frequency=frequency
)
osc = OscillatorSensor(name="export_osc", waveform=waveform, frequency=frequency)
osc.start()
# Collect data
data = {
"waveform": waveform,
"frequency": frequency,
"duration": duration,
"sample_rate": sample_rate,
"timestamp": datetime.now().isoformat(),
"samples": [],
}
sample_interval = 1.0 / sample_rate
num_samples = int(duration * sample_rate)
print(f"Collecting {num_samples} samples...")
for i in range(num_samples):
reading = osc.read()
if reading:
data["samples"].append(
{
"index": i,
"timestamp": reading.timestamp,
"value": reading.value,
"phase": osc._phase,
}
)
time.sleep(sample_interval)
osc.stop()
# Export to JSON
if output_file:
with open(output_file, "w") as f:
json.dump(data, f, indent=2)
print(f"Data exported to {output_file}")
else:
print(json.dumps(data, indent=2))
return data
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Export oscillator sensor data")
parser.add_argument(
"--waveform",
choices=["sine", "square", "sawtooth", "triangle", "noise"],
default="sine",
help="Waveform type",
)
parser.add_argument(
"--frequency", type=float, default=1.0, help="Oscillator frequency in Hz"
)
parser.add_argument(
"--duration", type=float, default=5.0, help="Duration to record in seconds"
)
parser.add_argument(
"--sample-rate", type=float, default=60.0, help="Sample rate in Hz"
)
parser.add_argument(
"--output", "-o", type=str, help="Output JSON file (default: print to stdout)"
)
args = parser.parse_args()
export_oscillator_data(
waveform=args.waveform,
frequency=args.frequency,
duration=args.duration,
sample_rate=args.sample_rate,
output_file=args.output,
)

509
scripts/pipeline_demo.py Normal file
View File

@@ -0,0 +1,509 @@
#!/usr/bin/env python3
"""
Pipeline Demo Orchestrator
Demonstrates all effects and camera modes with gentle oscillation.
Runs a comprehensive test of the Mainline pipeline system with proper
frame rate control and extended duration for visibility.
"""
import argparse
import math
import signal
import sys
import time
from typing import Any
from engine.camera import Camera
from engine.data_sources.checkerboard import CheckerboardDataSource
from engine.data_sources.sources import SourceItem
from engine.display import DisplayRegistry, NullDisplay
from engine.effects.plugins import discover_plugins
from engine.effects import get_registry
from engine.effects.types import EffectConfig
from engine.frame import FrameTimer
from engine.pipeline import Pipeline, PipelineConfig, PipelineContext
from engine.pipeline.adapters import (
CameraClockStage,
CameraStage,
DataSourceStage,
DisplayStage,
EffectPluginStage,
SourceItemsToBufferStage,
)
from engine.pipeline.stages.framebuffer import FrameBufferStage
class GentleOscillator:
"""Produces smooth, gentle sinusoidal values."""
def __init__(
self, speed: float = 60.0, amplitude: float = 1.0, offset: float = 0.0
):
self.speed = speed # Period length in frames
self.amplitude = amplitude # Amplitude
self.offset = offset # Base offset
def value(self, frame: int) -> float:
"""Get oscillated value for given frame."""
return self.offset + self.amplitude * 0.5 * (1 + math.sin(frame / self.speed))
class PipelineDemoOrchestrator:
"""Orchestrates comprehensive pipeline demonstrations."""
def __init__(
self,
use_terminal: bool = True,
target_fps: float = 30.0,
effect_duration: float = 8.0,
mode_duration: float = 3.0,
enable_fps_switch: bool = False,
loop: bool = False,
verbose: bool = False,
):
self.use_terminal = use_terminal
self.target_fps = target_fps
self.effect_duration = effect_duration
self.mode_duration = mode_duration
self.enable_fps_switch = enable_fps_switch
self.loop = loop
self.verbose = verbose
self.frame_count = 0
self.pipeline = None
self.context = None
self.framebuffer = None
self.camera = None
self.timer = None
def log(self, message: str, verbose: bool = False):
"""Print with timestamp if verbose or always-important."""
if self.verbose or not verbose:
print(f"[{time.strftime('%H:%M:%S')}] {message}")
def build_base_pipeline(
self, camera_type: str = "scroll", camera_speed: float = 0.5
):
"""Build a base pipeline with all required components."""
self.log(f"Building base pipeline: camera={camera_type}, speed={camera_speed}")
# Camera
camera = Camera.scroll(speed=camera_speed)
camera.set_canvas_size(200, 200)
# Context
ctx = PipelineContext()
# Pipeline config
config = PipelineConfig(
source="empty",
display="terminal" if self.use_terminal else "null",
camera=camera_type,
effects=[],
enable_metrics=True,
)
pipeline = Pipeline(config=config, context=ctx)
# Use a large checkerboard pattern for visible motion effects
source = CheckerboardDataSource(width=200, height=200, square_size=10)
pipeline.add_stage("source", DataSourceStage(source, name="checkerboard"))
# Add camera clock (must run every frame)
pipeline.add_stage(
"camera_update", CameraClockStage(camera, name="camera-clock")
)
# Add render
pipeline.add_stage("render", SourceItemsToBufferStage(name="items-to-buffer"))
# Add camera stage
pipeline.add_stage("camera", CameraStage(camera, name="camera"))
# Add framebuffer (optional for effects that use it)
self.framebuffer = FrameBufferStage(name="default", history_depth=5)
pipeline.add_stage("framebuffer", self.framebuffer)
# Add display
display_backend = "terminal" if self.use_terminal else "null"
display = DisplayRegistry.create(display_backend)
if display:
pipeline.add_stage("display", DisplayStage(display, name=display_backend))
# Build and initialize
pipeline.build(auto_inject=False)
pipeline.initialize()
self.pipeline = pipeline
self.context = ctx
self.camera = camera
self.log("Base pipeline built successfully")
return pipeline
def test_effects_oscillation(self):
"""Test each effect with gentle intensity oscillation."""
self.log("\n=== EFFECTS OSCILLATION TEST ===")
self.log(
f"Duration: {self.effect_duration}s per effect at {self.target_fps} FPS"
)
discover_plugins() # Ensure all plugins are registered
registry = get_registry()
all_effects = registry.list_all()
effect_names = [
name
for name in all_effects.keys()
if name not in ("motionblur", "afterimage")
]
# Calculate frames based on duration and FPS
frames_per_effect = int(self.effect_duration * self.target_fps)
oscillator = GentleOscillator(speed=90, amplitude=0.7, offset=0.3)
total_effects = len(effect_names) + 2 # +2 for motionblur and afterimage
estimated_total = total_effects * self.effect_duration
self.log(f"Testing {len(effect_names)} regular effects + 2 framebuffer effects")
self.log(f"Estimated time: {estimated_total:.0f}s")
for idx, effect_name in enumerate(sorted(effect_names), 1):
try:
self.log(f"[{idx}/{len(effect_names)}] Testing effect: {effect_name}")
effect = registry.get(effect_name)
if not effect:
self.log(f" Skipped: plugin not found")
continue
stage = EffectPluginStage(effect, name=effect_name)
self.pipeline.add_stage(f"effect_{effect_name}", stage)
self.pipeline.build(auto_inject=False)
self._run_frames(
frames_per_effect, oscillator=oscillator, effect=effect
)
self.pipeline.remove_stage(f"effect_{effect_name}")
self.pipeline.build(auto_inject=False)
self.log(f"{effect_name} completed successfully")
except Exception as e:
self.log(f"{effect_name} failed: {e}")
# Test motionblur and afterimage separately with framebuffer
for effect_name in ["motionblur", "afterimage"]:
try:
self.log(
f"[{len(effect_names) + 1}/{total_effects}] Testing effect: {effect_name} (with framebuffer)"
)
effect = registry.get(effect_name)
if not effect:
self.log(f" Skipped: plugin not found")
continue
stage = EffectPluginStage(
effect,
name=effect_name,
dependencies={"framebuffer.history.default"},
)
self.pipeline.add_stage(f"effect_{effect_name}", stage)
self.pipeline.build(auto_inject=False)
self._run_frames(
frames_per_effect, oscillator=oscillator, effect=effect
)
self.pipeline.remove_stage(f"effect_{effect_name}")
self.pipeline.build(auto_inject=False)
self.log(f"{effect_name} completed successfully")
except Exception as e:
self.log(f"{effect_name} failed: {e}")
def _run_frames(self, num_frames: int, oscillator=None, effect=None):
"""Run a specified number of frames with proper timing."""
for frame in range(num_frames):
self.frame_count += 1
self.context.set("frame_number", frame)
if oscillator and effect:
intensity = oscillator.value(frame)
effect.configure(EffectConfig(intensity=intensity))
dt = self.timer.sleep_until_next_frame()
self.camera.update(dt)
self.pipeline.execute([])
def test_framebuffer(self):
"""Test framebuffer functionality."""
self.log("\n=== FRAMEBUFFER TEST ===")
try:
# Run frames using FrameTimer for consistent pacing
self._run_frames(10)
# Check framebuffer history
history = self.context.get("framebuffer.default.history")
assert history is not None, "No framebuffer history found"
assert len(history) > 0, "Framebuffer history is empty"
self.log(f"History frames: {len(history)}")
self.log(f"Configured depth: {self.framebuffer.config.history_depth}")
# Check intensity computation
intensity = self.context.get("framebuffer.default.current_intensity")
assert intensity is not None, "No intensity map found"
self.log(f"Intensity map length: {len(intensity)}")
# Check that frames are being stored correctly
recent_frame = self.framebuffer.get_frame(0, self.context)
assert recent_frame is not None, "Cannot retrieve recent frame"
self.log(f"Recent frame rows: {len(recent_frame)}")
self.log("✓ Framebuffer test passed")
except Exception as e:
self.log(f"✗ Framebuffer test failed: {e}")
raise
def test_camera_modes(self):
"""Test each camera mode."""
self.log("\n=== CAMERA MODES TEST ===")
self.log(f"Duration: {self.mode_duration}s per mode at {self.target_fps} FPS")
camera_modes = [
("feed", 0.1),
("scroll", 0.5),
("horizontal", 0.3),
("omni", 0.3),
("floating", 0.5),
("bounce", 0.5),
("radial", 0.3),
]
frames_per_mode = int(self.mode_duration * self.target_fps)
self.log(f"Testing {len(camera_modes)} camera modes")
self.log(f"Estimated time: {len(camera_modes) * self.mode_duration:.0f}s")
for idx, (camera_type, speed) in enumerate(camera_modes, 1):
try:
self.log(f"[{idx}/{len(camera_modes)}] Testing camera: {camera_type}")
# Rebuild camera
self.camera.reset()
cam_class = getattr(Camera, camera_type, Camera.scroll)
new_camera = cam_class(speed=speed)
new_camera.set_canvas_size(200, 200)
# Update camera stages
clock_stage = CameraClockStage(new_camera, name="camera-clock")
self.pipeline.replace_stage("camera_update", clock_stage)
camera_stage = CameraStage(new_camera, name="camera")
self.pipeline.replace_stage("camera", camera_stage)
self.camera = new_camera
# Run frames with proper timing
self._run_frames(frames_per_mode)
# Verify camera moved (check final position)
x, y = self.camera.x, self.camera.y
self.log(f" Final position: ({x:.1f}, {y:.1f})")
if camera_type == "feed":
assert x == 0 and y == 0, "Feed camera should not move"
elif camera_type in ("scroll", "horizontal"):
assert abs(x) > 0 or abs(y) > 0, "Camera should have moved"
else:
self.log(f" Position check skipped (mode={camera_type})")
self.log(f"{camera_type} completed successfully")
except Exception as e:
self.log(f"{camera_type} failed: {e}")
def test_fps_switch_demo(self):
"""Demonstrate the effect of different frame rates on animation smoothness."""
if not self.enable_fps_switch:
return
self.log("\n=== FPS SWITCH DEMONSTRATION ===")
fps_sequence = [
(30.0, 5.0), # 30 FPS for 5 seconds
(60.0, 5.0), # 60 FPS for 5 seconds
(30.0, 5.0), # Back to 30 FPS for 5 seconds
(20.0, 3.0), # 20 FPS for 3 seconds
(60.0, 3.0), # 60 FPS for 3 seconds
]
original_fps = self.target_fps
for fps, duration in fps_sequence:
self.log(f"\n--- Switching to {fps} FPS for {duration}s ---")
self.target_fps = fps
self.timer.target_frame_dt = 1.0 / fps
# Update display FPS if supported
display = (
self.pipeline.get_stage("display").stage
if self.pipeline.get_stage("display")
else None
)
if display and hasattr(display, "target_fps"):
display.target_fps = fps
display._frame_period = 1.0 / fps if fps > 0 else 0
frames = int(duration * fps)
camera_type = "radial" # Use radial for smooth rotation that's visible at different FPS
speed = 0.3
# Rebuild camera if needed
self.camera.reset()
new_camera = Camera.radial(speed=speed)
new_camera.set_canvas_size(200, 200)
clock_stage = CameraClockStage(new_camera, name="camera-clock")
self.pipeline.replace_stage("camera_update", clock_stage)
camera_stage = CameraStage(new_camera, name="camera")
self.pipeline.replace_stage("camera", camera_stage)
self.camera = new_camera
for frame in range(frames):
self.context.set("frame_number", frame)
dt = self.timer.sleep_until_next_frame()
self.camera.update(dt)
result = self.pipeline.execute([])
self.log(f" Completed {frames} frames at {fps} FPS")
# Restore original FPS
self.target_fps = original_fps
self.timer.target_frame_dt = 1.0 / original_fps
self.log("✓ FPS switch demo completed")
def run(self):
"""Run the complete demo."""
start_time = time.time()
self.log("Starting Pipeline Demo Orchestrator")
self.log("=" * 50)
# Initialize frame timer
self.timer = FrameTimer(target_frame_dt=1.0 / self.target_fps)
# Build pipeline
self.build_base_pipeline()
try:
# Test framebuffer first (needed for motion blur effects)
self.test_framebuffer()
# Test effects
self.test_effects_oscillation()
# Test camera modes
self.test_camera_modes()
# Optional FPS switch demonstration
if self.enable_fps_switch:
self.test_fps_switch_demo()
else:
self.log("\n=== FPS SWITCH DEMO ===")
self.log("Skipped (enable with --switch-fps)")
elapsed = time.time() - start_time
self.log("\n" + "=" * 50)
self.log("Demo completed successfully!")
self.log(f"Total frames processed: {self.frame_count}")
self.log(f"Total elapsed time: {elapsed:.1f}s")
self.log(f"Average FPS: {self.frame_count / elapsed:.1f}")
finally:
# Always cleanup properly
self._cleanup()
def _cleanup(self):
"""Clean up pipeline resources."""
self.log("Cleaning up...", verbose=True)
if self.pipeline:
try:
self.pipeline.cleanup()
if self.verbose:
self.log("Pipeline cleaned up successfully", verbose=True)
except Exception as e:
self.log(f"Error during pipeline cleanup: {e}", verbose=True)
# If not looping, clear references
if not self.loop:
self.pipeline = None
self.context = None
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Pipeline Demo Orchestrator - comprehensive demo of Mainline pipeline"
)
parser.add_argument(
"--null",
action="store_true",
help="Use null display (no visual output)",
)
parser.add_argument(
"--fps",
type=float,
default=30.0,
help="Target frame rate (default: 30)",
)
parser.add_argument(
"--effect-duration",
type=float,
default=8.0,
help="Duration per effect in seconds (default: 8)",
)
parser.add_argument(
"--mode-duration",
type=float,
default=3.0,
help="Duration per camera mode in seconds (default: 3)",
)
parser.add_argument(
"--switch-fps",
action="store_true",
help="Include FPS switching demonstration",
)
parser.add_argument(
"--loop",
action="store_true",
help="Run demo in an infinite loop",
)
parser.add_argument(
"--verbose",
action="store_true",
help="Enable verbose output",
)
args = parser.parse_args()
orchestrator = PipelineDemoOrchestrator(
use_terminal=not args.null,
target_fps=args.fps,
effect_duration=args.effect_duration,
mode_duration=args.mode_duration,
enable_fps_switch=args.switch_fps,
loop=args.loop,
verbose=args.verbose,
)
try:
orchestrator.run()
except KeyboardInterrupt:
print("\nInterrupted by user")
sys.exit(0)
except Exception as e:
print(f"\nDemo failed: {e}")
import traceback
traceback.print_exc()
sys.exit(1)

View File

@@ -0,0 +1,49 @@
#!/usr/bin/env python3
"""Render Mermaid diagrams in markdown files to ASCII art."""
import re
import subprocess
import sys
def extract_mermaid_blocks(content: str) -> list[str]:
"""Extract mermaid blocks from markdown."""
return re.findall(r"```mermaid\n(.*?)\n```", content, re.DOTALL)
def render_diagram(block: str) -> str:
"""Render a single mermaid block to ASCII."""
result = subprocess.run(
["mermaid-ascii", "-f", "-"],
input=block,
capture_output=True,
text=True,
)
if result.returncode != 0:
return f"ERROR: {result.stderr}"
return result.stdout
def main():
if len(sys.argv) < 2:
print("Usage: render-diagrams.py <markdown-file>")
sys.exit(1)
filename = sys.argv[1]
content = open(filename).read()
blocks = extract_mermaid_blocks(content)
print(f"Found {len(blocks)} mermaid diagram(s) in {filename}")
print()
for i, block in enumerate(blocks):
# Skip if empty
if not block.strip():
continue
print(f"=== Diagram {i + 1} ===")
print(render_diagram(block))
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,64 @@
#!/usr/bin/env python3
"""Validate Mermaid diagrams in markdown files."""
import glob
import re
import sys
# Diagram types that are valid in Mermaid
VALID_TYPES = {
"flowchart",
"graph",
"classDiagram",
"sequenceDiagram",
"stateDiagram",
"stateDiagram-v2",
"erDiagram",
"gantt",
"pie",
"mindmap",
"journey",
"gitGraph",
"requirementDiagram",
}
def extract_mermaid_blocks(content: str) -> list[tuple[int, str]]:
"""Extract mermaid blocks with their positions."""
blocks = []
for match in re.finditer(r"```mermaid\n(.*?)\n```", content, re.DOTALL):
blocks.append((match.start(), match.group(1)))
return blocks
def validate_block(block: str) -> bool:
"""Check if a mermaid block has a valid diagram type."""
if not block.strip():
return True # Empty block is OK
first_line = block.strip().split("\n")[0]
return any(first_line.startswith(t) for t in VALID_TYPES)
def main():
md_files = glob.glob("docs/*.md")
errors = []
for filepath in md_files:
content = open(filepath).read()
blocks = extract_mermaid_blocks(content)
for i, (_, block) in enumerate(blocks):
if not validate_block(block):
errors.append(f"{filepath}: invalid diagram type in block {i + 1}")
if errors:
for e in errors:
print(f"ERROR: {e}")
sys.exit(1)
print(f"Validated {len(md_files)} markdown files - all OK")
if __name__ == "__main__":
main()