From ce4dce6a4f3792fee36ff5f531e597aa5294a4bb Mon Sep 17 00:00:00 2001 From: Trey Date: Wed, 24 Jun 2026 10:10:40 -0700 Subject: [PATCH 1/2] Remove the session-factory aggregation mirror MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The core is the single source of capability aggregation on the Serve path (AdvertiseFromCore), so the session factory's parallel aggregation is dead code. Part of the #5621 Stage 2 dead-factory-mirror removal: - session/factory.go: remove the `aggregator` field, `WithAggregator`, `buildRoutingTableWithAggregator`, and the `if f.aggregator != nil` branch — makeBaseSession always builds the routing table from raw backend capabilities (no overrides/conflict- resolution/filter; the core owns that). makeBaseSession's now-always-nil error return is dropped (+ its two callers). - aggregator: remove `Aggregator.ProcessPreQueriedCapabilities` (interface + impl + regenerated mock) — its only caller was buildRoutingTableWithAggregator. - cli/serve.go: inline the trivial createSessionFactory to NewSessionFactory and drop the WithAggregator(agg) wiring (agg still feeds the core via Config.Aggregator). Part of #5621. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../aggregator/advertised_backendid_test.go | 97 ------ pkg/vmcp/aggregator/aggregator.go | 19 -- pkg/vmcp/aggregator/default_aggregator.go | 63 ---- .../aggregator/default_aggregator_test.go | 285 ------------------ pkg/vmcp/aggregator/mocks/mock_interfaces.go | 17 -- pkg/vmcp/cli/serve.go | 18 +- pkg/vmcp/cli/serve_test.go | 30 -- pkg/vmcp/session/factory.go | 99 +----- 8 files changed, 12 insertions(+), 616 deletions(-) diff --git a/pkg/vmcp/aggregator/advertised_backendid_test.go b/pkg/vmcp/aggregator/advertised_backendid_test.go index 6931e55332..3e0678ea6b 100644 --- a/pkg/vmcp/aggregator/advertised_backendid_test.go +++ b/pkg/vmcp/aggregator/advertised_backendid_test.go @@ -311,100 +311,3 @@ func TestDefaultAggregator_AdvertisingFilterPreservesBackendID(t *testing.T) { }) } } - -func TestDefaultAggregator_ProcessPreQueriedCapabilitiesCarryBackendID(t *testing.T) { - t.Parallel() - - newTarget := func(id string) *vmcp.BackendTarget { - return &vmcp.BackendTarget{ - WorkloadID: id, - WorkloadName: id + "-name", - BaseURL: "http://" + id + ":8080", - TransportType: "streamable-http", - HealthStatus: vmcp.BackendHealthy, - } - } - - manualConfigs := []*config.WorkloadToolConfig{ - {Workload: "backend1", Overrides: map[string]*config.ToolOverride{"fetch": {Name: "custom_fetch_b1"}}}, - {Workload: "backend2", Overrides: map[string]*config.ToolOverride{"fetch": {Name: "custom_fetch_b2"}}}, - } - - tests := []struct { - name string - strategy vmcp.ConflictResolutionStrategy - priorityOrder []string - wlConfigs []*config.WorkloadToolConfig - aggCfg *config.AggregationConfig - }{ - { - name: "prefix strategy", - strategy: vmcp.ConflictStrategyPrefix, - }, - { - name: "priority strategy", - strategy: vmcp.ConflictStrategyPriority, - priorityOrder: []string{"backend1", "backend2"}, - }, - { - name: "manual strategy", - strategy: vmcp.ConflictStrategyManual, - wlConfigs: manualConfigs, - aggCfg: &config.AggregationConfig{Tools: manualConfigs}, - }, - { - name: "prefix strategy with global ExcludeAllTools", - strategy: vmcp.ConflictStrategyPrefix, - aggCfg: &config.AggregationConfig{ExcludeAllTools: true}, - }, - { - name: "prefix strategy with per-workload Filter", - strategy: vmcp.ConflictStrategyPrefix, - aggCfg: &config.AggregationConfig{ - Tools: []*config.WorkloadToolConfig{{Workload: "backend1", Filter: []string{"tool_a"}}}, - }, - }, - } - - for _, tt := range tests { - tt := tt - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - - toolsByBackend := map[string][]vmcp.Tool{ - "backend1": {newTestTool("fetch", "backend1"), newTestTool("tool_a", "backend1")}, - "backend2": {newTestTool("fetch", "backend2"), newTestTool("tool_b", "backend2")}, - } - targets := map[string]*vmcp.BackendTarget{ - "backend1": newTarget("backend1"), - "backend2": newTarget("backend2"), - } - - resolver := resolverForStrategy(t, tt.strategy, tt.priorityOrder, tt.wlConfigs) - agg := NewDefaultAggregator(nil, resolver, tt.aggCfg, nil) - - advertised, allResolved, routingTable, err := agg.ProcessPreQueriedCapabilities( - context.Background(), toolsByBackend, targets, - ) - require.NoError(t, err) - - // allResolvedTools is the full set regardless of advertising filter; - // every entry must carry a BackendID for composite-tool schema lookup. - require.NotEmpty(t, allResolved) - for _, tool := range allResolved { - assert.NotEmptyf(t, tool.BackendID, - "resolved tool %q must carry a non-empty BackendID", tool.Name) - } - // Every advertised (post-filter) tool must carry a BackendID, and its - // routing target must agree on the backend identity. - for _, tool := range advertised { - assert.NotEmptyf(t, tool.BackendID, - "advertised tool %q must carry a non-empty BackendID", tool.Name) - target := routingTable[tool.Name] - require.NotNilf(t, target, "routing target for %q", tool.Name) - assert.Equal(t, tool.BackendID, target.WorkloadID, - "advertised tool %q BackendID must match its routing target", tool.Name) - } - }) - } -} diff --git a/pkg/vmcp/aggregator/aggregator.go b/pkg/vmcp/aggregator/aggregator.go index 971a0041b2..6d2b482359 100644 --- a/pkg/vmcp/aggregator/aggregator.go +++ b/pkg/vmcp/aggregator/aggregator.go @@ -61,25 +61,6 @@ type Aggregator interface { // 2. Resolve conflicts // 3. Merge into final view AggregateCapabilities(ctx context.Context, backends []vmcp.Backend) (*AggregatedCapabilities, error) - - // ProcessPreQueriedCapabilities applies the same aggregation pipeline (overrides, - // conflict resolution, advertising filter) to tools that have already been fetched - // from live backends. Used by the session management path to reuse aggregator - // logic without re-querying backends over HTTP. - // - // toolsByBackend maps backend WorkloadID → raw tools as returned by the backend. - // targets maps backend WorkloadID → the pre-built BackendTarget for that backend. - // - // Returns: - // - advertisedTools: resolved tools that pass the advertising filter (for MCP clients) - // - allResolvedTools: all resolved tools including non-advertised ones (for schema lookup) - // - toolsRouting: routing table keyed by resolved name; each entry has OriginalCapabilityName - // set so that GetBackendCapabilityName() translates back to the raw backend name. - ProcessPreQueriedCapabilities( - ctx context.Context, - toolsByBackend map[string][]vmcp.Tool, - targets map[string]*vmcp.BackendTarget, - ) (advertisedTools []vmcp.Tool, allResolvedTools []vmcp.Tool, toolsRouting map[string]*vmcp.BackendTarget, err error) } // BackendCapabilities contains the raw capabilities from a single backend. diff --git a/pkg/vmcp/aggregator/default_aggregator.go b/pkg/vmcp/aggregator/default_aggregator.go index dbebdbe263..42b80bae3c 100644 --- a/pkg/vmcp/aggregator/default_aggregator.go +++ b/pkg/vmcp/aggregator/default_aggregator.go @@ -495,69 +495,6 @@ func (a *defaultAggregator) AggregateCapabilities( return aggregated, nil } -// ProcessPreQueriedCapabilities implements Aggregator.ProcessPreQueriedCapabilities. -// It reuses processBackendTools, ResolveConflicts, and shouldAdvertiseTool so that -// the session path applies identical transforms to the aggregation path. -func (a *defaultAggregator) ProcessPreQueriedCapabilities( - ctx context.Context, - toolsByBackend map[string][]vmcp.Tool, - targets map[string]*vmcp.BackendTarget, -) ([]vmcp.Tool, []vmcp.Tool, map[string]*vmcp.BackendTarget, error) { - // Step 1: Apply per-backend overrides (renames, description changes). - processed := make(map[string]*BackendCapabilities, len(toolsByBackend)) - for backendID, rawTools := range toolsByBackend { - processed[backendID] = &BackendCapabilities{ - BackendID: backendID, - Tools: processBackendTools(ctx, backendID, rawTools, a.toolConfigMap[backendID]), - } - } - - // Step 2: Resolve naming conflicts across backends. - resolved, err := a.ResolveConflicts(ctx, processed) - if err != nil { - return nil, nil, nil, err - } - - // Step 3: Build advertised list, all-resolved list, and routing table. - // advertisedTools is the subset shown to MCP clients (post-filter). - // allResolvedTools includes every resolved tool regardless of advertising filter, - // so that workflow engines can look up InputSchema for type coercion even when - // a backend tool is hidden from clients via excludeAll or filter configuration. - var advertisedTools []vmcp.Tool - var allResolvedTools []vmcp.Tool - routingTable := make(map[string]*vmcp.BackendTarget, len(resolved.Tools)) - - for _, rt := range resolved.Tools { - target, ok := targets[rt.BackendID] - if !ok { - slog.Warn("ProcessPreQueriedCapabilities: no target for backend, skipping tool", - "backend", rt.BackendID, "tool", rt.ResolvedName) - continue - } - // Clone the target and record the actual backend capability name for call routing. - // rt.OriginalName is the post-override name; reverse the override map to get the - // actual name the backend itself uses. - t := *target - t.OriginalCapabilityName = actualBackendCapabilityName(a.toolConfigMap, rt.BackendID, rt.OriginalName) - routingTable[rt.ResolvedName] = &t - - resolved := vmcp.Tool{ - Name: rt.ResolvedName, - Description: rt.Description, - InputSchema: rt.InputSchema, - OutputSchema: rt.OutputSchema, - Annotations: rt.Annotations, - BackendID: rt.BackendID, - } - allResolvedTools = append(allResolvedTools, resolved) - if a.shouldAdvertiseTool(rt.BackendID, rt.OriginalName) { - advertisedTools = append(advertisedTools, resolved) - } - } - - return advertisedTools, allResolvedTools, routingTable, nil -} - // actualBackendCapabilityName returns the real capability name the backend uses, // reversing any per-backend override rename that processBackendTools may have applied. // diff --git a/pkg/vmcp/aggregator/default_aggregator_test.go b/pkg/vmcp/aggregator/default_aggregator_test.go index 97600b27d4..1507327713 100644 --- a/pkg/vmcp/aggregator/default_aggregator_test.go +++ b/pkg/vmcp/aggregator/default_aggregator_test.go @@ -767,288 +767,3 @@ func TestDefaultAggregator_FilterPreservesRoutingTableForCompositeTools(t *testi assert.Len(t, result.RoutingTable.Tools, 2) }) } - -func TestDefaultAggregator_ProcessPreQueriedCapabilities(t *testing.T) { - t.Parallel() - - // newTarget is a helper that builds a minimal BackendTarget for a given backend. - newTarget := func(backendID string) *vmcp.BackendTarget { - return &vmcp.BackendTarget{ - WorkloadID: backendID, - WorkloadName: backendID + "-name", - BaseURL: "http://" + backendID + ":8080", - TransportType: "streamable-http", - HealthStatus: vmcp.BackendHealthy, - } - } - - t.Run("happy path: tools appear in both advertised list and routing table", func(t *testing.T) { - t.Parallel() - - toolsByBackend := map[string][]vmcp.Tool{ - "backend1": {newTestTool("tool1", "backend1")}, - "backend2": {newTestTool("tool2", "backend2")}, - } - targets := map[string]*vmcp.BackendTarget{ - "backend1": newTarget("backend1"), - "backend2": newTarget("backend2"), - } - - agg := NewDefaultAggregator(nil, nil, nil, nil) - advertised, allResolved, routingTable, err := agg.ProcessPreQueriedCapabilities( - context.Background(), toolsByBackend, targets, - ) - - require.NoError(t, err) - // Both tools must be advertised. - advertisedNames := make([]string, 0, len(advertised)) - for _, t := range advertised { - advertisedNames = append(advertisedNames, t.Name) - } - assert.Contains(t, advertisedNames, "tool1") - assert.Contains(t, advertisedNames, "tool2") - // With no filter, allResolved must equal the advertised list. - assert.ElementsMatch(t, advertised, allResolved, - "without a filter, allResolvedTools must equal the advertised list") - // Both tools must be in the routing table. - assert.Contains(t, routingTable, "tool1") - assert.Contains(t, routingTable, "tool2") - assert.Equal(t, "backend1", routingTable["tool1"].WorkloadID) - assert.Equal(t, "backend2", routingTable["tool2"].WorkloadID) - }) - - t.Run("OriginalCapabilityName is set in routing table entries", func(t *testing.T) { - t.Parallel() - - toolsByBackend := map[string][]vmcp.Tool{ - "backend1": {newTestTool("my_tool", "backend1")}, - } - targets := map[string]*vmcp.BackendTarget{ - "backend1": newTarget("backend1"), - } - - agg := NewDefaultAggregator(nil, nil, nil, nil) - _, _, routingTable, err := agg.ProcessPreQueriedCapabilities( - context.Background(), toolsByBackend, targets, - ) - - require.NoError(t, err) - require.Contains(t, routingTable, "my_tool") - // OriginalCapabilityName must be set so GetBackendCapabilityName() works correctly. - assert.Equal(t, "my_tool", routingTable["my_tool"].OriginalCapabilityName, - "OriginalCapabilityName must be wired to the original backend tool name") - }) - - t.Run("override rename: routing table keyed by overridden name with OriginalCapabilityName set", func(t *testing.T) { - t.Parallel() - - aggCfg := &config.AggregationConfig{ - Tools: []*config.WorkloadToolConfig{ - { - Workload: "backend1", - Overrides: map[string]*config.ToolOverride{ - "raw_tool": {Name: "fancy_tool"}, - }, - }, - }, - } - - toolsByBackend := map[string][]vmcp.Tool{ - "backend1": {{Name: "raw_tool", Description: "raw", BackendID: "backend1"}}, - } - targets := map[string]*vmcp.BackendTarget{ - "backend1": newTarget("backend1"), - } - - agg := NewDefaultAggregator(nil, nil, aggCfg, nil) - advertised, _, routingTable, err := agg.ProcessPreQueriedCapabilities( - context.Background(), toolsByBackend, targets, - ) - - require.NoError(t, err) - // Routing table must use the overridden name as the key. - require.Contains(t, routingTable, "fancy_tool", - "routing table should be keyed by the overridden tool name") - assert.NotContains(t, routingTable, "raw_tool", - "pre-override name should not appear as a routing table key") - // OriginalCapabilityName must be the actual backend name (pre-override) so that - // GetBackendCapabilityName translates the resolved name back to what the backend - // actually exposes. Forwarding the overridden user-visible name ("fancy_tool") - // would cause the backend call to fail. - assert.Equal(t, "raw_tool", routingTable["fancy_tool"].OriginalCapabilityName, - "OriginalCapabilityName must be the actual backend capability name, not the overridden name") - // Advertised list must also use the overridden name. - require.Len(t, advertised, 1) - assert.Equal(t, "fancy_tool", advertised[0].Name) - }) - - t.Run("conflict resolution: one tool wins when two backends share a name", func(t *testing.T) { - t.Parallel() - - toolsByBackend := map[string][]vmcp.Tool{ - "backend1": {newTestTool("shared", "backend1")}, - "backend2": {newTestTool("shared", "backend2")}, - } - targets := map[string]*vmcp.BackendTarget{ - "backend1": newTarget("backend1"), - "backend2": newTarget("backend2"), - } - - agg := NewDefaultAggregator(nil, nil, nil, nil) - advertised, _, routingTable, err := agg.ProcessPreQueriedCapabilities( - context.Background(), toolsByBackend, targets, - ) - - require.NoError(t, err) - // Default resolver: one backend wins; the key appears exactly once. - assert.Contains(t, routingTable, "shared", - "shared tool must still be in the routing table") - winnerBackend := routingTable["shared"].WorkloadID - assert.True(t, winnerBackend == "backend1" || winnerBackend == "backend2", - "winning backend must be either backend1 or backend2, got: %s", winnerBackend) - // Exactly one advertised entry for the shared name. - count := 0 - for _, tool := range advertised { - if tool.Name == "shared" { - count++ - } - } - assert.Equal(t, 1, count, "shared tool should appear exactly once in the advertised list") - }) - - t.Run("global ExcludeAllTools: routing table populated, advertised list empty", func(t *testing.T) { - t.Parallel() - - aggCfg := &config.AggregationConfig{ExcludeAllTools: true} - - toolsByBackend := map[string][]vmcp.Tool{ - "backend1": {newTestTool("tool1", "backend1"), newTestTool("tool2", "backend1")}, - } - targets := map[string]*vmcp.BackendTarget{ - "backend1": newTarget("backend1"), - } - - agg := NewDefaultAggregator(nil, nil, aggCfg, nil) - advertised, allResolved, routingTable, err := agg.ProcessPreQueriedCapabilities( - context.Background(), toolsByBackend, targets, - ) - - require.NoError(t, err) - assert.Empty(t, advertised, - "ExcludeAllTools must produce an empty advertised list") - // allResolvedTools must contain all tools regardless of the advertising filter, - // so the workflow engine can look up InputSchema for type coercion. - allResolvedNames := make([]string, 0, len(allResolved)) - for _, tool := range allResolved { - allResolvedNames = append(allResolvedNames, tool.Name) - } - assert.Contains(t, allResolvedNames, "tool1", - "excluded tools must appear in allResolvedTools for composite tool schema lookup") - assert.Contains(t, allResolvedNames, "tool2", - "excluded tools must appear in allResolvedTools for composite tool schema lookup") - // Tools must still be routable (composite tools need them). - assert.Contains(t, routingTable, "tool1", - "excluded tools must remain in the routing table for composite tool use") - assert.Contains(t, routingTable, "tool2", - "excluded tools must remain in the routing table for composite tool use") - }) - - t.Run("per-workload filter: matching tools advertised, non-matching tools routing-table-only", func(t *testing.T) { - t.Parallel() - - aggCfg := &config.AggregationConfig{ - Tools: []*config.WorkloadToolConfig{ - { - Workload: "backend1", - Filter: []string{"allowed_tool"}, - }, - }, - } - - toolsByBackend := map[string][]vmcp.Tool{ - "backend1": { - newTestTool("allowed_tool", "backend1"), - newTestTool("hidden_tool", "backend1"), - }, - } - targets := map[string]*vmcp.BackendTarget{ - "backend1": newTarget("backend1"), - } - - agg := NewDefaultAggregator(nil, nil, aggCfg, nil) - advertised, allResolved, routingTable, err := agg.ProcessPreQueriedCapabilities( - context.Background(), toolsByBackend, targets, - ) - - require.NoError(t, err) - // Only the allowed tool is advertised. - advertisedNames := make([]string, 0, len(advertised)) - for _, tool := range advertised { - advertisedNames = append(advertisedNames, tool.Name) - } - assert.Equal(t, []string{"allowed_tool"}, advertisedNames, - "only tools matching the filter should be advertised") - // allResolvedTools must include both tools so the workflow engine can - // look up InputSchema for type coercion on hidden_tool. - allResolvedNames := make([]string, 0, len(allResolved)) - for _, tool := range allResolved { - allResolvedNames = append(allResolvedNames, tool.Name) - } - assert.Contains(t, allResolvedNames, "allowed_tool", - "filtered-in tool must appear in allResolvedTools") - assert.Contains(t, allResolvedNames, "hidden_tool", - "filtered-out tool must appear in allResolvedTools for composite tool schema lookup") - // Both tools remain routable (composite tools can call hidden_tool). - assert.Contains(t, routingTable, "allowed_tool", - "filtered-in tool should be in routing table") - assert.Contains(t, routingTable, "hidden_tool", - "filtered-out tool must still be in routing table for composite tool use") - }) - - t.Run("missing target: tool skipped when backend has no entry in targets map", func(t *testing.T) { - t.Parallel() - - toolsByBackend := map[string][]vmcp.Tool{ - "backend1": {newTestTool("tool1", "backend1")}, - "backend2": {newTestTool("tool2", "backend2")}, // no matching target - } - targets := map[string]*vmcp.BackendTarget{ - "backend1": newTarget("backend1"), - // backend2 intentionally omitted - } - - agg := NewDefaultAggregator(nil, nil, nil, nil) - advertised, _, routingTable, err := agg.ProcessPreQueriedCapabilities( - context.Background(), toolsByBackend, targets, - ) - - require.NoError(t, err) - // backend1's tool is present in both lists. - assert.Contains(t, routingTable, "tool1") - advertisedNames := make([]string, 0, len(advertised)) - for _, tool := range advertised { - advertisedNames = append(advertisedNames, tool.Name) - } - assert.Contains(t, advertisedNames, "tool1") - // backend2's tool is absent because no target was provided. - assert.NotContains(t, routingTable, "tool2", - "tool from backend with no target must be absent from routing table") - assert.NotContains(t, advertisedNames, "tool2", - "tool from backend with no target must be absent from advertised list") - }) - - t.Run("empty input: returns empty results without error", func(t *testing.T) { - t.Parallel() - - agg := NewDefaultAggregator(nil, nil, nil, nil) - advertised, _, routingTable, err := agg.ProcessPreQueriedCapabilities( - context.Background(), - map[string][]vmcp.Tool{}, - map[string]*vmcp.BackendTarget{}, - ) - - require.NoError(t, err) - assert.Empty(t, advertised) - assert.Empty(t, routingTable) - }) -} diff --git a/pkg/vmcp/aggregator/mocks/mock_interfaces.go b/pkg/vmcp/aggregator/mocks/mock_interfaces.go index 621cc9ce56..c90fe21f84 100644 --- a/pkg/vmcp/aggregator/mocks/mock_interfaces.go +++ b/pkg/vmcp/aggregator/mocks/mock_interfaces.go @@ -111,23 +111,6 @@ func (mr *MockAggregatorMockRecorder) MergeCapabilities(ctx, resolved, registry return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MergeCapabilities", reflect.TypeOf((*MockAggregator)(nil).MergeCapabilities), ctx, resolved, registry) } -// ProcessPreQueriedCapabilities mocks base method. -func (m *MockAggregator) ProcessPreQueriedCapabilities(ctx context.Context, toolsByBackend map[string][]vmcp.Tool, targets map[string]*vmcp.BackendTarget) ([]vmcp.Tool, []vmcp.Tool, map[string]*vmcp.BackendTarget, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ProcessPreQueriedCapabilities", ctx, toolsByBackend, targets) - ret0, _ := ret[0].([]vmcp.Tool) - ret1, _ := ret[1].([]vmcp.Tool) - ret2, _ := ret[2].(map[string]*vmcp.BackendTarget) - ret3, _ := ret[3].(error) - return ret0, ret1, ret2, ret3 -} - -// ProcessPreQueriedCapabilities indicates an expected call of ProcessPreQueriedCapabilities. -func (mr *MockAggregatorMockRecorder) ProcessPreQueriedCapabilities(ctx, toolsByBackend, targets any) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessPreQueriedCapabilities", reflect.TypeOf((*MockAggregator)(nil).ProcessPreQueriedCapabilities), ctx, toolsByBackend, targets) -} - // QueryAllCapabilities mocks base method. func (m *MockAggregator) QueryAllCapabilities(ctx context.Context, backends []vmcp.Backend) (map[string]*aggregator.BackendCapabilities, error) { m.ctrl.T.Helper() diff --git a/pkg/vmcp/cli/serve.go b/pkg/vmcp/cli/serve.go index 49290020f8..082e24e7b7 100644 --- a/pkg/vmcp/cli/serve.go +++ b/pkg/vmcp/cli/serve.go @@ -350,7 +350,9 @@ func Serve(ctx context.Context, cfg ServeConfig) error { slog.Debug("VMCP_SESSION_HMAC_SECRET is set but no longer used after #5306; ignoring", "env_var", "VMCP_SESSION_HMAC_SECRET") } - sessionFactory := createSessionFactory(outgoingRegistry, agg) + // The factory never aggregates — the core is the single source of capability + // aggregation (agg feeds it via Config.Aggregator below). + sessionFactory := vmcpsession.NewSessionFactory(outgoingRegistry) // When the optimizer is enabled, its meta-tools must pass through the authz // response filter so they appear in tools/list. @@ -698,17 +700,3 @@ func runDiscovery( slog.Info(fmt.Sprintf("Discovered %d backends", len(backends))) return backends, backendClient, outgoingRegistry, nil } - -// createSessionFactory creates a MultiSessionFactory backed by the provided outgoing -// auth registry and optional aggregator. When agg is non-nil, sessions gain access -// to aggregated backend metadata; pass nil for single-backend deployments. -func createSessionFactory( - outgoingRegistry vmcpauth.OutgoingAuthRegistry, - agg aggregator.Aggregator, -) vmcpsession.MultiSessionFactory { - var opts []vmcpsession.MultiSessionFactoryOption - if agg != nil { - opts = append(opts, vmcpsession.WithAggregator(agg)) - } - return vmcpsession.NewSessionFactory(outgoingRegistry, opts...) -} diff --git a/pkg/vmcp/cli/serve_test.go b/pkg/vmcp/cli/serve_test.go index a4449e57c3..88e05312cd 100644 --- a/pkg/vmcp/cli/serve_test.go +++ b/pkg/vmcp/cli/serve_test.go @@ -17,7 +17,6 @@ import ( authserverconfig "github.com/stacklok/toolhive/pkg/authserver" "github.com/stacklok/toolhive/pkg/groups" "github.com/stacklok/toolhive/pkg/vmcp" - "github.com/stacklok/toolhive/pkg/vmcp/aggregator" aggregatormocks "github.com/stacklok/toolhive/pkg/vmcp/aggregator/mocks" clientmocks "github.com/stacklok/toolhive/pkg/vmcp/client/mocks" "github.com/stacklok/toolhive/pkg/vmcp/config" @@ -160,35 +159,6 @@ backends: assert.Len(t, backends, 1) } -func newSessionFactoryMocks(t *testing.T) (*clientmocks.MockOutgoingAuthRegistry, *aggregatormocks.MockAggregator) { - t.Helper() - ctrl := gomock.NewController(t) - return clientmocks.NewMockOutgoingAuthRegistry(ctrl), aggregatormocks.NewMockAggregator(ctrl) -} - -func TestCreateSessionFactory(t *testing.T) { - t.Parallel() - tests := []struct { - name string - useAgg bool - }{ - {name: "with aggregator", useAgg: true}, - {name: "without aggregator", useAgg: false}, - } - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - registry, agg := newSessionFactoryMocks(t) - var aggArg aggregator.Aggregator - if tc.useAgg { - aggArg = agg - } - factory := createSessionFactory(registry, aggArg) - require.NotNil(t, factory) - }) - } -} - // TestRunDiscovery_KubernetesGroupNotFound exercises the Kubernetes-specific branch // in runDiscovery where ErrGroupNotFound is treated as a non-fatal condition. // vMCP should start with zero backends and return nil error so it can begin diff --git a/pkg/vmcp/session/factory.go b/pkg/vmcp/session/factory.go index a7ca9803f2..e7f4332d10 100644 --- a/pkg/vmcp/session/factory.go +++ b/pkg/vmcp/session/factory.go @@ -17,7 +17,6 @@ import ( "github.com/stacklok/toolhive/pkg/auth" transportsession "github.com/stacklok/toolhive/pkg/transport/session" "github.com/stacklok/toolhive/pkg/vmcp" - "github.com/stacklok/toolhive/pkg/vmcp/aggregator" vmcpauth "github.com/stacklok/toolhive/pkg/vmcp/auth" "github.com/stacklok/toolhive/pkg/vmcp/session/binding" "github.com/stacklok/toolhive/pkg/vmcp/session/internal/backend" @@ -115,7 +114,6 @@ type defaultMultiSessionFactory struct { connector backendConnector maxConcurrency int backendInitTimeout time.Duration - aggregator aggregator.Aggregator // Optional: applies tool transforms (overrides, conflict resolution, filter) } // MultiSessionFactoryOption configures a defaultMultiSessionFactory. @@ -141,15 +139,6 @@ func WithBackendInitTimeout(d time.Duration) MultiSessionFactoryOption { } } -// WithAggregator configures the factory to apply per-backend tool overrides, -// conflict resolution, and advertising filters when building sessions. -// If not set, raw backend tool names are used unchanged. -func WithAggregator(agg aggregator.Aggregator) MultiSessionFactoryOption { - return func(f *defaultMultiSessionFactory) { - f.aggregator = agg - } -} - // NewSessionFactory creates a MultiSessionFactory that connects to backends // over HTTP using the given outgoing auth registry. func NewSessionFactory(registry vmcpauth.OutgoingAuthRegistry, opts ...MultiSessionFactoryOption) MultiSessionFactory { @@ -254,57 +243,6 @@ func buildRoutingTable(results []initResult) (*vmcp.RoutingTable, []vmcp.Tool, [ return rt, tools, resources, prompts } -// buildRoutingTableWithAggregator applies the aggregator's full transformation -// pipeline (overrides, conflict resolution, advertising filter) to the raw -// backend capabilities in results, producing resolved tool names identical to -// the standard aggregation path. Resources and prompts pass through unchanged. -// -// Returns the routing table, advertised tools (for MCP clients), all resolved -// tools (for schema lookup), resources, prompts, and any error. -func buildRoutingTableWithAggregator( - ctx context.Context, - agg aggregator.Aggregator, - results []initResult, -) (*vmcp.RoutingTable, []vmcp.Tool, []vmcp.Tool, []vmcp.Resource, []vmcp.Prompt, error) { - toolsByBackend := make(map[string][]vmcp.Tool, len(results)) - targets := make(map[string]*vmcp.BackendTarget, len(results)) - for i := range results { - r := &results[i] - toolsByBackend[r.target.WorkloadID] = r.caps.Tools - targets[r.target.WorkloadID] = r.target - } - - advertisedTools, allResolvedTools, toolsRouting, err := agg.ProcessPreQueriedCapabilities(ctx, toolsByBackend, targets) - if err != nil { - return nil, nil, nil, nil, nil, err - } - - rt := &vmcp.RoutingTable{ - Tools: toolsRouting, - Resources: make(map[string]*vmcp.BackendTarget), - Prompts: make(map[string]*vmcp.BackendTarget), - } - - var allResources []vmcp.Resource - var allPrompts []vmcp.Prompt - for _, r := range results { - for _, res := range r.caps.Resources { - if _, ok := rt.Resources[res.URI]; !ok { - allResources = append(allResources, res) - rt.Resources[res.URI] = r.target - } - } - for _, prompt := range r.caps.Prompts { - if _, ok := rt.Prompts[prompt.Name]; !ok { - allPrompts = append(allPrompts, prompt) - rt.Prompts[prompt.Name] = r.target - } - } - } - - return rt, advertisedTools, allResolvedTools, allResources, allPrompts, nil -} - // MakeSessionWithID implements MultiSessionFactory. func (f *defaultMultiSessionFactory) MakeSessionWithID( ctx context.Context, @@ -364,7 +302,7 @@ func (f *defaultMultiSessionFactory) makeBaseSession( identity *auth.Identity, backends []*vmcp.Backend, sessionHints map[string]string, -) (*defaultMultiSession, error) { +) *defaultMultiSession { filtered := make([]*vmcp.Backend, 0, len(backends)) for _, b := range backends { if b == nil { @@ -409,24 +347,11 @@ func (f *defaultMultiSessionFactory) makeBaseSession( "backendCount", len(backends)) } - var ( - routingTable *vmcp.RoutingTable - advertisedTools []vmcp.Tool - allResolvedTools []vmcp.Tool - allResources []vmcp.Resource - allPrompts []vmcp.Prompt - ) - if f.aggregator != nil { - var aggErr error - routingTable, advertisedTools, allResolvedTools, allResources, allPrompts, aggErr = - buildRoutingTableWithAggregator(ctx, f.aggregator, results) - if aggErr != nil { - return nil, fmt.Errorf("failed to process backend capabilities: %w", aggErr) - } - } else { - routingTable, advertisedTools, allResources, allPrompts = buildRoutingTable(results) - allResolvedTools = advertisedTools // no filter when no aggregator - } + // The core is the single source of capability aggregation/advertising (the factory never + // aggregates), so the routing table is built from the raw backend capabilities with no + // overrides/conflict-resolution/filter; advertised and resolved tools are identical. + routingTable, advertisedTools, allResources, allPrompts := buildRoutingTable(results) + allResolvedTools := advertisedTools transportSess := transportsession.NewStreamableSession(sessID) populateBackendMetadata(transportSess, results) @@ -441,7 +366,7 @@ func (f *defaultMultiSessionFactory) makeBaseSession( prompts: allPrompts, backendSessions: backendSessions, queue: newAdmissionQueue(), - }, nil + } } // makeSession is the shared implementation for MakeSession and MakeSessionWithID. @@ -453,10 +378,7 @@ func (f *defaultMultiSessionFactory) makeSession( identity *auth.Identity, backends []*vmcp.Backend, ) (MultiSession, error) { - baseSession, err := f.makeBaseSession(ctx, sessID, identity, backends, nil) - if err != nil { - return nil, err - } + baseSession := f.makeBaseSession(ctx, sessID, identity, backends, nil) // Apply session binding: extracts the (iss, sub) identity tuple, stores it in // session metadata under MetadataKeyIdentityBinding, and wraps the session with @@ -549,10 +471,7 @@ func (f *defaultMultiSessionFactory) RestoreSession( // Build the base session (backend connections + routing table) without the // security wrapper. The wrapper is applied separately below. - baseSession, err := f.makeBaseSession(ctx, id, identity, filteredBackends, sessionHints) - if err != nil { - return nil, fmt.Errorf("RestoreSession: failed to rebuild backend connections: %w", err) - } + baseSession := f.makeBaseSession(ctx, id, identity, filteredBackends, sessionHints) // Restore only the identity-binding key from stored metadata. The other // keys (MetadataKeyBackendIDs, MetadataKeyBackendSessionPrefix.*) are From 549caf747f4709d865d74503a9dbdd66ea8a3e5d Mon Sep 17 00:00:00 2001 From: Trey Date: Wed, 24 Jun 2026 10:48:34 -0700 Subject: [PATCH 2/2] Remove session-factory composite/telemetry mirrors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The core owns composite-tool execution (executeComposite) and workflow telemetry (core_telemetry.go's telemetryComposer, #5561), and nothing sets FactoryConfig.WorkflowDefs/ ComposerFactory on the Serve path (server.New and the integration harness leave them unset), so the session factory's composite-tool decorator and its workflow-executor telemetry never run. Remove them — the second half of #5621 Stage 2: - sessionmanager/factory.go: delete compositeToolsDecorator, composerWorkflowExecutor, workflowExecutorInstruments + newWorkflowExecutorInstruments + wrapExecutor, and telemetryWorkflowExecutor; drop the instruments param from buildDecoratingFactory. The optimizer decorator and its telemetry (monitorOptimizer/telemetryOptimizer) are untouched. - FactoryConfig: drop the dead WorkflowDefs + ComposerFactory fields. - session_manager.go: drop the workflow-instruments setup and the now-dead "ComposerFactory required when WorkflowDefs set" validation. Part of #5621. Co-Authored-By: Claude Opus 4.8 (1M context) --- pkg/vmcp/server/sessionmanager/factory.go | 173 +----------------- .../server/sessionmanager/session_manager.go | 22 +-- 2 files changed, 2 insertions(+), 193 deletions(-) diff --git a/pkg/vmcp/server/sessionmanager/factory.go b/pkg/vmcp/server/sessionmanager/factory.go index 1bd2c25f1d..fb4bb159c7 100644 --- a/pkg/vmcp/server/sessionmanager/factory.go +++ b/pkg/vmcp/server/sessionmanager/factory.go @@ -21,9 +21,7 @@ import ( "github.com/stacklok/toolhive/pkg/auth" "github.com/stacklok/toolhive/pkg/telemetry" "github.com/stacklok/toolhive/pkg/vmcp" - "github.com/stacklok/toolhive/pkg/vmcp/composer" "github.com/stacklok/toolhive/pkg/vmcp/conversion" - "github.com/stacklok/toolhive/pkg/vmcp/internal/compositetools" "github.com/stacklok/toolhive/pkg/vmcp/optimizer" vmcpsession "github.com/stacklok/toolhive/pkg/vmcp/session" "github.com/stacklok/toolhive/pkg/vmcp/session/optimizerdec" @@ -45,14 +43,6 @@ type FactoryConfig struct { // Base is the underlying session factory. Required. Base vmcpsession.MultiSessionFactory - // WorkflowDefs are the composite tool workflow definitions. - // If empty, composite tool decoration is skipped. - WorkflowDefs map[string]*composer.WorkflowDefinition - - // ComposerFactory builds a per-session composer bound to the session's - // routing table and tool list. - ComposerFactory func(rt *vmcp.RoutingTable, tools []vmcp.Tool) composer.Composer - // OptimizerConfig is optional optimizer configuration. // When non-nil and OptimizerFactory is nil, New() creates the optimizer // factory from this config and returns a cleanup function. @@ -150,17 +140,14 @@ func resolveOptimizer(cfg *FactoryConfig) ( func buildDecoratingFactory( cfg *FactoryConfig, optimizerFactory func(context.Context, []mcpserver.ServerTool) (optimizer.Optimizer, error), - instruments *workflowExecutorInstruments, terminateSession func(string) (bool, error), ) vmcpsession.MultiSessionFactory { var decorators []vmcpsession.Decorator - if len(cfg.WorkflowDefs) > 0 { - decorators = append(decorators, compositeToolsDecorator(cfg.WorkflowDefs, cfg.ComposerFactory, instruments)) - } // On the Serve path (AdvertiseFromCore) the optimizer is built by the Serve layer // over the core's advertised set, so the factory's optimizer decorator is skipped // to avoid double-indexing the shared store (see FactoryConfig.AdvertiseFromCore). + // Composite tools and their telemetry are owned by the core, not the factory. if optimizerFactory != nil && !cfg.AdvertiseFromCore { decorators = append(decorators, optimizerDecoratorFn(optimizerFactory, terminateSession)) } @@ -168,39 +155,6 @@ func buildDecoratingFactory( return vmcpsession.NewDecoratingFactory(cfg.Base, decorators...) } -// compositeToolsDecorator returns a Decorator that applies the composite tools -// wrapper to newly created sessions. -func compositeToolsDecorator( - workflowDefs map[string]*composer.WorkflowDefinition, - composerFactory func(rt *vmcp.RoutingTable, tools []vmcp.Tool) composer.Composer, - instruments *workflowExecutorInstruments, -) vmcpsession.Decorator { - return func(_ context.Context, sess vmcpsession.MultiSession) (vmcpsession.MultiSession, error) { - sessionDefs := compositetools.FilterWorkflowDefsForSession(workflowDefs, sess.GetRoutingTable()) - if len(sessionDefs) == 0 { - return sess, nil - } - - compositeToolsMeta := compositetools.ConvertWorkflowDefsToTools(sessionDefs) - if err := compositetools.ValidateNoToolConflicts(sess.AllTools(), compositeToolsMeta); err != nil { - slog.Warn("composite tool name conflict detected; skipping composite tools", "session_id", sess.ID(), "error", err) - return sess, nil - } - - sessionComposer := composerFactory(sess.GetRoutingTable(), sess.AllTools()) - sessionExecutors := make(map[string]compositetools.WorkflowExecutor, len(sessionDefs)) - for _, def := range sessionDefs { - ex := newComposerWorkflowExecutor(sessionComposer, def) - if instruments != nil { - ex = instruments.wrapExecutor(def.Name, ex) - } - sessionExecutors[def.Name] = ex - } - - return compositetools.NewDecorator(sess, compositeToolsMeta, sessionExecutors), nil - } -} - // optimizerDecoratorFn returns a Decorator that indexes all session tools into // the optimizer and replaces the tool list with find_tool + call_tool. func optimizerDecoratorFn( @@ -300,131 +254,6 @@ func adaptToolsForFactory( return sdkTools, nil } -// composerWorkflowExecutor adapts a composer.Composer + WorkflowDefinition -// to the compositetools.WorkflowExecutor interface. -type composerWorkflowExecutor struct { - composer composer.Composer - def *composer.WorkflowDefinition -} - -func newComposerWorkflowExecutor(c composer.Composer, def *composer.WorkflowDefinition) compositetools.WorkflowExecutor { - return &composerWorkflowExecutor{composer: c, def: def} -} - -func (e *composerWorkflowExecutor) ExecuteWorkflow( - ctx context.Context, params map[string]any, -) (*compositetools.WorkflowResult, error) { - result, err := e.composer.ExecuteWorkflow(ctx, e.def, params) - if err != nil { - return nil, err - } - return &compositetools.WorkflowResult{ - Output: result.Output, - Error: result.Error, - }, nil -} - -// workflowExecutorInstruments holds pre-created OTEL instruments for workflow -// telemetry. Created once at startup and reused across all session registrations. -type workflowExecutorInstruments struct { - tracer trace.Tracer - executionsTotal metric.Int64Counter - errorsTotal metric.Int64Counter - executionDuration metric.Float64Histogram -} - -func newWorkflowExecutorInstruments( - meterProvider metric.MeterProvider, - tracerProvider trace.TracerProvider, -) (*workflowExecutorInstruments, error) { - meter := meterProvider.Meter(instrumentationName) - - executionsTotal, err := meter.Int64Counter( - "toolhive_vmcp_workflow_executions", - metric.WithDescription("Total number of workflow executions"), - ) - if err != nil { - return nil, fmt.Errorf("failed to create workflow executions counter: %w", err) - } - - errorsTotal, err := meter.Int64Counter( - "toolhive_vmcp_workflow_errors", - metric.WithDescription("Total number of workflow execution errors"), - ) - if err != nil { - return nil, fmt.Errorf("failed to create workflow errors counter: %w", err) - } - - executionDuration, err := meter.Float64Histogram( - "toolhive_vmcp_workflow_duration", - metric.WithDescription("Duration of workflow executions in seconds"), - metric.WithUnit("s"), - metric.WithExplicitBucketBoundaries(telemetry.MCPHistogramBuckets...), - ) - if err != nil { - return nil, fmt.Errorf("failed to create workflow duration histogram: %w", err) - } - - return &workflowExecutorInstruments{ - tracer: tracerProvider.Tracer(instrumentationName), - executionsTotal: executionsTotal, - errorsTotal: errorsTotal, - executionDuration: executionDuration, - }, nil -} - -func (i *workflowExecutorInstruments) wrapExecutor( - name string, ex compositetools.WorkflowExecutor, -) compositetools.WorkflowExecutor { - return &telemetryWorkflowExecutor{ - name: name, - executor: ex, - tracer: i.tracer, - executionsTotal: i.executionsTotal, - errorsTotal: i.errorsTotal, - executionDuration: i.executionDuration, - } -} - -type telemetryWorkflowExecutor struct { - name string - executor compositetools.WorkflowExecutor - tracer trace.Tracer - executionsTotal metric.Int64Counter - errorsTotal metric.Int64Counter - executionDuration metric.Float64Histogram -} - -var _ compositetools.WorkflowExecutor = (*telemetryWorkflowExecutor)(nil) - -func (t *telemetryWorkflowExecutor) ExecuteWorkflow( - ctx context.Context, params map[string]any, -) (*compositetools.WorkflowResult, error) { - commonAttrs := []attribute.KeyValue{attribute.String("workflow.name", t.name)} - - ctx, span := t.tracer.Start(ctx, "telemetryWorkflowExecutor.ExecuteWorkflow", - trace.WithAttributes(commonAttrs...), - ) - defer span.End() - - metricAttrs := metric.WithAttributes(commonAttrs...) - start := time.Now() - t.executionsTotal.Add(ctx, 1, metricAttrs) - - result, err := t.executor.ExecuteWorkflow(ctx, params) - - duration := time.Since(start) - t.executionDuration.Record(ctx, duration.Seconds(), metricAttrs) - - if err != nil { - t.errorsTotal.Add(ctx, 1, metricAttrs) - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - } - - return result, err -} - // monitorOptimizer wraps an optimizer factory so that every Optimizer instance // produced by it is decorated with telemetry (metrics + traces). func monitorOptimizer( diff --git a/pkg/vmcp/server/sessionmanager/session_manager.go b/pkg/vmcp/server/sessionmanager/session_manager.go index c6175cff77..da776582f4 100644 --- a/pkg/vmcp/server/sessionmanager/session_manager.go +++ b/pkg/vmcp/server/sessionmanager/session_manager.go @@ -113,32 +113,12 @@ func New( if capacity == 0 { capacity = defaultCacheCapacity } - if len(cfg.WorkflowDefs) > 0 && cfg.ComposerFactory == nil { - return nil, nil, fmt.Errorf("sessionmanager.New: ComposerFactory is required when WorkflowDefs are provided") - } - // Resolve optimizer factory from config, applying telemetry wrapping if needed. optimizerFactory, optimizerCleanup, err := resolveOptimizer(cfg) if err != nil { return nil, nil, err } - // Pre-create workflow telemetry instruments once so they are reused across - // all per-session executor wrappers without re-registering metrics. - var instruments *workflowExecutorInstruments - if cfg.TelemetryProvider != nil && len(cfg.WorkflowDefs) > 0 { - instruments, err = newWorkflowExecutorInstruments( - cfg.TelemetryProvider.MeterProvider(), - cfg.TelemetryProvider.TracerProvider(), - ) - if err != nil { - if cleanupErr := optimizerCleanup(context.Background()); cleanupErr != nil { - slog.Warn("failed to clean up optimizer after instrument creation error", "error", cleanupErr) - } - return nil, nil, fmt.Errorf("failed to create workflow executor telemetry: %w", err) - } - } - // Build the Manager first so we can reference sm.Terminate and sm.sessions // directly in closures, eliminating the forward-reference variable pattern. sm := &Manager{ @@ -172,7 +152,7 @@ func New( }, ) - sm.factory = buildDecoratingFactory(cfg, optimizerFactory, instruments, sm.Terminate) + sm.factory = buildDecoratingFactory(cfg, optimizerFactory, sm.Terminate) cleanup := func(ctx context.Context) error { return optimizerCleanup(ctx)