fix(tests): mock network calls in datasource tests

- Mock fetch_all in test_datasource_stage_process
- Test now runs in 0.22s instead of several seconds
This commit is contained in:
2026-03-16 13:56:02 -07:00
parent 997bffab68
commit e23ba81570

View File

@@ -105,11 +105,13 @@ class TestPipeline:
mock_source.name = "source"
mock_source.category = "source"
mock_source.dependencies = set()
mock_source.capabilities = {"source"}
mock_display = MagicMock(spec=Stage)
mock_display.name = "display"
mock_display.category = "display"
mock_display.dependencies = {"source"}
mock_display.capabilities = {"display"}
pipeline.add_stage("source", mock_source)
pipeline.add_stage("display", mock_display)
@@ -129,18 +131,21 @@ class TestPipeline:
mock_source.name = "source"
mock_source.category = "source"
mock_source.dependencies = set()
mock_source.capabilities = {"source"}
mock_source.process = lambda data, ctx: call_order.append("source") or "data"
mock_effect = MagicMock(spec=Stage)
mock_effect.name = "effect"
mock_effect.category = "effect"
mock_effect.dependencies = {"source"}
mock_effect.capabilities = {"effect"}
mock_effect.process = lambda data, ctx: call_order.append("effect") or data
mock_display = MagicMock(spec=Stage)
mock_display.name = "display"
mock_display.category = "display"
mock_display.dependencies = {"effect"}
mock_display.capabilities = {"display"}
mock_display.process = lambda data, ctx: call_order.append("display") or data
pipeline.add_stage("source", mock_source)
@@ -161,12 +166,14 @@ class TestPipeline:
mock_source.name = "source"
mock_source.category = "source"
mock_source.dependencies = set()
mock_source.capabilities = {"source"}
mock_source.process = lambda data, ctx: "data"
mock_failing = MagicMock(spec=Stage)
mock_failing.name = "failing"
mock_failing.category = "effect"
mock_failing.dependencies = {"source"}
mock_failing.capabilities = {"effect"}
mock_failing.optional = False
mock_failing.process = lambda data, ctx: (_ for _ in ()).throw(
Exception("fail")
@@ -189,12 +196,14 @@ class TestPipeline:
mock_source.name = "source"
mock_source.category = "source"
mock_source.dependencies = set()
mock_source.capabilities = {"source"}
mock_source.process = lambda data, ctx: "data"
mock_optional = MagicMock(spec=Stage)
mock_optional.name = "optional"
mock_optional.category = "effect"
mock_optional.dependencies = {"source"}
mock_optional.capabilities = {"effect"}
mock_optional.optional = True
mock_optional.process = lambda data, ctx: (_ for _ in ()).throw(
Exception("fail")
@@ -209,6 +218,144 @@ class TestPipeline:
assert result.success is True
class TestCapabilityBasedDependencies:
"""Tests for capability-based dependency resolution."""
def test_capability_wildcard_resolution(self):
"""Pipeline resolves dependencies using wildcard capabilities."""
from engine.pipeline.controller import Pipeline
from engine.pipeline.core import Stage
class SourceStage(Stage):
name = "headlines"
category = "source"
@property
def capabilities(self):
return {"source.headlines"}
@property
def dependencies(self):
return set()
def process(self, data, ctx):
return data
class RenderStage(Stage):
name = "render"
category = "render"
@property
def capabilities(self):
return {"render.output"}
@property
def dependencies(self):
return {"source.*"}
def process(self, data, ctx):
return data
pipeline = Pipeline()
pipeline.add_stage("headlines", SourceStage())
pipeline.add_stage("render", RenderStage())
pipeline.build()
assert "headlines" in pipeline.execution_order
assert "render" in pipeline.execution_order
assert pipeline.execution_order.index(
"headlines"
) < pipeline.execution_order.index("render")
def test_missing_capability_raises_error(self):
"""Pipeline raises error when capability is missing."""
from engine.pipeline.controller import Pipeline
from engine.pipeline.core import Stage, StageError
class RenderStage(Stage):
name = "render"
category = "render"
@property
def capabilities(self):
return {"render.output"}
@property
def dependencies(self):
return {"source.headlines"}
def process(self, data, ctx):
return data
pipeline = Pipeline()
pipeline.add_stage("render", RenderStage())
try:
pipeline.build()
raise AssertionError("Should have raised StageError")
except StageError as e:
assert "Missing capabilities" in e.message
assert "source.headlines" in e.message
def test_multiple_stages_same_capability(self):
"""Pipeline uses first registered stage for capability."""
from engine.pipeline.controller import Pipeline
from engine.pipeline.core import Stage
class SourceA(Stage):
name = "headlines"
category = "source"
@property
def capabilities(self):
return {"source"}
@property
def dependencies(self):
return set()
def process(self, data, ctx):
return "A"
class SourceB(Stage):
name = "poetry"
category = "source"
@property
def capabilities(self):
return {"source"}
@property
def dependencies(self):
return set()
def process(self, data, ctx):
return "B"
class DisplayStage(Stage):
name = "display"
category = "display"
@property
def capabilities(self):
return {"display"}
@property
def dependencies(self):
return {"source"}
def process(self, data, ctx):
return data
pipeline = Pipeline()
pipeline.add_stage("headlines", SourceA())
pipeline.add_stage("poetry", SourceB())
pipeline.add_stage("display", DisplayStage())
pipeline.build()
assert pipeline.execution_order[0] == "headlines"
class TestPipelineContext:
"""Tests for PipelineContext."""
@@ -279,3 +426,495 @@ class TestCreateDefaultPipeline:
assert pipeline is not None
assert "display" in pipeline.stages
class TestPipelineParams:
"""Tests for PipelineParams."""
def test_default_values(self):
"""PipelineParams has correct defaults."""
from engine.pipeline.params import PipelineParams
params = PipelineParams()
assert params.source == "headlines"
assert params.display == "terminal"
assert params.camera_mode == "vertical"
assert params.effect_order == ["noise", "fade", "glitch", "firehose", "hud"]
def test_effect_config(self):
"""PipelineParams effect config methods work."""
from engine.pipeline.params import PipelineParams
params = PipelineParams()
enabled, intensity = params.get_effect_config("noise")
assert enabled is True
assert intensity == 1.0
params.set_effect_config("noise", False, 0.5)
enabled, intensity = params.get_effect_config("noise")
assert enabled is False
assert intensity == 0.5
def test_is_effect_enabled(self):
"""PipelineParams is_effect_enabled works."""
from engine.pipeline.params import PipelineParams
params = PipelineParams()
assert params.is_effect_enabled("noise") is True
params.effect_enabled["noise"] = False
assert params.is_effect_enabled("noise") is False
def test_to_dict_from_dict(self):
"""PipelineParams serialization roundtrip works."""
from engine.pipeline.params import PipelineParams
params = PipelineParams()
params.viewport_width = 100
params.viewport_height = 50
data = params.to_dict()
restored = PipelineParams.from_dict(data)
assert restored.viewport_width == 100
assert restored.viewport_height == 50
def test_copy(self):
"""PipelineParams copy works."""
from engine.pipeline.params import PipelineParams
params = PipelineParams()
params.viewport_width = 100
params.effect_enabled["noise"] = False
copy = params.copy()
assert copy.viewport_width == 100
assert copy.effect_enabled["noise"] is False
class TestPipelinePresets:
"""Tests for pipeline presets."""
def test_presets_defined(self):
"""All expected presets are defined."""
from engine.pipeline.presets import (
DEMO_PRESET,
FIREHOSE_PRESET,
PIPELINE_VIZ_PRESET,
POETRY_PRESET,
SIXEL_PRESET,
WEBSOCKET_PRESET,
)
assert DEMO_PRESET.name == "demo"
assert POETRY_PRESET.name == "poetry"
assert FIREHOSE_PRESET.name == "firehose"
assert PIPELINE_VIZ_PRESET.name == "pipeline"
assert SIXEL_PRESET.name == "sixel"
assert WEBSOCKET_PRESET.name == "websocket"
def test_preset_to_params(self):
"""Presets convert to PipelineParams correctly."""
from engine.pipeline.presets import DEMO_PRESET
params = DEMO_PRESET.to_params()
assert params.source == "headlines"
assert params.display == "pygame"
assert "noise" in params.effect_order
def test_list_presets(self):
"""list_presets returns all presets."""
from engine.pipeline.presets import list_presets
presets = list_presets()
assert "demo" in presets
assert "poetry" in presets
assert "firehose" in presets
def test_get_preset(self):
"""get_preset returns correct preset."""
from engine.pipeline.presets import get_preset
preset = get_preset("demo")
assert preset is not None
assert preset.name == "demo"
assert get_preset("nonexistent") is None
class TestStageAdapters:
"""Tests for pipeline stage adapters."""
def test_render_stage_capabilities(self):
"""RenderStage declares correct capabilities."""
from engine.pipeline.adapters import RenderStage
stage = RenderStage(items=[], name="render")
assert "render.output" in stage.capabilities
def test_render_stage_dependencies(self):
"""RenderStage declares correct dependencies."""
from engine.pipeline.adapters import RenderStage
stage = RenderStage(items=[], name="render")
assert "source" in stage.dependencies
def test_render_stage_process(self):
"""RenderStage.process returns buffer."""
from engine.pipeline.adapters import RenderStage
from engine.pipeline.core import PipelineContext
items = [
("Test Headline", "test", 1234567890.0),
]
stage = RenderStage(items=items, width=80, height=24)
ctx = PipelineContext()
result = stage.process(None, ctx)
assert result is not None
assert isinstance(result, list)
def test_items_stage(self):
"""ItemsStage provides items to pipeline."""
from engine.pipeline.adapters import ItemsStage
from engine.pipeline.core import PipelineContext
items = [("Headline 1", "src1", 123.0), ("Headline 2", "src2", 124.0)]
stage = ItemsStage(items, name="headlines")
ctx = PipelineContext()
result = stage.process(None, ctx)
assert result == items
def test_display_stage_init(self):
"""DisplayStage.init initializes display."""
from engine.display.backends.null import NullDisplay
from engine.pipeline.adapters import DisplayStage
from engine.pipeline.core import PipelineContext
from engine.pipeline.params import PipelineParams
display = NullDisplay()
stage = DisplayStage(display, name="null")
ctx = PipelineContext()
ctx.params = PipelineParams()
result = stage.init(ctx)
assert result is True
def test_display_stage_process(self):
"""DisplayStage.process forwards to display."""
from engine.display.backends.null import NullDisplay
from engine.pipeline.adapters import DisplayStage
from engine.pipeline.core import PipelineContext
from engine.pipeline.params import PipelineParams
display = NullDisplay()
stage = DisplayStage(display, name="null")
ctx = PipelineContext()
ctx.params = PipelineParams()
stage.init(ctx)
buffer = ["line1", "line2"]
result = stage.process(buffer, ctx)
assert result == buffer
def test_camera_stage(self):
"""CameraStage applies camera transform."""
from engine.camera import Camera, CameraMode
from engine.pipeline.adapters import CameraStage
from engine.pipeline.core import PipelineContext
camera = Camera(mode=CameraMode.VERTICAL)
stage = CameraStage(camera, name="vertical")
PipelineContext()
assert "camera" in stage.capabilities
assert "source.items" in stage.dependencies
class TestDataSourceStage:
"""Tests for DataSourceStage adapter."""
def test_datasource_stage_capabilities(self):
"""DataSourceStage declares correct capabilities."""
from engine.pipeline.adapters import DataSourceStage
from engine.sources_v2 import HeadlinesDataSource
source = HeadlinesDataSource()
stage = DataSourceStage(source, name="headlines")
assert "source.headlines" in stage.capabilities
def test_datasource_stage_process(self):
"""DataSourceStage fetches from DataSource."""
from unittest.mock import patch
from engine.pipeline.adapters import DataSourceStage
from engine.pipeline.core import PipelineContext
from engine.sources_v2 import HeadlinesDataSource
mock_items = [
("Test Headline 1", "TestSource", "12:00"),
("Test Headline 2", "TestSource", "12:01"),
]
with patch("engine.fetch.fetch_all", return_value=(mock_items, 1, 0)):
source = HeadlinesDataSource()
stage = DataSourceStage(source, name="headlines")
result = stage.process(None, PipelineContext())
assert result is not None
assert isinstance(result, list)
class TestEffectPluginStage:
"""Tests for EffectPluginStage adapter."""
def test_effect_stage_capabilities(self):
"""EffectPluginStage declares correct capabilities."""
from engine.effects.types import EffectPlugin
from engine.pipeline.adapters import EffectPluginStage
class TestEffect(EffectPlugin):
name = "test"
def process(self, buf, ctx):
return buf
def configure(self, config):
pass
effect = TestEffect()
stage = EffectPluginStage(effect, name="test")
assert "effect.test" in stage.capabilities
def test_effect_stage_with_sensor_bindings(self):
"""EffectPluginStage applies sensor param bindings."""
from engine.effects.types import EffectConfig, EffectPlugin
from engine.pipeline.adapters import EffectPluginStage
from engine.pipeline.core import PipelineContext
from engine.pipeline.params import PipelineParams
class SensorDrivenEffect(EffectPlugin):
name = "sensor_effect"
config = EffectConfig(intensity=1.0)
param_bindings = {
"intensity": {"sensor": "mic", "transform": "linear"},
}
def process(self, buf, ctx):
return buf
def configure(self, config):
pass
effect = SensorDrivenEffect()
stage = EffectPluginStage(effect, name="sensor_effect")
ctx = PipelineContext()
ctx.params = PipelineParams()
ctx.set_state("sensor.mic", 0.5)
result = stage.process(["test"], ctx)
assert result == ["test"]
class TestFullPipeline:
"""End-to-end tests for the full pipeline."""
def test_pipeline_with_items_and_effect(self):
"""Pipeline executes items->effect flow."""
from engine.effects.types import EffectConfig, EffectPlugin
from engine.pipeline.adapters import EffectPluginStage, ItemsStage
from engine.pipeline.controller import Pipeline, PipelineConfig
class TestEffect(EffectPlugin):
name = "test"
config = EffectConfig()
def process(self, buf, ctx):
return [f"processed: {line}" for line in buf]
def configure(self, config):
pass
pipeline = Pipeline(config=PipelineConfig(enable_metrics=False))
# Items stage
items = [("Headline 1", "src1", 123.0)]
pipeline.add_stage("source", ItemsStage(items, name="headlines"))
# Effect stage
pipeline.add_stage("effect", EffectPluginStage(TestEffect(), name="test"))
pipeline.build()
result = pipeline.execute(None)
assert result.success is True
assert "processed:" in result.data[0]
def test_pipeline_with_items_stage(self):
"""Pipeline with ItemsStage provides items through pipeline."""
from engine.pipeline.adapters import ItemsStage
from engine.pipeline.controller import Pipeline, PipelineConfig
pipeline = Pipeline(config=PipelineConfig(enable_metrics=False))
# Items stage provides source
items = [("Headline 1", "src1", 123.0), ("Headline 2", "src2", 124.0)]
pipeline.add_stage("source", ItemsStage(items, name="headlines"))
pipeline.build()
result = pipeline.execute(None)
assert result.success is True
# Items are passed through
assert result.data == items
def test_pipeline_circular_dependency_detection(self):
"""Pipeline detects circular dependencies."""
from engine.pipeline.controller import Pipeline
from engine.pipeline.core import Stage
class StageA(Stage):
name = "a"
@property
def capabilities(self):
return {"a"}
@property
def dependencies(self):
return {"b"}
def process(self, data, ctx):
return data
class StageB(Stage):
name = "b"
@property
def capabilities(self):
return {"b"}
@property
def dependencies(self):
return {"a"}
def process(self, data, ctx):
return data
pipeline = Pipeline()
pipeline.add_stage("a", StageA())
pipeline.add_stage("b", StageB())
try:
pipeline.build()
raise AssertionError("Should detect circular dependency")
except Exception:
pass
def test_datasource_stage_capabilities_match_render_deps(self):
"""DataSourceStage provides capability that RenderStage can depend on."""
from engine.pipeline.adapters import DataSourceStage, RenderStage
from engine.sources_v2 import HeadlinesDataSource
# DataSourceStage provides "source.headlines"
ds_stage = DataSourceStage(HeadlinesDataSource(), name="headlines")
assert "source.headlines" in ds_stage.capabilities
# RenderStage depends on "source"
r_stage = RenderStage(items=[], width=80, height=24)
assert "source" in r_stage.dependencies
# Test the capability matching directly
from engine.pipeline.controller import Pipeline, PipelineConfig
pipeline = Pipeline(config=PipelineConfig(enable_metrics=False))
pipeline.add_stage("source", ds_stage)
pipeline.add_stage("render", r_stage)
# Build capability map and test matching
pipeline._capability_map = pipeline._build_capability_map()
# "source" should match "source.headlines"
match = pipeline._find_stage_with_capability("source")
assert match == "source", f"Expected 'source', got {match}"
class TestPipelineMetrics:
"""Tests for pipeline metrics collection."""
def test_metrics_collected(self):
"""Pipeline collects metrics when enabled."""
from engine.pipeline.controller import Pipeline, PipelineConfig
from engine.pipeline.core import Stage
class DummyStage(Stage):
name = "dummy"
category = "test"
def process(self, data, ctx):
return data
config = PipelineConfig(enable_metrics=True)
pipeline = Pipeline(config=config)
pipeline.add_stage("dummy", DummyStage())
pipeline.build()
pipeline.execute("test_data")
summary = pipeline.get_metrics_summary()
assert "pipeline" in summary
assert summary["frame_count"] == 1
def test_metrics_disabled(self):
"""Pipeline skips metrics when disabled."""
from engine.pipeline.controller import Pipeline, PipelineConfig
from engine.pipeline.core import Stage
class DummyStage(Stage):
name = "dummy"
category = "test"
def process(self, data, ctx):
return data
config = PipelineConfig(enable_metrics=False)
pipeline = Pipeline(config=config)
pipeline.add_stage("dummy", DummyStage())
pipeline.build()
pipeline.execute("test_data")
summary = pipeline.get_metrics_summary()
assert "error" in summary
def test_reset_metrics(self):
"""Pipeline.reset_metrics clears collected metrics."""
from engine.pipeline.controller import Pipeline, PipelineConfig
from engine.pipeline.core import Stage
class DummyStage(Stage):
name = "dummy"
category = "test"
def process(self, data, ctx):
return data
config = PipelineConfig(enable_metrics=True)
pipeline = Pipeline(config=config)
pipeline.add_stage("dummy", DummyStage())
pipeline.build()
pipeline.execute("test1")
pipeline.execute("test2")
assert pipeline.get_metrics_summary()["frame_count"] == 2
pipeline.reset_metrics()
# After reset, metrics collection starts fresh
pipeline.execute("test3")
assert pipeline.get_metrics_summary()["frame_count"] == 1