
Akka.Streams: end-to-end OpenTelemetry trace context propagation#8160

Draft
Aaronontheweb wants to merge 11 commits into akkadotnet:dev from Aaronontheweb:phobos-streams-trace-context

Conversation

@Aaronontheweb
Member

@Aaronontheweb Aaronontheweb commented Apr 11, 2026

Problem

AsyncLocal<Activity> does not flow across the stream actor's dispatcher boundary, so any trace context alive on a producer thread is lost by the time downstream stage handlers run on the interpreter dispatcher. Anything the user's code creates inside a SelectAsync lambda — SqlClient.ExecuteAsync, HttpClient.SendAsync, or any other instrumented call — becomes an orphaned root span with a random TraceId, disconnected from the request that triggered it.
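A minimal repro of the symptom using only the BCL Activity APIs (the source name "Demo" and the span names are illustrative, not from the PR): once `Activity.Current` is gone on the consuming thread, any instrumented call starts a fresh root.

```csharp
using System;
using System.Diagnostics;

var source = new ActivitySource("Demo");
ActivitySource.AddActivityListener(new ActivityListener
{
    ShouldListenTo = s => s.Name == "Demo",
    Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllData
});

var request = source.StartActivity("incoming-request");

// Simulate the dispatcher hop: AsyncLocal<Activity> did not flow, so the
// interpreter thread sees no ambient Activity at all.
Activity.Current = null;

var dbCall = source.StartActivity("SqlClient.ExecuteAsync");
// With no ambient parent, the new span is a brand-new root with a random TraceId.
bool orphaned = dbCall!.TraceId != request!.TraceId;
Console.WriteLine(orphaned); // True — the span is disconnected from its request
dbCall.Dispose();
request.Dispose();
```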

A secondary, related problem: when a fan-in stage like BatchWeighted or GroupedWithin merges N independent input elements into a single output, any trace continuity information from the other N-1 contributing elements is lost. The downstream span ends up attached to whichever input happened to trigger the flush, and every other contributing request becomes an orphaned root.

Approach

Same shape as #7995 (LogEvent.ActivityContext capture for log-trace correlation across mailbox boundaries):

  1. Capture on the producer side, before the boundary hop, while Activity.Current is still alive.
  2. Carry in a framework-owned field on the in-flight element / queued event.
  3. Restore on the consumer side before invoking the user's stage handler.
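The three steps can be sketched with plain System.Diagnostics types (all names here are hypothetical — in the PR the carry field is Connection.SlotContext and the restore happens inside GraphInterpreter.ProcessPush):

```csharp
using System;
using System.Diagnostics;

// Hypothetical sketch only; not the PR's actual types.
static class TraceCarry
{
    static readonly ActivitySource Source = new("Akka.Streams");

    // 2. Carry: a framework-owned field rides alongside the in-flight element.
    internal readonly record struct Envelope(object Element, ActivityContext Context);

    // 1. Capture on the producer thread, while Activity.Current is still alive.
    internal static Envelope Capture(object element)
        => new(element, Activity.Current?.Context ?? default);

    // 3. Restore on the consumer thread before invoking the user's handler.
    internal static void Deliver(Envelope env, Action<object> userHandler)
    {
        using var stage = Source.StartActivity(
            "akka.stream.stage Select",   // stage name is illustrative
            ActivityKind.Internal,
            env.Context);                 // captured producer context as parent
        userHandler(env.Element);         // handler now sees the producer's trace (when sampled)
    }
}
```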

For fan-in, the canonical OpenTelemetry answer is span links: pick one primary parent (first-wins) and attach the remaining N-1 inputs as ActivityLinks on the flushed stage span, so trace viewers can jump between contributing traces.
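Under that first-wins convention, emitting the flushed span could look like this sketch (hypothetical type and stage name; the PR's real emission lives in GraphInterpreter.ProcessPush):

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

// Illustrative only: one primary parent, the remaining contributors as ActivityLinks.
static class FanInSpan
{
    static readonly ActivitySource Source = new("Akka.Streams");

    internal static Activity? StartFlush(IReadOnlyList<ActivityContext> contributors)
    {
        var primary = contributors[0];                    // first-wins primary parent
        var links = new List<ActivityLink>();
        for (var i = 1; i < contributors.Count; i++)
            links.Add(new ActivityLink(contributors[i])); // N-1 cross-trace links

        return Source.StartActivity(
            "akka.stream.stage BatchWeighted",            // illustrative stage name
            ActivityKind.Internal,
            primary,
            tags: new[] { new KeyValuePair<string, object?>("stream.fan_in.links", links.Count) },
            links: links);
    }
}
```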

Commits

1. Akka.Streams: per-element ActivityContext carry through GraphInterpreter

  • New framework-owned ActivitySource("Akka.Streams"). Users register via .AddSource("Akka.Streams") on their OTel TracerProvider (same pattern as Microsoft.EntityFrameworkCore).
  • Connection.SlotContext parallels Connection.Slot and carries the ActivityContext of the element in flight on that connection.
  • GraphStageLogic.Push captures Activity.Current?.Context into SlotContext after writing the slot; Grab clears it alongside.
  • GraphInterpreter.ProcessPush starts an akka.stream.stage {StageName} Activity with the slot context as parent before invoking OnPush. Disposes in finally. Mirrors the existing previousInterpreter try/finally restore pattern.

2. Akka.Streams: generalize trace ingress capture via shared callback primitive

  • GraphStageLogicWithCallbackWrapper.NotInitialized.Args carries the producer's ActivityContext alongside each queued arg, captured at InvokeCallbacks time. InitCallback drains queued args inside a restore scope so the callback runs with the right ambient Activity.Current even if it was queued before PreStart.
  • ConcurrentAsyncCallback<T>.InvokeWithPromise captures Activity.Current?.Context and wraps the handler with an akka.stream.ingress {StageName} Activity on the interpreter thread.

ConcurrentAsyncCallback is the shared primitive used by Source.Queue, custom StageActor-based sources, and any source going through GraphStageLogicWithCallbackWrapper, so one fix lights up all of them without per-stage code.
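The producer-side capture for queued args can be sketched like this (hypothetical type; the concurrency control of the real primitive is elided — this only illustrates *when* the context is captured and restored):

```csharp
using System;
using System.Collections.Concurrent;
using System.Diagnostics;

// Sketch: context is captured when the producer calls Invoke,
// not when the queue is eventually drained on the interpreter thread.
sealed class QueuedCallback<T>
{
    static readonly ActivitySource Source = new("Akka.Streams");
    readonly ConcurrentQueue<(T Arg, ActivityContext Ctx)> _pending = new();
    Action<T>? _handler;                                  // null until the stage has started

    public void Invoke(T arg)
    {
        var ctx = Activity.Current?.Context ?? default;   // producer-side capture
        if (_handler is null)
            _pending.Enqueue((arg, ctx));                 // queued before PreStart: ctx rides along
        else
            Run(arg, ctx);
    }

    public void Init(Action<T> handler)
    {
        _handler = handler;
        while (_pending.TryDequeue(out var item))
            Run(item.Arg, item.Ctx);                      // drained with the captured context restored
    }

    void Run(T arg, ActivityContext ctx)
    {
        using var ingress = Source.StartActivity(
            "akka.stream.ingress", ActivityKind.Internal, ctx);
        _handler!(arg);                                   // handler sees the producer's trace
    }
}
```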

3. Akka.Streams: fan-in stage span linking with ActivityLinks

  • Connection.SlotLinks (ActivityContext[]) carries additional parent contexts alongside the primary SlotContext, plus PendingPushPrimaryContext / PendingPushLinks as a per-push override mechanism.
  • GraphStageLogic exposes two internal APIs for fan-in stages:
    • CurrentInletTraceContext(inlet) — read the upstream element's trace context before Grab clears it
    • SetFanInTraceContext(outlet, primary, links) — stage the next Push as a fan-in flush with a specific primary parent + link set
  • GraphStageLogic.Push consumes the pending override when present, falling back to Activity.Current capture otherwise (preserves existing behavior for non-fan-in stages).
  • GraphInterpreter.ProcessPush starts the downstream stage Activity with both the primary context as parent and the accumulated SlotLinks as ActivityLinks, tagging the span with stream.fan_in.links for visibility.
  • Batch.Logic (covers both Batch and BatchWeighted) accumulates each inbound element's trace context in OnPush and emits the aggregate on Flush. Pending-element state carries its own context across the flush/reseed boundary so that the element triggering a boundary flush also contributes to the next aggregate's links correctly.

4. Akka.Streams: fan-in/fan-out stage trace propagation + spec suite

Extends fan-in linking to the other built-in stages and adds fan-out + regression coverage:

  • GroupedWeightedWithin.Logic — captures each inbound element's context alongside the buffer, plus the pending-element context for boundary overflow. On EmitGroup emits the first collected context as the primary parent and the rest as ActivityLinks.
  • Merge.Logic — captures the inlet's SlotContext on both the fast path (outlet immediately available) and the slow path (element enqueued for later DequeueAndDispatch), staging it as the downstream Push's primary parent so trace continuity survives the OnPull boundary where Activity.Current would otherwise be null.
  • MergePreferred.Logic — same, for both the preferred and secondary Emit paths.
  • Concat.Logic — same, for every secondary inlet's OnPush.

Merge/MergePreferred/Concat are 1-to-1 pass-throughs, so they never actually attach multiple ActivityLinks — the fan-in API is used only to override the primary parent from Activity.Current (which on OnPull is null) to the captured upstream inlet context. Batch/BatchWeighted and GroupedWeightedWithin are true fan-ins that attach N-1 links.

Fan-out stages (Broadcast, Balance) require no changes. Each downstream branch's ProcessPush already captures Activity.Current (which is the fan-out stage's own span, itself parented to the upstream slot context), so every branch inherits the producer's trace id via the existing per-element carry.

Tests

src/core/Akka.Streams.Tests/Implementation/ — 14 scenarios across 4 focused spec files plus a shared helper:

StreamsDiagnosticsSpec (4 tests) — linear chain + basic ingress

  1. Sync Source.Queue → Select → Sink.Seq — stage spans emitted with correct parent chain.
  2. Source.Queue → SelectAsync → Sink.Ignore with a user span created inside the async lambda — verifies async-boundary continuity.
  3. Two offers from two distinct traced scopes through the same Source.Queue — multi-producer interleaving lands each element's downstream spans in its own trace, never mixed.
  4. Element offered without Activity.Current set — zero stream spans emitted.

StreamsFanInSpec (5 tests) — true fan-in and pass-through fan-ins

  • BatchWeighted with 3 concurrent traced producers — asserts primary parent = first input, 2 ActivityLinks to the other two inputs.
  • BatchWeighted with 7 traced inputs — asserts link count is exactly N - 1.
  • GroupedWithin count-driven flush — asserts primary + 3 links on the emitted group.
  • Merge — two independent queues each offer one traced element; both elements reach the downstream Select stage with their originating producer's trace id intact.
  • Concat — sequential drain of two sources; each element carries its own source's trace forward.

StreamsFanOutSpec (2 tests) — Broadcast/Balance validation-only

  • Broadcast(2) — one traced offer fans out to two Select stages, both inherit the producer's trace id.
  • Balance(2) — two traced offers distributed round-robin, both branches run under the producer's trace.

StreamsRegressionSpec (3 tests) — cardinality guards

  • Background Source.Tick → Select → SelectAsync → Sink.Ignore with no traced producer — asserts zero "Akka.Streams" spans emitted. Guards against unbounded span accumulation on long-lived background streams.
  • Mixed traced + untraced via Merge — the traced offer produces spans, the untraced ticks stay invisible, and every emitted span belongs to the producer's trace.
  • GraphDsl-composed sub-graph with internal Select → Select chain — end-to-end trace id propagation through the composition boundary.

StreamsActivityCollector (helper)

Shared internal helper extracted so each spec file can subscribe to the "Akka.Streams" ActivitySource the same way without duplicating 15 lines of ActivityListener boilerplate.
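That boilerplate is roughly the following (a sketch of what such a helper wraps, not the PR's actual code):

```csharp
using System;
using System.Collections.Concurrent;
using System.Diagnostics;

// Sketch: subscribe to the "Akka.Streams" ActivitySource and collect finished spans.
sealed class StreamsActivityCollector : IDisposable
{
    public ConcurrentQueue<Activity> Completed { get; } = new();
    readonly ActivityListener _listener;

    public StreamsActivityCollector()
    {
        _listener = new ActivityListener
        {
            ShouldListenTo = src => src.Name == "Akka.Streams",
            Sample = (ref ActivityCreationOptions<ActivityContext> _)
                => ActivitySamplingResult.AllDataAndRecorded,
            ActivityStopped = a => Completed.Enqueue(a)
        };
        ActivitySource.AddActivityListener(_listener);
    }

    public void Dispose() => _listener.Dispose();
}
```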

Verification

  • 1,815 / 1,815 Akka.Streams.Tests (net10.0) passing, 34 pre-existing skips. Zero behavioral regressions from the new primitives.
  • 14 / 14 new trace specs passing.
  • End-to-end trace continuity visually confirmed through a real OTLP → dashboard setup against a live Source.Queue → Select → BatchWeighted → SelectAsync → HttpClient pipeline with concurrent producers: the flushed batch's downstream stage span carries the expected Links: N > 0, and each producer's individual trace links back into the shared batch span.

Performance

  • Zero overhead when the Akka.Streams source has no listener (StartActivity returns null) or when Activity.Current is null at the producer (background streams, internal sources like Source.Tick).
  • One closure allocation per ConcurrentAsyncCallback.Invoke only when a producer trace context exists.
  • One stage Activity allocation per stage transition per element when tracing is active.
  • Fan-in stages allocate one List<ActivityContext> per logic instance (lifetime-of-stage, not per-element) plus one ActivityLink[] per flush when there are 2+ contributing traces.

Happy to add BenchmarkDotNet coverage if reviewers want to quantify any of those.
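The no-listener fast path can be observed directly with the BCL (the source name here is illustrative, chosen so no real listener matches):

```csharp
using System;
using System.Diagnostics;

var source = new ActivitySource("Akka.Streams.Sketch"); // illustrative source name

// No listener registered for this source: StartActivity returns null, so no
// span object is allocated and no downstream work happens — the fast path.
var span = source.StartActivity("akka.stream.stage Select");
Console.WriteLine(span is null); // True
```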

Scope

Covered in this PR:

  • GetAsyncCallback-based sources: Source.Queue, custom StageActor-based sources, anything going through GraphStageLogicWithCallbackWrapper
  • Fan-in: Batch, BatchWeighted, GroupedWeightedWithin, Merge, MergePreferred, Concat
  • Fan-out: Broadcast, Balance (validation-only, no stage-specific changes)

Not covered (follow-up work):

  • Source.ActorRef / Source.ActorPublisher — reactive-streams OnNext path, would need a separate ingress capture site
  • MergePrioritized, Zip family (ZipWith, ZipLatest, ZipN, Interleave), OrElse — additional fan-in shapes not yet wired
  • Attribute enrichment from Attributes.Name — stage spans currently use type-reflected names (e.g. Select) rather than user-supplied .Named("...") labels
  • Dedicated akka.stream.batch.flush span type — current implementation inlines fan-in links onto the existing downstream stage span, which is simpler but less explicit in trace viewers

Status

Draft. Opened to invite design feedback before any merge consideration.

Adds end-to-end OpenTelemetry trace-context continuity through stream
graphs. When an external producer offers an element to a Source.Queue
(or any other stream source) while a parent Activity is alive on the
producer thread, the framework now propagates that context to downstream
stage handlers across dispatcher boundaries where AsyncLocal would
otherwise be lost.

Design mirrors PR akkadotnet#7995 (LogEvent.ActivityContext): capture on the
producer side before the boundary hop, carry it in a framework-owned
field, restore it on the consumer side before invoking the user's stage
handler.

Changes:
- New Akka.Streams.Implementation.StreamsDiagnostics with a framework-
  owned ActivitySource("Akka.Streams"). Register via .AddSource(...) on
  your OTel TracerProvider to opt in.
- Connection.SlotContext parallels Connection.Slot and carries the
  ActivityContext of the element currently in flight on that connection.
- GraphStageLogic.Push captures Activity.Current?.Context into
  SlotContext when writing the slot; Grab clears it alongside the slot.
- GraphInterpreter.ProcessPush starts an "akka.stream.stage <name>"
  Activity with the slot's captured context as parent, sets
  Activity.Current, calls the handler, and disposes in a finally.
  Mirrors the existing previousInterpreter try/finally restore pattern.
- QueueSource Offer<T> carries IngressContext captured from
  Activity.Current at OfferAsync call time; the source's async callback
  handler starts an "akka.stream.offer Source.Queue" ingress span with
  that context as parent before calling Push (which propagates it
  forward via SlotContext).
- Zero-allocation fast path: when the "Akka.Streams" source has no
  listeners or Activity.Current was null at the producer, StartActivity
  returns null and no spans / extra work are performed.

Initial scope covers Source.Queue ingress and sync + async downstream
stages (Select, SelectAsync, Sink.Seq). A follow-up commit generalizes
the ingress capture to all GetAsyncCallback-based sources via the
shared GraphStageLogicWithCallbackWrapper / ConcurrentAsyncCallback
primitive.

Tests: src/core/Akka.Streams.Tests/Implementation/StreamsDiagnosticsSpec.cs
covers three scenarios:
1. sync Select pipeline — stage spans emitted with correct parent chain
2. SelectAsync with user span inside the async lambda — end-to-end
   continuity validated across the async boundary
3. no producer context — zero spans emitted, so background streams
   (Source.Tick, internal sources, etc.) stay invisible to tracing
…imitive

Moves the Source.Queue-specific ingress span from QueueSource into the
shared GraphStageLogicWithCallbackWrapper / ConcurrentAsyncCallback
primitive, so every GetAsyncCallback-based source (Source.Queue,
custom StageActor-based sources, anything using the wrapper) gets
producer-thread trace context capture for free without per-stage code.

Key change: capture Activity.Current?.Context on the PRODUCER thread at
InvokeCallbacks (not inside the interpreter-thread callback) so queued
args — i.e. ones pushed before InitCallback runs — retain the producer's
context through the pending queue. This was the missing piece previously:
only the already-initialized fast path captured context, and any call
that raced PreStart landed on the interpreter thread with
Activity.Current = null.

Changes:
- GraphStageLogicWithCallbackWrapper.NotInitialized.Args now holds
  (arg, ActivityContext?) tuples captured at InvokeCallbacks
  producer-side.
- InitCallback drains queued args with the captured context restored
  via an "akka.stream.ingress.queued" Activity.
- ConcurrentAsyncCallback.InvokeWithPromise captures Activity.Current
  ?.Context at Invoke time and wraps the handler with an
  "akka.stream.ingress {stage}" Activity, so downstream Push captures
  it into Connection.SlotContext and ProcessPush creates the first
  stage span with it as parent.
- Source.Queue-specific IngressContext code in Sources.cs is reverted
  (redundant with the generalized path).

Tests (StreamsDiagnosticsSpec.cs):
- ProducerActivityContext_should_propagate_to_downstream_stage_spans:
  still passes, now exercising the generalized path.
- User_span_inside_SelectAsync_lambda_should_parent_to_stage_span:
  still passes, proving async-boundary continuity through the new path.
- Multiple_offers_from_different_traced_scopes_should_preserve_distinct_traces:
  NEW — multi-producer interleaving scenario; verifies that two offers
  from two distinct traced scopes through the same Source.Queue land
  in their own traces, never mixed.
- No_producer_context_should_produce_no_stream_spans:
  still passes, proving the zero-overhead no-trace path.

Source.ActorRef is structurally different (uses ActorPublisher /
reactive streams, not GetAsyncCallback) and is not covered here. A
separate change would be needed to extend the same producer-side
capture to that ingress family.

When a stage like Batch or BatchWeighted merges N input elements into one
output, downstream tracing needs to preserve the trace identity of every
contributing input — not just the one that happened to trigger the flush.
OpenTelemetry's canonical answer for fan-in is span links: pick one primary
parent, attach the rest as ActivityLinks.

Wiring:

- Connection gains SlotLinks (ActivityContext[]) alongside SlotContext, plus
  PendingPushPrimaryContext / PendingPushLinks as a per-push override hook.
- GraphStageLogic.Push consumes the pending override when present and falls
  back to Activity.Current capture otherwise, preserving existing behavior
  for non-fan-in stages.
- GraphInterpreter.ProcessPush starts the downstream stage Activity with
  both the primary context as parent and the accumulated SlotLinks as
  ActivityLinks, tagging the span with the link count for visibility.
- GraphStageLogic exposes two internal APIs for fan-in stages:
  CurrentInletTraceContext (read upstream ctx before Grab clears it) and
  SetFanInTraceContext (stage the next Push as a fan-in flush).
- Grab clears SlotLinks alongside SlotContext.

Batch.Logic uses the new API: OnPush captures each inbound element's trace
context, accumulating across the aggregate. Flush emits the first input's
context as primary parent and the rest as links, then clears. Pending
element state carries its own context across the flush/reseed boundary so
that the element triggering a boundary flush also contributes to the next
aggregate's links correctly. BatchWeighted uses the same Logic class and
inherits the behavior with no further changes.

StreamsDiagnosticsSpec adds BatchWeighted_should_link_all_input_traces_via_
ActivityLinks_on_flushed_stage_span: three producers in distinct traced
scopes offer into a Source.Queue upstream of BatchWeighted, with a gated
SelectAsync(1) downstream holding Batch's outlet busy while the elements
accumulate. The test asserts that the downstream stage span has the first
producer's TraceId as its primary parent and forward links to the other
two producer traces. All 19 existing StreamsDiagnosticsSpec and Batch
tests continue to pass.

Still to do (tracked separately): GroupedWithin, Merge, MergePreferred,
Concat wiring; fan-out validation tests for Broadcast and Balance; and
the broader multi-topology regression suite.

Extends the fan-in linking mechanism introduced for Batch/BatchWeighted to
every built-in fan-in stage, plus validation coverage for fan-out stages
and a regression guard against tracing background (untraced) streams.

Fan-in wiring:

- GroupedWeightedWithin.Logic (covers GroupedWithin/GroupedWeightedWithin)
  accumulates each inbound element's ActivityContext alongside _buffer,
  plus the pending-element context for boundary overflow. On EmitGroup it
  emits the first collected context as the primary parent and the rest as
  ActivityLinks via SetFanInTraceContext.
- Merge.Logic captures the inlet's SlotContext on both the fast path
  (outlet immediately available) and the slow path (element enqueued for
  later DequeueAndDispatch), staging it as the downstream Push's primary
  parent so trace continuity survives the OnPull boundary where
  Activity.Current would otherwise be null.
- MergePreferred.Logic does the same for both the preferred and secondary
  Emit paths.
- Concat.Logic does the same for every secondary inlet's OnPush.

Merge/MergePreferred/Concat are 1-to-1 pass-throughs, so they never
actually attach multiple ActivityLinks — the "fan-in" API is used only to
override the primary parent from Activity.Current (which on OnPull is
null) to the captured upstream inlet context. Batch/BatchWeighted and
GroupedWeightedWithin are true fan-ins that do attach N-1 links.

Fan-out stages (Broadcast, Balance) require no changes. Each downstream
branch's ProcessPush already captures Activity.Current (which is the
fan-out stage's own span, itself parented to the upstream slot context),
so every branch inherits the producer's trace id via the existing Phase
1/2 machinery.

Test coverage reorganized into four focused spec files under
src/core/Akka.Streams.Tests/Implementation/:

- StreamsDiagnosticsSpec (unchanged existing 4): linear chain + basic
  Source.Queue ingress + multi-producer-scope separation
- StreamsFanInSpec (5 new): BatchWeighted first-wins + link-count,
  GroupedWithin fan-in, Merge and Concat pass-through trace preservation
- StreamsFanOutSpec (2 new): Broadcast and Balance trace-id propagation
  to every branch
- StreamsRegressionSpec (3 new): background Source.Tick silence
  (regression guard against interpreter-level span explosion), mixed
  traced/untraced Merge, GraphDsl-composed sub-graph end-to-end
- StreamsActivityCollector: shared internal helper extracted from
  StreamsDiagnosticsSpec so every spec file can subscribe to the
  "Akka.Streams" ActivitySource the same way

Verification: 14/14 new trace specs pass. 618/618 Dsl.Flow* tests pass
(all Batch/Merge/Concat/MergePreferred/GroupedWithin behavior). 288/288
Implementation + Fusing tests pass. Zero behavioral regressions from
the new instrumentation primitives.

Pins Akka.Streams.csproj to Version=1.5.99.3 while keeping
AssemblyVersion=1.5.60.0 for binary compat with the stable Akka 1.5.60
package consumed by downstream consumers. The root VersionPrefix bumps
to 1.5.60 so that when Akka.Streams.csproj is packed via ProjectReference,
the Akka dependency in the resulting nuspec resolves to 1.5.60 (which
exists on nuget.org) rather than a phantom 1.5.99.x that does not.

The 1.5.99.3 nupkg carries the fan-in linking machinery plus the new
StreamsFanInSpec / StreamsFanOutSpec / StreamsRegressionSpec suite and
is published to the testlab feed so downstream consumers can
validate end-to-end trace continuity through a real pipeline.
@Aaronontheweb force-pushed the phobos-streams-trace-context branch from 363fa56 to 0cfe3c4 on April 14, 2026 at 18:06
@azure-pipelines

Azure Pipelines:
Successfully started running 1 pipeline(s).

…ion artifacts

Adds a focused xUnit spec that materializes six representative stream
topologies, captures the real Activity spans the interpreter emits, and
renders each scenario as an ASCII span-tree markdown file under
src/core/Akka.Streams.Tests/Implementation/trace-samples/. The renderings
are documentation artifacts answering two questions a maintainer would
otherwise have to run the framework to answer:

- Which stages appear as spans under which topology?
- How do fan-in ActivityLinks actually look in the emitted output?

Scenarios covered:

- linear-chain.md            Source.Queue -> Select -> Sink.Seq
- selectasync-user-span.md   SelectAsync with a user span inside the lambda
- batchweighted-fan-in.md    3x concurrent producers merged into one
                             BatchWeighted aggregate; shows the flushed
                             downstream span carrying 2 ActivityLinks
- merge-two-sources.md       Merge pass-through with 2 independent
                             producer traces both reaching the downstream
                             Select with their own TraceId intact
- broadcast-two-branches.md  Broadcast(2) fan-out, both branches inherit
                             the producer TraceId
- untraced-tick-zero-spans.md  Background Source.Tick regression guard —
                             zero spans emitted, proving the cardinality
                             guarantee

All six span trees are built from real Activity objects produced by a
live GraphInterpreter in each test method, not from hand-drawn data.

Rendering to disk is opt-in via the AKKA_STREAMS_RENDER_TRACE_SAMPLES=1
environment variable so day-to-day test runs do not churn the committed
markdown files with fresh TraceId / SpanId hex strings. Default test
runs still execute each scenario end-to-end and dump the tree to the
xUnit Output writer — they just don't overwrite the committed samples.

Includes a trace-samples/README.md with a scenarios table, the
regeneration command, and an explanation of how the renderings are
produced so future readers can audit or extend the set.

…stream trace tests

Two CI fixes for PR akkadotnet#8160, both test-side only. No library code changes.

## Akka.API.Tests CoreAPISpec.ApproveStreams (all 3 platforms)

The public-API approval baselines at
src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.{DotNet,Net}.verified.txt
did not yet include the new public surface introduced by this branch:

- Akka.Streams.Implementation.StreamsDiagnostics (InternalApi-tagged class)
  with ActivitySourceName, ActivitySource, and GetStageName members
- Connection.SlotContext, SlotLinks, PendingPushPrimaryContext, PendingPushLinks
  (new public properties on the nested Connection type in GraphInterpreter)
- GraphStageLogic.CurrentInletTraceContext<T>(Inlet<T>)
- GraphStageLogic.SetFanInTraceContext<T>(Outlet<T>, ActivityContext, IReadOnlyList<ActivityContext>)

Regenerated both baseline files (byte-identical except for the TargetFramework
assembly attribute line — v6.0 vs netstandard2.0). Preserves CRLF line endings
and UTF-8 BOM to match the repo's existing convention for verified files.

## Streams trace specs — netfx_tests_windows (net48 only)

Nine Streams*Spec tests assert against ActivityTraceId equality, but on
.NET Framework 4.8 every test produced all-zero TraceIds
(00000000000000000000000000000000), causing every cross-span comparison
to fail despite both sides being "equal".

Root cause: Activity.DefaultIdFormat on .NET Framework 4.8 defaults to
ActivityIdFormat.Hierarchical, not W3C. When an ActivitySource starts an
activity under Hierarchical format, its Context struct's TraceId / SpanId
slots are unpopulated (default). On modern runtimes (.NET 6+) the default
is W3C and the context slots are populated correctly — which is why the
net10 runs passed locally and in the net_tests_{linux,windows} CI legs
but netfx_tests_windows failed.

Fix (test-side only): set

    Activity.DefaultIdFormat = ActivityIdFormat.W3C;
    Activity.ForceDefaultIdFormat = true;

in the static constructors of StreamsActivityCollector and
ProducerActivityScope — the two helper types every Streams*Spec test
touches at the top of each test body. Setting it in a static cctor
guarantees the format is applied before any ActivitySource in the
test process creates its first activity.

Important: the Akka.Streams library itself is NOT affected. This is a
test-process environmental setup that makes the netfx test runner
behave the same as the modern runtimes. The library code works
correctly under either ID format. No TFM change, no library source
change.
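The placement can be sketched like so (hypothetical type name; the PR applies the same two-line body in the static constructors of StreamsActivityCollector and ProducerActivityScope):

```csharp
using System;
using System.Diagnostics;

// Sketch: a static constructor runs before the type's first use, so the W3C
// format is forced before any ActivitySource in the test process creates
// its first activity.
internal static class StreamsTraceTestSetup
{
    static StreamsTraceTestSetup()
    {
        Activity.DefaultIdFormat = ActivityIdFormat.W3C;
        Activity.ForceDefaultIdFormat = true;
    }

    // Touching any member triggers the static constructor exactly once.
    internal static void EnsureInitialized() { }
}
```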

@Aaronontheweb Aaronontheweb added akka-streams logging DX Developer experience issues - papercuts, footguns, and other non-bug problems. labels Apr 15, 2026

@Aaronontheweb
Member Author

Aaronontheweb commented Apr 15, 2026

This is essentially our answer to petabridge/phobos-issues#42, and the results are pretty solid with our latest Phobos prototype that is built on this branch.

Before this PR, a SqlClient.ExecuteAsync or HttpClient.SendAsync inside a SelectAsync body becomes an orphaned root span with a random TraceId — AsyncLocal<Activity> doesn't flow across the stream actor's dispatcher boundary, so the producer-side context is just gone by the time the handler runs. And when multiple concurrent requests get merged by BatchWeighted, everyone except whichever request happened to trigger the flush loses their trace identity entirely.

After this PR, both of those work. Every stage in the graph emits a span parented to the right place, and fan-in merges attach ActivityLink references from the flushed batch back to each contributing request's trace — so any viewer that speaks OTLP can jump across sibling traces that share downstream work.

Here's the pipeline I've been exercising, side-by-side with what Jaeger renders:

var orderQueue =
  Source.Queue<Order>(100, OverflowStrategy.DropNew)
    .Select(Normalize)
    .BatchWeighted(
        max: 5L,
        costFunction: _ => 1L,
        seed: o => new List<Order> { o },
        aggregate: (acc, o) =>
        {
            acc.Add(o);
            return acc;
        })
    .SelectAsync(1, async batch =>
    {
        using var resp = await httpClient
            .PostAsync("anything?batch=1", Serialize(batch));
        return batch;
    })
    .ToMaterialized(
        Sink.Ignore<List<Order>>(),
        Keep.Left)
    .Run(_materializer);

Twelve producers fire at the queue concurrently; BatchWeighted merges five of them into one outbound HttpClient POST. Opting the new spans into your OpenTelemetry pipeline is one line:

tracing.AddSource("Akka.Streams");

Same shape as AddSource("Microsoft.EntityFrameworkCore"). Zero cost if nobody's listening.
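In a typical OpenTelemetry SDK setup that one line lands like this (requires the OpenTelemetry and OpenTelemetry.Exporter.OpenTelemetryProtocol NuGet packages; the exporter choice is illustrative):

```csharp
using OpenTelemetry;
using OpenTelemetry.Trace;

using var tracerProvider = Sdk.CreateTracerProviderBuilder()
    .AddSource("Akka.Streams")   // the framework-owned ActivitySource from this PR
    .AddOtlpExporter()           // or whatever exporter the app already uses
    .Build();
```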

[screenshot: 07-jaeger-batched-references-hidpi]

The stuff I want to call out here:

  • The entire span tree on the left is one trace: Actor OnReceive → QueueSource ingress → Select → Batch → SelectAsync → outbound HttpClient POST. Every stage becomes its own span, parented correctly.
  • The tags row on the right shows otel.library.name = Akka.Streams and stream.fan_in.links = 4. That second tag is the framework advertising from the emit side — "this span has four cross-trace links on it."
  • The References (5) section lists one CHILD_OF → the local Batch span (normal parent/child inside this trace), plus four FOLLOWS_FROM entries → < span in another trace >. Jaeger renders OpenTelemetry ActivityLink as FOLLOWS_FROM. That's five concurrent requests merged into one outbound HTTP call, every one of them keeping its own trace identity and backlinking to the shared flush.

A few more views of the same batch for reference:

  • Full span tree without the detail panel — [screenshot: 02-jaeger-batched-trace-full-tree]
  • SelectAsync detail panel with References collapsed — [screenshot: 03-jaeger-selectasync-span-detail]
  • One of the contributing requests on its own trace — [screenshot: 06-jaeger-single-trace-comparison]

A request whose element got batched into the flush above. Its own trace stops at QueueSource — the Select / Batch / SelectAsync / IgnoreSink downstream work lives on the merged-batch trace, reachable via the FOLLOWS_FROM link. Click it in a viewer that supports cross-trace navigation and you land on the batched span.

@Aaronontheweb
Member Author

One thing I should flag since the screenshot above shows an actor in the root position of the trace: none of what this PR adds actually requires an actor at the top, and none of it requires Phobos. The framework change just reads Activity.Current?.Context at whatever thread is calling OfferAsync (or any GetAsyncCallback-based ingress path) and carries it through. It doesn't care where that Activity came from.

Concretely, all of these Just Work once you .AddSource("Akka.Streams") on your OpenTelemetry TracerProvider:

  • ASP.NET Core controller → direct OfferAsync. OpenTelemetry.Instrumentation.AspNetCore sets Activity.Current for the duration of the request handler, so a controller that materializes (or holds a reference to) a Source.Queue and offers into it directly gets the full chain — incoming HTTP span → ingress → stage spans → whatever the SelectAsync body does. No actor, no Phobos.
  • BackgroundService doing await httpClient.GetAsync(...) then await queue.OfferAsync(result). The HttpClient instrumentation sets Activity.Current to the outbound call's span for the duration of the continuation, so the stream work lands underneath the HTTP client's trace.
  • User code that wraps work in its own ActivitySource.StartActivity(...) scope. Any span the user owns becomes the parent of whatever the stream does from that scope. Completely untethered from anything Akka.NET-specific.

The reason the sample above happens to show an actor at the top is just that we were stress-testing the actor-to-stream path specifically — that's the case where AsyncLocal<Activity> breaks across the mailbox boundary and you need some mechanism to re-establish the context on the receiving side. Phobos is how we do that in our prototype, but a handwritten Tell-capturing wrapper or a pattern where the stream is materialized outside the actor would work identically — this PR doesn't care.

TL;DR: this is a framework-level capability. Opt in with one line of OTel registration, get end-to-end trace continuity through any graph shape, from any producer.
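As a sketch of that one-line opt-in — the builder calls below come from the standard OpenTelemetry .NET packages (`OpenTelemetry`, `OpenTelemetry.Instrumentation.AspNetCore`, `OpenTelemetry.Exporter.OpenTelemetryProtocol`); the queue-offer at the bottom is illustrative, not code from this PR:

```csharp
using OpenTelemetry;
using OpenTelemetry.Trace;

// The one-line opt-in: subscribe the TracerProvider to the framework-owned source.
using var tracerProvider = Sdk.CreateTracerProviderBuilder()
    .AddSource("Akka.Streams")      // the new stream spans from this PR
    .AddAspNetCoreInstrumentation() // incoming HTTP span becomes the root
    .AddOtlpExporter()
    .Build();

// Elsewhere, e.g. in a controller holding a materialized Source.Queue:
// Activity.Current is the ASP.NET Core request span for the duration of the
// handler, so the capture in OfferAsync parents every downstream stage span
// under the incoming request.
//
//     await queue.OfferAsync(element);
```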

@Aaronontheweb
Member Author

Known limitation: trace context does not cross a StreamRef hop.

StreamRefs serialize elements across the network via StreamRefsProtocol.proto, which doesn't have a field for ActivityContext. So when an element travels from Node A's SourceRef/SinkRef to Node B, the context that was alive on Node A during Push is dropped at serialization time. On Node B, the interpreter thread runs with Activity.Current = null, the new context-capture sees null, and downstream stages on Node B emit no spans.

Not a regression — before this PR there was no context propagation at all. After this PR, local-node traces work end-to-end; cross-node traces that flow through a StreamRef still break the chain at the wire boundary.

The fix is pretty small and follows the same pattern as #7995 (which added LogEvent.ActivityContext so log events can carry trace context across the mailbox boundary): add an optional ActivityContext field to the OnNext payload in StreamRefsProtocol.proto, capture on the sending side in SourceRefImpl, restore on the receiving side before the local Push call. Wire-compat safe because the proto field is optional, and entirely additive — no existing StreamRef behavior changes.
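A hedged sketch of what that additive proto change could look like — the field name, field number, and string encoding are placeholders, not the real StreamRefsProtocol.proto contents:

```protobuf
// Sketch only: additive and wire-compatible because the new field is
// optional — old nodes never set it and never read it.
message OnNext {
  // ...existing payload fields unchanged...

  // W3C traceparent captured on the sending side (SourceRefImpl) before
  // serialization; restored on the receiving node before the local Push.
  optional string trace_parent = 15; // placeholder field number
}
```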

Out of scope for this PR. Opening a tracking issue to come back to it after this lands.

@Aaronontheweb
Member Author


LLM-generated comment, but it's correct — not going to bother getting this to work with StreamRefs because IMHO StreamRef serialization sucks today and will improve a lot once we add code-generated serialization in v1.6.

The fan-in trace-linking plumbing introduced earlier in this branch
exposed six members to the public API surface that don't need to be
public. The only callers are framework-internal stage implementations
(Batch.Logic, GroupedWeightedWithin.Logic, Merge.Logic,
MergePreferred.Logic, Concat.Logic), all of which live inside the
Akka.Streams assembly itself. Downgrading visibility keeps the
ergonomics the same for in-assembly callers (no code change on their
side, internal accessibility from within the defining assembly) while
shrinking the public API surface and reserving flexibility to evolve
the fan-in API shape later without a breaking change.

- GraphStageLogic.CurrentInletTraceContext<T>       protected -> internal
- GraphStageLogic.SetFanInTraceContext<T>           protected -> internal
- Connection.SlotContext                               public -> internal
- Connection.SlotLinks                                 public -> internal
- Connection.PendingPushPrimaryContext                 public -> internal
- Connection.PendingPushLinks                          public -> internal

StreamsDiagnostics (the framework-owned ActivitySource) stays public
and [InternalApi]-tagged — users opting their TracerProvider into the
new stream spans need the const string "Akka.Streams" to call
.AddSource(...) on their OTel pipeline.

Regenerated CoreAPISpec.ApproveStreams baselines (both DotNet.verified
and Net.verified variants) to drop the six lines that no longer appear
in the public surface. Full Streams trace spec suite (StreamsDiagnosticsSpec,
StreamsFanInSpec, StreamsFanOutSpec, StreamsRegressionSpec,
StreamsTraceRenderingSpec — 20 tests) still passes on net10.0 with no
changes to the calling code: InternalsVisibleTo("Akka.Streams.Tests")
lets the specs reach the now-internal helpers directly.
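That spec-access mechanism is the standard attribute; a sketch of the assembly-level declaration (any strong-name `PublicKey` suffix the real assembly uses is elided here):

```csharp
using System.Runtime.CompilerServices;

// In Akka.Streams' assembly metadata: lets the test assembly reach the
// now-internal helpers without re-widening the public API surface.
[assembly: InternalsVisibleTo("Akka.Streams.Tests")]
```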
@azure-pipelines

Azure Pipelines:
Successfully started running 1 pipeline(s).

…tring consts, helper dedup

Post-review cleanup of the trace-context propagation work. Three classes of change:

1. HasListeners() hot-path guards

GraphStageLogic.Push, GraphInterpreter.ProcessPush, and
ConcurrentAsyncCallback.InvokeWithPromise now short-circuit the
Activity.Current read and the StartActivity / GetStageName work when
nothing is listening to the "Akka.Streams" ActivitySource. Previously
Push would always read Activity.Current?.Context (one nullable
property access + field writes per element) and ProcessPush would
always call StreamsDiagnostics.GetStageName(stage) (reflection on
DeclaringType + IndexOf + Substring) before discovering that
StartActivity returns null. With the guard, the per-element cost on
non-traced streams drops from ~40-100 CPU cycles to essentially zero.

GraphStageLogicWithCallbackWrapper.InvokeCallbacks also moves the
Activity.Current capture inside the NotInitialized branch so the
Initialized and Stopped fast paths don't touch Activity.Current at all.
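The guard is plain `System.Diagnostics` — a minimal sketch of the shape described above, with the class and method names invented for illustration (only the `ActivitySource` APIs and the `"Akka.Streams"` / `"akka.stream.stage"` strings come from the PR):

```csharp
using System.Diagnostics;

internal static class StreamsDiagnosticsSketch
{
    internal static readonly ActivitySource Source = new("Akka.Streams");

    // Hot-path guard: skip the Activity.Current read and all naming/start
    // work unless something is actually listening to this source.
    internal static void OnPush(object element)
    {
        if (!Source.HasListeners())
            return; // non-traced streams pay essentially nothing per element

        var ctx = Activity.Current?.Context; // only read when someone listens
        using var span = Source.StartActivity(
            "akka.stream.stage", ActivityKind.Internal, ctx ?? default);
        // ... push the element with the span current ...
    }
}
```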

2. Stringly-typed constants

Span operation names ("akka.stream.stage", "akka.stream.ingress",
"akka.stream.ingress.queued") and tag keys ("stream.stage.type",
"stream.fan_in.links") are now internal const fields on
StreamsDiagnostics. Call sites in GraphInterpreter.cs and GraphStage.cs
reference them instead of raw strings.
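Sketched as const fields — the string values and `internal const` placement come from the commit message above; the field names and the public `ActivitySourceName` const are assumptions for illustration:

```csharp
public static class StreamsDiagnostics
{
    // Users need this string to call .AddSource(...) — stays public per the PR.
    public const string ActivitySourceName = "Akka.Streams"; // hypothetical const name

    // Span operation names
    internal const string StageOperation = "akka.stream.stage";
    internal const string IngressOperation = "akka.stream.ingress";
    internal const string QueuedIngressOperation = "akka.stream.ingress.queued";

    // Tag keys
    internal const string StageTypeTag = "stream.stage.type";
    internal const string FanInLinksTag = "stream.fan_in.links";
}
```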

3. Deduplication of pass-through fan-in pattern

Merge.Logic (fast path + slow-path DequeueAndDispatch) and Concat.Logic
used to repeat a four-line pattern inline:
    var ctx = CurrentInletTraceContext(inlet);
    var element = Grab(inlet);
    if (ctx.HasValue) SetFanInTraceContext(outlet, ctx.Value, null);
    Push(outlet, element);

Factored into a single internal helper GrabAndPushFanIn<TIn, TOut>
on GraphStageLogic, with the TIn : TOut constraint so it works for
Merge/Concat's covariant element types. MergePreferred stays inline
because it uses the Emit deferred-push path instead of direct Push,
and wrapping Emit cleanly would need its own helper for a modest win.
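A sketch of the factored helper, with the signature inferred from the four-line inline pattern above — treat the exact parameter shapes as illustrative, not the real code:

```csharp
// Sketch, on GraphStageLogic: dedup of the fan-in pass-through pattern.
// The TIn : TOut constraint lets Merge/Concat push a TIn through a TOut outlet.
internal void GrabAndPushFanIn<TIn, TOut>(Inlet<TIn> inlet, Outlet<TOut> outlet)
    where TIn : TOut
{
    var ctx = CurrentInletTraceContext(inlet); // capture before Grab consumes the slot
    var element = Grab(inlet);
    if (ctx.HasValue)
        SetFanInTraceContext(outlet, ctx.Value, null); // primary context, no extra links
    Push(outlet, element);
}
```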

Also: static-constructor W3C activity format setup is now a single
StreamsActivityTestSetup.EnsureW3CActivityFormat() call, deduplicated
between StreamsActivityCollector and ProducerActivityScope.
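For reference, the deduplicated setup amounts to the standard BCL switches — a sketch under the assumption that the helper does only this (the `Activity.DefaultIdFormat` / `ForceDefaultIdFormat` APIs are real):

```csharp
using System.Diagnostics;

internal static class StreamsActivityTestSetup
{
    private static bool _done;

    // Force W3C traceparent-style ids so TraceId/SpanId round-trip through
    // the collectors the specs use; idempotent across test classes.
    internal static void EnsureW3CActivityFormat()
    {
        if (_done) return;
        Activity.DefaultIdFormat = ActivityIdFormat.W3C;
        Activity.ForceDefaultIdFormat = true;
        _done = true;
    }
}
```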

No functional changes. All 73 affected tests (trace specs + Flow Merge
+ Flow Concat + Flow Batch + Flow GroupedWithin + MergePreferred behavior
specs) and the CoreAPISpec.ApproveStreams approval test pass on net10.0
after the refactor.
@azure-pipelines

Azure Pipelines:
Successfully started running 1 pipeline(s).


Labels

akka-streams · DX (Developer experience issues - papercuts, footguns, and other non-bug problems) · logging
