@@ -124,71 +124,17 @@ void WorkerTracer::addLog(const tracing::InvocationSpanContext& context,
124124}
125125
126126void WorkerTracer::addSpan (tracing::CompleteSpan&& span) {
127+ // The span information is not transmitted via RPC at this point, we can decompose the span into
128+ // spanOpen/spanEnd.
127129 addSpanOpen (span.spanId , span.parentSpanId , span.operationName , span.startTime );
128- addSpanEnd (kj::mv (span));
129- #if 0
130- // This is where we'll actually encode the span.
131- if (pipelineLogLevel == PipelineLogLevel::NONE) {
132- return;
133- }
134-
135- // Note: spans are not available in the buffered tail worker, so we don't need an exceededSpanLimit
136- // variable for it and it can't cause truncation.
137- auto& tailStreamWriter = KJ_UNWRAP_OR_RETURN(maybeTailStreamWriter);
138-
139- adjustSpanTime(span);
140-
141- size_t spanTagsSize = 0;
142- size_t spanNameSize = span.operationName.size();
143- for (const Span::TagMap::Entry& tag: span.tags) {
144- spanTagsSize += tag.key.size();
145- KJ_SWITCH_ONEOF(tag.value) {
146- KJ_CASE_ONEOF(str, kj::ConstString) {
147- spanTagsSize += str.size();
148- }
149- KJ_CASE_ONEOF(val, bool) {
150- spanTagsSize++;
151- }
152- // int64_t and double
153- KJ_CASE_ONEOF_DEFAULT {
154- spanTagsSize += sizeof(int64_t);
155- }
156- }
157- }
158-
159- // Compose span events – attributes and spanClose are transmitted together for now.
160- auto& topLevelContext = KJ_ASSERT_NONNULL(topLevelInvocationSpanContext);
161- // Compose span events. For SpanOpen, an all-zero spanId is interpreted as having no spans above
162- // this one, thus we use the Onset spanId instead (taken from topLevelContext). We go to great
163- // lengths to rule out getting an all-zero spanId by chance (see SpanId::fromEntropy()), so this
164- // should be safe.
165- tracing::SpanId parentSpanId = span.parentSpanId;
166- if (parentSpanId == tracing::SpanId::nullId) {
167- parentSpanId = topLevelContext.getSpanId();
168- }
169-
170- auto spanOpenContext = tracing::InvocationSpanContext(
171- topLevelContext.getTraceId(), topLevelContext.getInvocationId(), parentSpanId);
172- auto spanComponentContext = tracing::InvocationSpanContext(
173- topLevelContext.getTraceId(), topLevelContext.getInvocationId(), span.spanId);
174- tailStreamWriter->report(spanOpenContext,
175- tracing::SpanOpen(span.spanId, span.operationName.clone()), span.startTime, spanNameSize);
176- // If a span manages to exceed the size limit, truncate it by not providing span attributes.
177- if (span.tags.size() && spanTagsSize <= MAX_TRACE_BYTES) {
178- tracing::CustomInfo attr = KJ_MAP(tag, span.tags) {
179- return tracing::Attribute(kj::mv(tag.key), kj::mv(tag.value));
180- };
181- tailStreamWriter->report(spanComponentContext, kj::mv(attr), span.startTime, spanTagsSize);
182- }
183- tailStreamWriter->report(spanComponentContext, tracing::SpanClose(), span.endTime, 0);
184- #endif
130+ tracing::SpanEndData spanEnd (kj::mv (span));
131+ addSpanEnd (kj::mv (spanEnd));
185132}
186133
187134void WorkerTracer::addSpanOpen (tracing::SpanId spanId,
188135 tracing::SpanId parentSpanId,
189136 kj::ConstString& operationName,
190137 kj::Date startTime) {
191- // This is where we'll actually encode the span.
192138 if (pipelineLogLevel == PipelineLogLevel::NONE) {
193139 return ;
194140 }
@@ -208,8 +154,7 @@ void WorkerTracer::addSpanOpen(tracing::SpanId spanId,
208154 spanOpenContext, tracing::SpanOpen (spanId, operationName.clone ()), startTime, spanNameSize);
209155}
210156
211- void WorkerTracer::addSpanEnd (tracing::CompleteSpan&& span) {
212- // This is where we'll actually encode the span.
157+ void WorkerTracer::addSpanEnd (tracing::SpanEndData&& span) {
213158 if (pipelineLogLevel == PipelineLogLevel::NONE) {
214159 return ;
215160 }
@@ -237,7 +182,8 @@ void WorkerTracer::addSpanEnd(tracing::CompleteSpan&& span) {
237182 }
238183 }
239184
240- // Compose span events – attributes and spanClose are transmitted together for now.
185+ // Compose Attributes and SpanClose, which are available at span completion time and transmitted
186+ // together.
241187 auto & topLevelContext = KJ_ASSERT_NONNULL (topLevelInvocationSpanContext);
242188 auto spanComponentContext = tracing::InvocationSpanContext (
243189 topLevelContext.getTraceId (), topLevelContext.getInvocationId (), span.spanId );
@@ -513,6 +459,56 @@ void BaseTracer::adjustSpanTime(tracing::CompleteSpan& span) {
513459 }
514460}
515461
462+ void BaseTracer::adjustSpanTime (tracing::SpanEndData& span) {
463+ // To report I/O time, we need the IOContext to still be alive.
464+ // weakIoContext is only none if we are tracing via RPC (in this case span times have already been
465+ // adjusted) or if we failed to transmit an Onset event (in that case we'll get an error based on
466+ // missing topLevelInvocationSpanContext right after).
467+ if (weakIoContext != kj::none) {
468+ auto & weakIoCtx = KJ_ASSERT_NONNULL (weakIoContext);
469+ weakIoCtx->runIfAlive ([this , &span](IoContext& context) {
470+ if (context.hasCurrentIncomingRequest ()) {
471+ span.endTime = context.now ();
472+ } else {
473+ // We have an IOContext, but there's no current IncomingRequest. Always log a warning here,
474+ // this should not be happening. Still report completeTime as a useful timestamp if
475+ // available.
476+ bool hasCompleteTime = false ;
477+ if (completeTime != kj::UNIX_EPOCH) {
478+ span.endTime = completeTime;
479+ hasCompleteTime = true ;
480+ } else {
481+ span.endTime = span.startTime ;
482+ }
483+ if (isPredictableModeForTest ()) {
484+ KJ_FAIL_ASSERT (" reported span without current request" , hasCompleteTime);
485+ } else {
486+ LOG_WARNING_PERIODICALLY (" reported span without current request" );
487+ }
488+ }
489+ });
490+ if (!weakIoCtx->isValid ()) {
491+ // This can happen if we start a customEvent from this event and cancel it after this IoContext
492+ // gets destroyed. In that case we no longer have an IoContext available and can't get the
493+ // current time, but the outcome timestamp will have already been set. Since the outcome
494+ // timestamp is "late enough", simply use that.
495+ // TODO(o11y): fix this – spans should not be outliving the IoContext.
496+ if (completeTime != kj::UNIX_EPOCH) {
497+ span.endTime = completeTime;
498+ } else {
499+ // Otherwise, we can't actually get an end timestamp that makes sense. Report a zero-duration
500+ // span and log a warning (or fail assert in test mode).
501+ span.endTime = span.startTime ;
502+ if (isPredictableModeForTest ()) {
503+ KJ_FAIL_ASSERT (" reported span after IoContext was deallocated" );
504+ } else {
505+ KJ_LOG (WARNING, " reported span after IoContext was deallocated" );
506+ }
507+ }
508+ }
509+ }
510+ }
511+
516512void WorkerTracer::setReturn (
517513 kj::Maybe<kj::Date> timestamp, kj::Maybe<tracing::FetchResponseInfo> fetchResponseInfo) {
518514 // Match the behavior of setEventInfo(). Any resolution of the TODO comments in setEventInfo()
@@ -588,11 +584,7 @@ void UserSpanObserver::report(const Span& span) {
588584}
589585
590586void UserSpanObserver::reportStart (kj::ConstString& operationName, kj::Date startTime) {
591- submitter->submitSpanStart (spanId, parentSpanId, operationName, startTime);
592- }
593-
594- void UserSpanObserver::reportEnd (const Span& span) {
595- submitter->submitSpanEnd (spanId, span);
587+ submitter->submitSpanOpen (spanId, parentSpanId, operationName, startTime);
596588}
597589
598590// Provide I/O time to the tracing system for user spans.
0 commit comments