Skip to content

Commit 2425b02

Browse files
committed
Adds tail tags from caller to tail workers
1 parent bc0bc2a commit 2425b02

File tree

12 files changed

+113
-2
lines changed

12 files changed

+113
-2
lines changed

src/workerd/api/trace.c++

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,26 @@ jsg::Optional<kj::Array<kj::String>> getTraceScriptTags(const Trace& trace) {
111111
}
112112
}
113113

114+
TraceItem::TailAttributeValue getTraceTailAttributeValue(const tracing::Attribute& tag) {
115+
KJ_REQUIRE(tag.value.size() == 1, "tail attributes must contain exactly one value");
116+
117+
KJ_SWITCH_ONEOF(tag.value[0]) {
118+
KJ_CASE_ONEOF(boolean, bool) {
119+
return TraceItem::TailAttributeValue(boolean);
120+
}
121+
KJ_CASE_ONEOF(number, double) {
122+
return TraceItem::TailAttributeValue(number);
123+
}
124+
KJ_CASE_ONEOF(integer, int64_t) {
125+
return TraceItem::TailAttributeValue(static_cast<double>(integer));
126+
}
127+
KJ_CASE_ONEOF(string, kj::ConstString) {
128+
return TraceItem::TailAttributeValue(kj::str(string));
129+
}
130+
}
131+
KJ_UNREACHABLE;
132+
}
133+
114134
kj::Own<TraceItem::FetchEventInfo::Request::Detail> getFetchRequestDetail(
115135
jsg::Lock& js, const Trace& trace, const tracing::FetchEventInfo& eventInfo) {
116136
const auto getCf = [&]() -> jsg::Optional<jsg::V8Ref<v8::Object>> {
@@ -191,6 +211,8 @@ TraceItem::TraceItem(jsg::Lock& js, const Trace& trace)
191211
scriptVersion(getTraceScriptVersion(trace)),
192212
dispatchNamespace(mapCopyString(trace.dispatchNamespace)),
193213
scriptTags(getTraceScriptTags(trace)),
214+
tailAttributes(trace.tailAttributes.map(
215+
[](auto& tags) { return KJ_MAP(tag, tags) { return tag.clone(); }; })),
194216
durableObjectId(mapCopyString(trace.durableObjectId)),
195217
executionModel(kj::str(trace.executionModel)),
196218
outcome(kj::str(trace.outcome)),
@@ -270,6 +292,20 @@ jsg::Optional<kj::Array<kj::StringPtr>> TraceItem::getScriptTags() {
270292
[](kj::Array<kj::String>& tags) { return KJ_MAP(t, tags) -> kj::StringPtr { return t; }; });
271293
}
272294

295+
jsg::Optional<jsg::Dict<TraceItem::TailAttributeValue>> TraceItem::getTailAttributes() {
296+
return tailAttributes.map([](kj::Array<tracing::Attribute>& tags) {
297+
return jsg::Dict<TraceItem::TailAttributeValue>{
298+
.fields =
299+
KJ_MAP(tag, tags) {
300+
return jsg::Dict<TraceItem::TailAttributeValue>::Field{
301+
.name = kj::str(tag.name),
302+
.value = getTraceTailAttributeValue(tag),
303+
};
304+
},
305+
};
306+
});
307+
}
308+
273309
jsg::Optional<kj::StringPtr> TraceItem::getDurableObjectId() {
274310
return durableObjectId.map([](auto& id) -> kj::StringPtr { return id; });
275311
}
@@ -732,6 +768,27 @@ void TraceItem::visitForMemoryInfo(jsg::MemoryTracker& tracker) const {
732768
tracker.trackField("scriptTag", tag);
733769
}
734770
}
771+
KJ_IF_SOME(tags, tailAttributes) {
772+
for (const auto& tag: tags) {
773+
tracker.trackFieldWithSize("tailAttributeName", tag.name.size());
774+
for (const auto& value: tag.value) {
775+
KJ_SWITCH_ONEOF(value) {
776+
KJ_CASE_ONEOF(boolean, bool) {
777+
tracker.trackFieldWithSize("tailAttributeValue", sizeof(boolean));
778+
}
779+
KJ_CASE_ONEOF(number, double) {
780+
tracker.trackFieldWithSize("tailAttributeValue", sizeof(number));
781+
}
782+
KJ_CASE_ONEOF(integer, int64_t) {
783+
tracker.trackFieldWithSize("tailAttributeValue", sizeof(integer));
784+
}
785+
KJ_CASE_ONEOF(string, kj::ConstString) {
786+
tracker.trackFieldWithSize("tailAttributeValue", string.size());
787+
}
788+
}
789+
}
790+
}
791+
}
735792
tracker.trackField("outcome", outcome);
736793
}
737794

src/workerd/api/trace.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ struct ScriptVersion {
6969

7070
class TraceItem final: public jsg::Object {
7171
public:
72+
using TailAttributeValue = kj::OneOf<bool, double, kj::String>;
73+
7274
class FetchEventInfo;
7375
class JsRpcEventInfo;
7476
class ScheduledEventInfo;
@@ -101,6 +103,7 @@ class TraceItem final: public jsg::Object {
101103
jsg::Optional<ScriptVersion> getScriptVersion();
102104
jsg::Optional<kj::StringPtr> getDispatchNamespace();
103105
jsg::Optional<kj::Array<kj::StringPtr>> getScriptTags();
106+
jsg::Optional<jsg::Dict<TailAttributeValue>> getTailAttributes();
104107
jsg::Optional<kj::StringPtr> getDurableObjectId();
105108
kj::StringPtr getExecutionModel();
106109
kj::StringPtr getOutcome();
@@ -120,6 +123,7 @@ class TraceItem final: public jsg::Object {
120123
JSG_LAZY_READONLY_INSTANCE_PROPERTY(scriptVersion, getScriptVersion);
121124
JSG_LAZY_READONLY_INSTANCE_PROPERTY(dispatchNamespace, getDispatchNamespace);
122125
JSG_LAZY_READONLY_INSTANCE_PROPERTY(scriptTags, getScriptTags);
126+
JSG_LAZY_READONLY_INSTANCE_PROPERTY(tailAttributes, getTailAttributes);
123127
JSG_LAZY_READONLY_INSTANCE_PROPERTY(durableObjectId, getDurableObjectId);
124128
JSG_LAZY_READONLY_INSTANCE_PROPERTY(outcome, getOutcome);
125129
JSG_LAZY_READONLY_INSTANCE_PROPERTY(executionModel, getExecutionModel);
@@ -141,6 +145,7 @@ class TraceItem final: public jsg::Object {
141145
kj::Maybe<ScriptVersion> scriptVersion;
142146
kj::Maybe<kj::String> dispatchNamespace;
143147
jsg::Optional<kj::Array<kj::String>> scriptTags;
148+
kj::Maybe<kj::Array<tracing::Attribute>> tailAttributes;
144149
kj::Maybe<kj::String> durableObjectId;
145150
kj::String executionModel;
146151
kj::String outcome;

src/workerd/io/trace.c++

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -753,6 +753,13 @@ void Trace::copyTo(rpc::Trace::Builder builder) const {
753753
}
754754
}
755755

756+
KJ_IF_SOME(tags, tailAttributes) {
757+
auto list = builder.initTailAttributes(tags.size());
758+
for (auto i: kj::indices(tags)) {
759+
tags[i].copyTo(list[i]);
760+
}
761+
}
762+
756763
KJ_IF_SOME(e, entrypoint) {
757764
builder.setEntrypoint(e);
758765
}
@@ -859,6 +866,10 @@ void Trace::mergeFrom(rpc::Trace::Reader reader, PipelineLogLevel pipelineLogLev
859866
scriptTags = KJ_MAP(tag, tags) { return kj::str(tag); };
860867
}
861868

869+
if (auto tags = reader.getTailAttributes(); tags.size() > 0) {
870+
tailAttributes = KJ_MAP(tag, tags) { return tracing::Attribute(tag); };
871+
}
872+
862873
if (reader.hasEntrypoint()) {
863874
entrypoint = kj::str(reader.getEntrypoint());
864875
}

src/workerd/io/trace.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -869,6 +869,7 @@ class Trace final: public kj::Refcounted {
869869
kj::Maybe<kj::String> dispatchNamespace;
870870
kj::Maybe<kj::String> scriptId;
871871
kj::Array<kj::String> scriptTags;
872+
kj::Maybe<kj::Array<tracing::Attribute>> tailAttributes;
872873
kj::Maybe<kj::String> entrypoint;
873874
kj::Maybe<kj::String> durableObjectId;
874875

src/workerd/io/tracer.c++

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,24 @@ namespace {
2121
// TODO(streaming-tail): Add a clear indicator for events being truncated based on MAX_TRACE_BYTES
2222
// so that developers can understand why this happens.
2323
static constexpr size_t MAX_TRACE_BYTES = 256 * 1024;
24+
25+
tracing::Attribute::Value cloneAttributeValue(const tracing::Attribute::Value& value) {
26+
KJ_SWITCH_ONEOF(value) {
27+
KJ_CASE_ONEOF(boolean, bool) {
28+
return tracing::Attribute::Value(boolean);
29+
}
30+
KJ_CASE_ONEOF(number, double) {
31+
return tracing::Attribute::Value(number);
32+
}
33+
KJ_CASE_ONEOF(integer, int64_t) {
34+
return tracing::Attribute::Value(integer);
35+
}
36+
KJ_CASE_ONEOF(string, kj::ConstString) {
37+
return tracing::Attribute::Value(string.clone());
38+
}
39+
}
40+
KJ_UNREACHABLE;
41+
}
2442
} // namespace
2543

2644
kj::Promise<kj::Own<Trace>> WorkerTracer::onComplete() {
@@ -34,11 +52,24 @@ kj::Promise<kj::Own<Trace>> WorkerTracer::onComplete() {
3452
WorkerTracer::WorkerTracer(kj::Maybe<kj::Rc<kj::Refcounted>> parentPipeline,
3553
kj::Own<Trace> trace,
3654
PipelineLogLevel pipelineLogLevel,
55+
kj::Maybe<kj::Array<tracing::Attribute>> tailAttributes,
3756
kj::Maybe<kj::Own<tracing::TailStreamWriter>> maybeTailStreamWriter)
3857
: pipelineLogLevel(pipelineLogLevel),
3958
trace(kj::mv(trace)),
4059
parentPipeline(kj::mv(parentPipeline)),
41-
maybeTailStreamWriter(kj::mv(maybeTailStreamWriter)) {}
60+
maybeTailStreamWriter(kj::mv(maybeTailStreamWriter)) {
61+
KJ_IF_SOME(tags, tailAttributes) {
62+
if (tags.size() == 0) {
63+
tailAttributes = kj::none;
64+
} else {
65+
for (auto& tag: tags) {
66+
KJ_REQUIRE(tag.value.size() == 1, "tail attributes must contain exactly one value");
67+
setWorkerAttribute(tag.name.clone(), cloneAttributeValue(tag.value[0]));
68+
}
69+
}
70+
}
71+
this->trace->tailAttributes = kj::mv(tailAttributes);
72+
}
4273

4374
WorkerTracer::~WorkerTracer() noexcept(false) {
4475
// Report the outcome event, which should have been delivered by now.

src/workerd/io/tracer.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ class WorkerTracer final: public BaseTracer {
113113
explicit WorkerTracer(kj::Maybe<kj::Rc<kj::Refcounted>> parentPipeline,
114114
kj::Own<Trace> trace,
115115
PipelineLogLevel pipelineLogLevel,
116+
kj::Maybe<kj::Array<tracing::Attribute>> tailAttributes,
116117
kj::Maybe<kj::Own<tracing::TailStreamWriter>> maybeTailStreamWriter);
117118
virtual ~WorkerTracer() noexcept(false);
118119
KJ_DISALLOW_COPY_AND_MOVE(WorkerTracer);

src/workerd/io/worker-interface.capnp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ struct Trace @0x8e8d911203762d34 {
168168

169169
entrypoint @22 :Text;
170170
durableObjectId @27 :Text;
171+
tailAttributes @28 :List(Attribute);
171172

172173
diagnosticChannelEvents @17 :List(DiagnosticChannelEvent);
173174
struct DiagnosticChannelEvent {

src/workerd/server/server.c++

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2158,7 +2158,7 @@ class Server::WorkerService final: public Service,
21582158
nullptr /* scriptTags */, mapCopyString(entrypointName), executionModel,
21592159
kj::none /* durableObjectId */);
21602160
kj::Own<WorkerTracer> tracer = kj::refcounted<WorkerTracer>(
2161-
kj::none, kj::mv(trace), PipelineLogLevel::FULL, kj::mv(tailStreamWriter));
2161+
kj::none, kj::mv(trace), PipelineLogLevel::FULL, kj::none, kj::mv(tailStreamWriter));
21622162

21632163
// When the tracer is complete, deliver traces to any buffered tail workers. We end up
21642164
// creating two references to the WorkerTracer, one held by the observer and one that will be

types/generated-snapshot/experimental/index.d.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3251,6 +3251,7 @@ interface TraceItem {
32513251
readonly scriptVersion?: ScriptVersion;
32523252
readonly dispatchNamespace?: string;
32533253
readonly scriptTags?: string[];
3254+
readonly tailAttributes?: Record<string, boolean | number | string>;
32543255
readonly durableObjectId?: string;
32553256
readonly outcome: string;
32563257
readonly executionModel: string;

types/generated-snapshot/experimental/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3257,6 +3257,7 @@ export interface TraceItem {
32573257
readonly scriptVersion?: ScriptVersion;
32583258
readonly dispatchNamespace?: string;
32593259
readonly scriptTags?: string[];
3260+
readonly tailAttributes?: Record<string, boolean | number | string>;
32603261
readonly durableObjectId?: string;
32613262
readonly outcome: string;
32623263
readonly executionModel: string;

0 commit comments

Comments
 (0)