feat: Add oscilloscope with pipeline switching (text ↔ pygame)
- demo_oscilloscope_pipeline.py: Switches between text mode and Pygame+PIL mode - 15 FPS frame rate for smooth viewing - Mode switches every 15 seconds automatically - Pygame renderer with waveform visualization - PIL converts Pygame output to ANSI for terminal display - Uses fonts/Pixel_Sparta.otf for font rendering Usage: uv run python scripts/demo_oscilloscope_pipeline.py --lfo --modulate Pipeline: Text Mode (15s) → Pygame+PIL to ANSI (15s) → Repeat Related to #46
This commit is contained in:
411
scripts/demo_oscilloscope_pipeline.py
Normal file
411
scripts/demo_oscilloscope_pipeline.py
Normal file
@@ -0,0 +1,411 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Enhanced Oscilloscope with Pipeline Switching
|
||||
|
||||
This demo features:
|
||||
1. Text-based oscilloscope (first 15 seconds)
|
||||
2. Pygame renderer with PIL to ANSI conversion (next 15 seconds)
|
||||
3. Continuous looping between the two modes
|
||||
|
||||
Usage:
|
||||
uv run python scripts/demo_oscilloscope_pipeline.py --lfo --modulate
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
# Add mainline to path
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
|
||||
from engine.sensors.oscillator import OscillatorSensor, register_oscillator_sensor
|
||||
|
||||
|
||||
class ModulatedOscillator:
|
||||
"""Oscillator with frequency modulation from another oscillator."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
waveform: str = "sine",
|
||||
base_frequency: float = 1.0,
|
||||
modulator: "OscillatorSensor | None" = None,
|
||||
modulation_depth: float = 0.5,
|
||||
):
|
||||
self.name = name
|
||||
self.waveform = waveform
|
||||
self.base_frequency = base_frequency
|
||||
self.modulator = modulator
|
||||
self.modulation_depth = modulation_depth
|
||||
|
||||
register_oscillator_sensor(
|
||||
name=name, waveform=waveform, frequency=base_frequency
|
||||
)
|
||||
self.osc = OscillatorSensor(
|
||||
name=name, waveform=waveform, frequency=base_frequency
|
||||
)
|
||||
self.osc.start()
|
||||
|
||||
def read(self):
|
||||
"""Read current value, applying modulation if present."""
|
||||
if self.modulator:
|
||||
mod_reading = self.modulator.read()
|
||||
if mod_reading:
|
||||
mod_offset = (mod_reading.value - 0.5) * 2 * self.modulation_depth
|
||||
effective_freq = self.base_frequency + mod_offset
|
||||
effective_freq = max(0.1, min(effective_freq, 20.0))
|
||||
self.osc._frequency = effective_freq
|
||||
return self.osc.read()
|
||||
|
||||
def get_phase(self):
|
||||
return self.osc._phase
|
||||
|
||||
def get_effective_frequency(self):
|
||||
if self.modulator:
|
||||
mod_reading = self.modulator.read()
|
||||
if mod_reading:
|
||||
mod_offset = (mod_reading.value - 0.5) * 2 * self.modulation_depth
|
||||
return max(0.1, min(self.base_frequency + mod_offset, 20.0))
|
||||
return self.base_frequency
|
||||
|
||||
def stop(self):
|
||||
self.osc.stop()
|
||||
|
||||
|
||||
def render_text_mode(
|
||||
width: int,
|
||||
height: int,
|
||||
modulator: OscillatorSensor,
|
||||
modulated: ModulatedOscillator,
|
||||
frame: int,
|
||||
) -> str:
|
||||
"""Render dual waveforms in text mode."""
|
||||
mod_reading = modulator.read()
|
||||
mod_val = mod_reading.value if mod_reading else 0.5
|
||||
modulated_reading = modulated.read()
|
||||
modulated_val = modulated_reading.value if modulated_reading else 0.5
|
||||
|
||||
lines = []
|
||||
header1 = (
|
||||
f"TEXT MODE | MODULATOR: {modulator.waveform} @ {modulator.frequency:.2f}Hz"
|
||||
)
|
||||
header2 = (
|
||||
f"MODULATED: {modulated.waveform} @ {modulated.get_effective_frequency():.2f}Hz"
|
||||
)
|
||||
lines.append(header1)
|
||||
lines.append(header2)
|
||||
lines.append("─" * width)
|
||||
|
||||
# Modulator waveform (top half)
|
||||
top_height = (height - 5) // 2
|
||||
waveform_fn = modulator.WAVEFORMS[modulator.waveform]
|
||||
mod_time_offset = modulator._phase * modulator.frequency * 0.3
|
||||
|
||||
for row in range(top_height):
|
||||
row_pos = 1.0 - (row / (top_height - 1))
|
||||
line_chars = []
|
||||
for col in range(width):
|
||||
col_fraction = col / width
|
||||
time_pos = mod_time_offset + col_fraction
|
||||
sample = waveform_fn(time_pos * modulator.frequency * 2)
|
||||
tolerance = 1.0 / (top_height - 1)
|
||||
if abs(sample - row_pos) < tolerance:
|
||||
line_chars.append("█")
|
||||
else:
|
||||
line_chars.append(" ")
|
||||
lines.append("".join(line_chars))
|
||||
|
||||
lines.append(
|
||||
f"─ MODULATION: depth={modulated.modulation_depth:.2f} | mod_value={mod_val:.2f} ─"
|
||||
)
|
||||
|
||||
# Modulated waveform (bottom half)
|
||||
bottom_height = height - top_height - 5
|
||||
waveform_fn = modulated.osc.WAVEFORMS[modulated.waveform]
|
||||
modulated_time_offset = (
|
||||
modulated.get_phase() * modulated.get_effective_frequency() * 0.3
|
||||
)
|
||||
|
||||
for row in range(bottom_height):
|
||||
row_pos = 1.0 - (row / (bottom_height - 1))
|
||||
line_chars = []
|
||||
for col in range(width):
|
||||
col_fraction = col / width
|
||||
time_pos = modulated_time_offset + col_fraction
|
||||
sample = waveform_fn(time_pos * modulated.get_effective_frequency() * 2)
|
||||
tolerance = 1.0 / (bottom_height - 1)
|
||||
if abs(sample - row_pos) < tolerance:
|
||||
line_chars.append("█")
|
||||
else:
|
||||
line_chars.append(" ")
|
||||
lines.append("".join(line_chars))
|
||||
|
||||
footer = (
|
||||
f"Mod Value: {mod_val:.3f} | Modulated: {modulated_val:.3f} | Frame: {frame}"
|
||||
)
|
||||
lines.append(footer)
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def render_pygame_to_ansi(
|
||||
width: int,
|
||||
height: int,
|
||||
modulator: OscillatorSensor,
|
||||
modulated: ModulatedOscillator,
|
||||
frame: int,
|
||||
font_path: str | None,
|
||||
) -> str:
|
||||
"""Render waveforms using Pygame, convert to ANSI with PIL."""
|
||||
try:
|
||||
import pygame
|
||||
from PIL import Image
|
||||
except ImportError:
|
||||
return "Pygame or PIL not available\n\n" + render_text_mode(
|
||||
width, height, modulator, modulated, frame
|
||||
)
|
||||
|
||||
# Initialize Pygame surface (smaller for ANSI conversion)
|
||||
pygame_width = width * 2 # Double for better quality
|
||||
pygame_height = height * 4
|
||||
surface = pygame.Surface((pygame_width, pygame_height))
|
||||
surface.fill((10, 10, 20)) # Dark background
|
||||
|
||||
# Get readings
|
||||
mod_reading = modulator.read()
|
||||
mod_val = mod_reading.value if mod_reading else 0.5
|
||||
modulated_reading = modulated.read()
|
||||
modulated_val = modulated_reading.value if modulated_reading else 0.5
|
||||
|
||||
# Draw modulator waveform (top half)
|
||||
top_height = pygame_height // 2
|
||||
waveform_fn = modulator.WAVEFORMS[modulator.waveform]
|
||||
mod_time_offset = modulator._phase * modulator.frequency * 0.3
|
||||
|
||||
prev_x, prev_y = 0, 0
|
||||
for x in range(pygame_width):
|
||||
col_fraction = x / pygame_width
|
||||
time_pos = mod_time_offset + col_fraction
|
||||
sample = waveform_fn(time_pos * modulator.frequency * 2)
|
||||
y = int(top_height - (sample * (top_height - 20)) - 10)
|
||||
if x > 0:
|
||||
pygame.draw.line(surface, (100, 200, 255), (prev_x, prev_y), (x, y), 2)
|
||||
prev_x, prev_y = x, y
|
||||
|
||||
# Draw separator
|
||||
pygame.draw.line(
|
||||
surface, (80, 80, 100), (0, top_height), (pygame_width, top_height), 1
|
||||
)
|
||||
|
||||
# Draw modulated waveform (bottom half)
|
||||
bottom_start = top_height + 10
|
||||
bottom_height = pygame_height - bottom_start - 20
|
||||
waveform_fn = modulated.osc.WAVEFORMS[modulated.waveform]
|
||||
modulated_time_offset = (
|
||||
modulated.get_phase() * modulated.get_effective_frequency() * 0.3
|
||||
)
|
||||
|
||||
prev_x, prev_y = 0, 0
|
||||
for x in range(pygame_width):
|
||||
col_fraction = x / pygame_width
|
||||
time_pos = modulated_time_offset + col_fraction
|
||||
sample = waveform_fn(time_pos * modulated.get_effective_frequency() * 2)
|
||||
y = int(bottom_start + (bottom_height - (sample * (bottom_height - 20))) - 10)
|
||||
if x > 0:
|
||||
pygame.draw.line(surface, (255, 150, 100), (prev_x, prev_y), (x, y), 2)
|
||||
prev_x, prev_y = x, y
|
||||
|
||||
# Draw info text on pygame surface
|
||||
try:
|
||||
if font_path:
|
||||
font = pygame.font.Font(font_path, 16)
|
||||
info_text = f"PYGAME MODE | Mod: {mod_val:.2f} | Out: {modulated_val:.2f} | Frame: {frame}"
|
||||
text_surface = font.render(info_text, True, (200, 200, 200))
|
||||
surface.blit(text_surface, (10, 10))
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Convert Pygame surface to PIL Image
|
||||
img_str = pygame.image.tostring(surface, "RGB")
|
||||
pil_image = Image.frombytes("RGB", (pygame_width, pygame_height), img_str)
|
||||
|
||||
# Convert to ANSI
|
||||
return pil_to_ansi(pil_image)
|
||||
|
||||
|
||||
def pil_to_ansi(image) -> str:
|
||||
"""Convert PIL image to ANSI escape codes."""
|
||||
# Resize for terminal display
|
||||
terminal_width = 80
|
||||
terminal_height = 30
|
||||
image = image.resize((terminal_width * 2, terminal_height * 2))
|
||||
|
||||
# Convert to grayscale
|
||||
image = image.convert("L")
|
||||
|
||||
# ANSI character ramp (dark to light)
|
||||
chars = " .:-=+*#%@"
|
||||
|
||||
lines = []
|
||||
for y in range(0, image.height, 2): # Sample every 2nd row for aspect ratio
|
||||
line = ""
|
||||
for x in range(0, image.width, 2):
|
||||
pixel = image.getpixel((x, y))
|
||||
char_index = int((pixel / 255) * (len(chars) - 1))
|
||||
line += chars[char_index]
|
||||
lines.append(line)
|
||||
|
||||
# Add header info
|
||||
header = "PYGAME → ANSI RENDER MODE"
|
||||
header_line = "─" * terminal_width
|
||||
return f"{header}\n{header_line}\n" + "\n".join(lines)
|
||||
|
||||
|
||||
def demo_with_pipeline_switching(
|
||||
waveform: str = "sine",
|
||||
base_freq: float = 0.5,
|
||||
modulate: bool = False,
|
||||
mod_waveform: str = "sine",
|
||||
mod_freq: float = 0.5,
|
||||
mod_depth: float = 0.5,
|
||||
frames: int = 0,
|
||||
):
|
||||
"""Run demo with pipeline switching every 15 seconds."""
|
||||
frame_interval = 1.0 / 15.0 # 15 FPS
|
||||
mode_duration = 15.0 # 15 seconds per mode
|
||||
|
||||
print("Enhanced Oscilloscope with Pipeline Switching")
|
||||
print(f"Mode duration: {mode_duration} seconds")
|
||||
print("Frame rate: 15 FPS")
|
||||
print()
|
||||
|
||||
# Create oscillators
|
||||
modulator = OscillatorSensor(
|
||||
name="modulator", waveform=mod_waveform, frequency=mod_freq
|
||||
)
|
||||
modulator.start()
|
||||
|
||||
modulated = ModulatedOscillator(
|
||||
name="modulated",
|
||||
waveform=waveform,
|
||||
base_frequency=base_freq,
|
||||
modulator=modulator if modulate else None,
|
||||
modulation_depth=mod_depth,
|
||||
)
|
||||
|
||||
# Find font path
|
||||
font_path = Path("fonts/Pixel_Sparta.otf")
|
||||
if not font_path.exists():
|
||||
font_path = Path("fonts/Pixel Sparta.otf")
|
||||
font_path = str(font_path) if font_path.exists() else None
|
||||
|
||||
# Run demo loop
|
||||
try:
|
||||
frame = 0
|
||||
mode_start_time = time.time()
|
||||
mode_index = 0 # 0 = text, 1 = pygame
|
||||
|
||||
while frames == 0 or frame < frames:
|
||||
elapsed = time.time() - mode_start_time
|
||||
|
||||
# Switch mode every 15 seconds
|
||||
if elapsed >= mode_duration:
|
||||
mode_index = (mode_index + 1) % 2
|
||||
mode_start_time = time.time()
|
||||
print(f"\n{'=' * 60}")
|
||||
print(
|
||||
f"SWITCHING TO {'PYGAME+ANSI' if mode_index == 1 else 'TEXT'} MODE"
|
||||
)
|
||||
print(f"{'=' * 60}\n")
|
||||
time.sleep(1.0) # Brief pause to show mode switch
|
||||
|
||||
# Render based on mode
|
||||
if mode_index == 0:
|
||||
# Text mode
|
||||
visualization = render_text_mode(80, 30, modulator, modulated, frame)
|
||||
else:
|
||||
# Pygame + PIL to ANSI mode
|
||||
visualization = render_pygame_to_ansi(
|
||||
80, 30, modulator, modulated, frame, font_path
|
||||
)
|
||||
|
||||
# Display with cursor positioning
|
||||
print("\033[H" + visualization)
|
||||
|
||||
# Frame timing
|
||||
time.sleep(frame_interval)
|
||||
frame += 1
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("\n\nDemo stopped by user")
|
||||
|
||||
finally:
|
||||
modulator.stop()
|
||||
modulated.stop()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Enhanced oscilloscope with pipeline switching"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--waveform",
|
||||
choices=["sine", "square", "sawtooth", "triangle", "noise"],
|
||||
default="sine",
|
||||
help="Main waveform type",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--frequency",
|
||||
type=float,
|
||||
default=0.5,
|
||||
help="Main oscillator frequency (LFO range)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--lfo",
|
||||
action="store_true",
|
||||
help="Use slow LFO frequency (0.5Hz)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--modulate",
|
||||
action="store_true",
|
||||
help="Enable LFO modulation chain",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--mod-waveform",
|
||||
choices=["sine", "square", "sawtooth", "triangle", "noise"],
|
||||
default="sine",
|
||||
help="Modulator waveform type",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--mod-freq",
|
||||
type=float,
|
||||
default=0.5,
|
||||
help="Modulator frequency in Hz",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--mod-depth",
|
||||
type=float,
|
||||
default=0.5,
|
||||
help="Modulation depth",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--frames",
|
||||
type=int,
|
||||
default=0,
|
||||
help="Number of frames to render (0 = infinite)",
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
base_freq = args.frequency
|
||||
if args.lfo:
|
||||
base_freq = 0.5
|
||||
|
||||
demo_with_pipeline_switching(
|
||||
waveform=args.waveform,
|
||||
base_freq=base_freq,
|
||||
modulate=args.modulate,
|
||||
mod_waveform=args.mod_waveform,
|
||||
mod_freq=args.mod_freq,
|
||||
mod_depth=args.mod_depth,
|
||||
)
|
||||
Reference in New Issue
Block a user