Skip to content

Commit 8d6729e

Browse files
committed
feat(teams): improve task dispatch, concurrency, and tool ergonomics
- Move task dispatch from mid-turn to post-turn to prevent dependent tasks from completing before the current agent's run finishes - Add team create lock to serialize list→create flows across concurrent group chat sessions, preventing duplicate task creation - Require list-before-create gate: agents must call team_tasks(list) before creating tasks - Make assignee required on task creation - Add pagination (50 per page) to task list with offset support - Slim task list/get/search responses with dedicated structs to reduce context token usage - Add task board snapshot in announce messages to leader - Workspace: allow subdirectory paths in read/delete, show directories in list output - UI: reduce kanban card title font size for better visual balance
1 parent 0d9906d commit 8d6729e

File tree

16 files changed

+392
-110
lines changed

16 files changed

+392
-110
lines changed

cmd/gateway_consumer_handlers.go

Lines changed: 62 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -563,10 +563,6 @@ func handleTeammateMessage(
563563
ActorID: toAgent,
564564
},
565565
})
566-
// FailTask also unblocks dependent tasks.
567-
if postTurn != nil {
568-
postTurn.DispatchUnblockedTasks(ctx, teamID)
569-
}
570566
}
571567
} else {
572568
result := outcome.Result.Content
@@ -591,13 +587,15 @@ func handleTeammateMessage(
591587
ActorID: toAgent,
592588
},
593589
})
594-
// Dispatch newly-unblocked dependent tasks.
595-
if postTurn != nil {
596-
postTurn.DispatchUnblockedTasks(ctx, teamID)
597-
}
598590
}
599591
}
600592
}
593+
// Always dispatch unblocked tasks after member turn ends,
594+
// regardless of whether the task was already completed by the tool.
595+
// This ensures dependent tasks start only after the member's run finishes.
596+
if postTurn != nil {
597+
postTurn.DispatchUnblockedTasks(ctx, teamID)
598+
}
601599
}
602600
}
603601
}
@@ -645,14 +643,25 @@ func handleTeammateMessage(
645643
}
646644
memberAgent := inMeta["to_agent"]
647645

646+
// Build task board snapshot scoped to this batch (same origin_trace_id).
647+
taskBoardSnapshot := ""
648+
if teamIDStr := inMeta["team_id"]; teamIDStr != "" {
649+
if teamUUID, err := uuid.Parse(teamIDStr); err == nil {
650+
taskBoardSnapshot = buildTaskBoardSnapshot(ctx, teamStore, teamUUID, inMeta["origin_chat_id"], inMeta["origin_trace_id"])
651+
}
652+
}
653+
648654
announceContent := fmt.Sprintf(
649-
"[System Message] Team member %q completed task.\n\nResult:\n%s\n\n"+
650-
"Present this result to the user. Any media files are forwarded automatically. Do NOT search for files — the result above contains all relevant information.",
655+
"[System Message] Team member %q completed task.\n\nResult:\n%s",
651656
memberAgent, outcome.Result.Content,
652657
)
658+
if taskBoardSnapshot != "" {
659+
announceContent += "\n\n" + taskBoardSnapshot
660+
}
661+
announceContent += "\n\nPresent this result to the user. Any media files are forwarded automatically. Do NOT search for files — the result above contains all relevant information."
653662
// Append team workspace path so lead can locate files without searching.
654663
if ws := inMeta["team_workspace"]; ws != "" {
655-
announceContent += fmt.Sprintf("\n[Team workspace: %s — use read_file with path relative to workspace root, e.g. read_file(path=\"teams/...\")]", ws)
664+
announceContent += fmt.Sprintf("\n[Team workspace: %s — use workspace_read to read files, e.g. workspace_read(file_name=\"filename.md\")]", ws)
656665
}
657666

658667
// Route to the lead's session on the original channel/chat.
@@ -699,6 +708,9 @@ func handleTeammateMessage(
699708
announceOutCh := sched.Schedule(announceCtx, scheduler.LaneSubagent, announceReq)
700709
announceOutcome := <-announceOutCh
701710

711+
// Release team create lock — tasks already visible in DB, safe for other goroutines to list.
712+
announcePtd.ReleaseTeamLock()
713+
702714
// Post-turn: dispatch pending team tasks created during announce.
703715
if postTurn != nil {
704716
for tid, tIDs := range announcePtd.Drain() {
@@ -845,3 +857,42 @@ func handleStopCommand(
845857

846858
return true
847859
}
860+
861+
// buildTaskBoardSnapshot returns a formatted summary of batch task statuses
862+
// for inclusion in the announce message to the leader. Scoped by (teamID, chatID)
863+
// and filtered by origin_trace_id to show only tasks from the current batch.
864+
func buildTaskBoardSnapshot(ctx context.Context, teamStore store.TeamStore, teamID uuid.UUID, chatID, originTraceID string) string {
865+
if teamStore == nil || originTraceID == "" {
866+
return ""
867+
}
868+
allTasks, err := teamStore.ListTasks(ctx, teamID, "", store.TeamTaskFilterAll, "", "", chatID, 0)
869+
if err != nil || len(allTasks) == 0 {
870+
return ""
871+
}
872+
873+
// Filter to current batch by origin_trace_id stored in task metadata.
874+
var active, completed int
875+
var activeLines []string
876+
for _, t := range allTasks {
877+
tid, _ := t.Metadata["origin_trace_id"].(string)
878+
if tid != originTraceID {
879+
continue
880+
}
881+
switch t.Status {
882+
case store.TeamTaskStatusCompleted, store.TeamTaskStatusCancelled, store.TeamTaskStatusFailed:
883+
completed++
884+
default:
885+
active++
886+
activeLines = append(activeLines, fmt.Sprintf(" #%d %s — %s", t.TaskNumber, t.Subject, t.Status))
887+
}
888+
}
889+
total := active + completed
890+
if total == 0 {
891+
return ""
892+
}
893+
if active == 0 {
894+
return fmt.Sprintf("=== Task board (this batch) ===\nAll %d tasks completed.", total)
895+
}
896+
return fmt.Sprintf("=== Task board (this batch) ===\nTask progress: %d/%d completed, %d active:\n%s",
897+
completed, total, active, strings.Join(activeLines, "\n"))
898+
}

cmd/gateway_consumer_normal.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,9 @@ func processNormalMessage(
333333
go func(agentKey, channel, chatID, session, rID string, meta map[string]string, blockReplyEnabled bool, ptd *tools.PendingTeamDispatch) {
334334
outcome := <-outCh
335335

336+
// Release team create lock — tasks already visible in DB, other goroutines can list.
337+
ptd.ReleaseTeamLock()
338+
336339
// Post-turn: dispatch pending team tasks created during this turn.
337340
if postTurn != nil {
338341
for teamID, taskIDs := range ptd.Drain() {

internal/agent/loop.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ func (l *Loop) runLoop(ctx context.Context, req RunRequest) (*RunResult, error)
331331
if recovered, err := l.teamStore.RecoverStaleTasks(ctx, team.ID); err == nil && recovered > 0 {
332332
slog.Info("recovered stale tasks", "team", team.ID, "count", recovered)
333333
}
334-
if tasks, err := l.teamStore.ListTasks(ctx, team.ID, "newest", "", req.UserID, "", ""); err == nil {
334+
if tasks, err := l.teamStore.ListTasks(ctx, team.ID, "newest", "", req.UserID, "", "", 0); err == nil {
335335
var stale []string
336336
var inProgress []string
337337
for _, t := range tasks {
@@ -608,7 +608,7 @@ func (l *Loop) runLoop(ctx context.Context, req RunRequest) (*RunResult, error)
608608
if teamTaskCreates > teamTaskSpawns && !teamTaskRetried {
609609
if l.teamStore != nil && l.agentUUID != uuid.Nil {
610610
if team, _ := l.teamStore.GetTeamForAgent(ctx, l.agentUUID); team != nil && tools.IsTeamV2(team) {
611-
if tasks, err := l.teamStore.ListTasks(ctx, team.ID, "newest", "", req.UserID, "", ""); err == nil {
611+
if tasks, err := l.teamStore.ListTasks(ctx, team.ID, "newest", "", req.UserID, "", "", 0); err == nil {
612612
var pendingIDs []string
613613
for _, t := range tasks {
614614
if t.Status == store.TeamTaskStatusPending {

internal/agent/resolver_helpers.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ func buildTeamMD(team *store.TeamData, members []store.TeamMemberData, selfID uu
8383
sb.WriteString("Do NOT use `spawn` for team delegation — `spawn` is only for self-clone subagent work.\n\n")
8484
sb.WriteString("Rules:\n")
8585
sb.WriteString("- Always specify `assignee` — match member expertise from the list above\n")
86+
sb.WriteString("- **Check task board first** — ALWAYS call `team_tasks(action=\"list\")` before creating tasks. The system blocks creation if you skip this step\n")
8687
sb.WriteString("- Create all tasks first, then briefly tell the user what you delegated\n")
8788
sb.WriteString("- Do NOT add confirmations (\"Done!\", \"Got it!\") — just state what was assigned\n")
8889
sb.WriteString("- Results arrive automatically — do NOT present partial results\n")

internal/channels/telegram/commands_tasks.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ func (c *Channel) handleTasksList(ctx context.Context, chatID int64, isGroup boo
8181
return
8282
}
8383

84-
tasks, err := c.teamStore.ListTasks(ctx, team.ID, "newest", store.TeamTaskFilterAll, taskUserID(c.Name(), chatID, isGroup), "", "")
84+
tasks, err := c.teamStore.ListTasks(ctx, team.ID, "newest", store.TeamTaskFilterAll, taskUserID(c.Name(), chatID, isGroup), "", "", 0)
8585
if err != nil {
8686
slog.Warn("tasks command: ListTasks failed", "error", err)
8787
send("Failed to list tasks. Please try again.")
@@ -173,7 +173,7 @@ func (c *Channel) handleTaskDetail(ctx context.Context, chatID int64, text strin
173173
return
174174
}
175175

176-
tasks, err := c.teamStore.ListTasks(ctx, team.ID, "newest", store.TeamTaskFilterAll, taskUserID(c.Name(), chatID, isGroup), "", "")
176+
tasks, err := c.teamStore.ListTasks(ctx, team.ID, "newest", store.TeamTaskFilterAll, taskUserID(c.Name(), chatID, isGroup), "", "", 0)
177177
if err != nil {
178178
slog.Warn("task_detail command: ListTasks failed", "error", err)
179179
send("Failed to list tasks. Please try again.")
@@ -236,7 +236,7 @@ func (c *Channel) handleCallbackQuery(ctx context.Context, query *telego.Callbac
236236
return
237237
}
238238

239-
tasks, err := c.teamStore.ListTasks(ctx, team.ID, "newest", store.TeamTaskFilterAll, taskUserID(c.Name(), chatID, isGroup), "", "")
239+
tasks, err := c.teamStore.ListTasks(ctx, team.ID, "newest", store.TeamTaskFilterAll, taskUserID(c.Name(), chatID, isGroup), "", "", 0)
240240
if err != nil {
241241
send("Failed to list tasks.")
242242
return

internal/gateway/methods/teams_crud.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ func (m *TeamsMethods) handleTaskList(ctx context.Context, client *gateway.Clien
156156
return
157157
}
158158

159-
tasks, err := m.teamStore.ListTasks(ctx, teamID, "newest", params.Status, "", params.Channel, params.ChatID)
159+
tasks, err := m.teamStore.ListTasks(ctx, teamID, "newest", params.Status, "", params.Channel, params.ChatID, 0)
160160
if err != nil {
161161
slog.Warn("teams.tasks.list failed", "team_id", teamID, "status_filter", params.Status, "error", err)
162162
client.SendResponse(protocol.NewErrorResponse(req.ID, protocol.ErrInternal, err.Error()))

internal/store/pg/teams_tasks.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ func (s *PGTeamStore) UpdateTask(ctx context.Context, taskID uuid.UUID, updates
163163
return execMapUpdate(ctx, s.db, "team_tasks", taskID, updates)
164164
}
165165

166-
func (s *PGTeamStore) ListTasks(ctx context.Context, teamID uuid.UUID, orderBy string, statusFilter string, userID string, channel string, chatID string) ([]store.TeamTaskData, error) {
166+
func (s *PGTeamStore) ListTasks(ctx context.Context, teamID uuid.UUID, orderBy string, statusFilter string, userID string, channel string, chatID string, offset int) ([]store.TeamTaskData, error) {
167167
orderClause := "t.priority DESC, t.created_at"
168168
if orderBy == "newest" {
169169
orderClause = "t.created_at DESC"
@@ -187,7 +187,7 @@ func (s *PGTeamStore) ListTasks(ctx context.Context, teamID uuid.UUID, orderBy s
187187
`+taskJoinClause+`
188188
WHERE t.team_id = $1 AND ($2 = '' OR t.user_id = $2) `+statusWhere+` `+scopeWhere+`
189189
ORDER BY `+orderClause+`
190-
LIMIT $3`, teamID, userID, maxListTasksRows, channel, chatID)
190+
LIMIT $3 OFFSET $6`, teamID, userID, maxListTasksRows+1, channel, chatID, offset)
191191
if err != nil {
192192
return nil, err
193193
}

internal/store/team_store.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -320,7 +320,7 @@ type TeamStore interface {
320320
// statusFilter: "" = non-completed (default), "completed", "all".
321321
// userID: if non-empty, filter to tasks created by this user.
322322
// channel+chatID: if either is non-empty, filter to that exact scope.
323-
ListTasks(ctx context.Context, teamID uuid.UUID, orderBy string, statusFilter string, userID string, channel string, chatID string) ([]TeamTaskData, error)
323+
ListTasks(ctx context.Context, teamID uuid.UUID, orderBy string, statusFilter string, userID string, channel string, chatID string, offset int) ([]TeamTaskData, error)
324324
// GetTask returns a single task by ID with joined agent info.
325325
GetTask(ctx context.Context, taskID uuid.UUID) (*TeamTaskData, error)
326326
// GetTasksByIDs returns multiple tasks by IDs in a single query.

internal/tools/context_keys.go

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -286,8 +286,10 @@ const ctxPendingDispatch toolContextKey = "tool_pending_team_dispatch"
286286
// After the turn ends, the consumer drains and dispatches them.
287287
// Thread-safe: tools may execute in parallel goroutines.
288288
type PendingTeamDispatch struct {
289-
mu sync.Mutex
290-
tasks map[uuid.UUID][]uuid.UUID // teamID → []taskID
289+
mu sync.Mutex
290+
tasks map[uuid.UUID][]uuid.UUID // teamID → []taskID
291+
listed bool // true after list called in this turn
292+
teamLock *sync.Mutex // acquired on list, released before post-turn dispatch
291293
}
292294

293295
func NewPendingTeamDispatch() *PendingTeamDispatch {
@@ -310,6 +312,37 @@ func (p *PendingTeamDispatch) Drain() map[uuid.UUID][]uuid.UUID {
310312
return out
311313
}
312314

315+
// MarkListed records that list was called in this turn.
316+
func (p *PendingTeamDispatch) MarkListed() {
317+
p.mu.Lock()
318+
p.listed = true
319+
p.mu.Unlock()
320+
}
321+
322+
// HasListed reports whether list was called in this turn.
323+
func (p *PendingTeamDispatch) HasListed() bool {
324+
p.mu.Lock()
325+
defer p.mu.Unlock()
326+
return p.listed
327+
}
328+
329+
// SetTeamLock stores the acquired team create lock so it can be released post-turn.
330+
func (p *PendingTeamDispatch) SetTeamLock(m *sync.Mutex) {
331+
p.mu.Lock()
332+
p.teamLock = m
333+
p.mu.Unlock()
334+
}
335+
336+
// ReleaseTeamLock releases the held team create lock, if any.
337+
func (p *PendingTeamDispatch) ReleaseTeamLock() {
338+
p.mu.Lock()
339+
if p.teamLock != nil {
340+
p.teamLock.Unlock()
341+
p.teamLock = nil
342+
}
343+
p.mu.Unlock()
344+
}
345+
313346
func WithPendingTeamDispatch(ctx context.Context, ptd *PendingTeamDispatch) context.Context {
314347
return context.WithValue(ctx, ctxPendingDispatch, ptd)
315348
}

internal/tools/team_tasks_lifecycle.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -87,10 +87,11 @@ func (t *TeamTasksTool) executeComplete(ctx context.Context, args map[string]any
8787
ActorID: ownerKey,
8888
})
8989

90-
// Immediately dispatch any newly-unblocked tasks.
91-
t.manager.DispatchUnblockedTasks(ctx, team.ID)
90+
// Dependent tasks are dispatched by the consumer after this agent's turn ends
91+
// (post-turn), not mid-turn. This prevents dependent tasks from completing and
92+
// announcing to the leader before this agent's own run finishes.
9293

93-
return NewResult(fmt.Sprintf("Task %s completed. Dependent tasks have been unblocked.", taskID))
94+
return NewResult(fmt.Sprintf("Task %s completed. Dependent tasks will be dispatched after this turn ends.", taskID))
9495
}
9596

9697
func (t *TeamTasksTool) executeCancel(ctx context.Context, args map[string]any) *Result {
@@ -135,10 +136,9 @@ func (t *TeamTasksTool) executeCancel(ctx context.Context, args map[string]any)
135136
ActorID: t.manager.agentKeyFromID(ctx, agentID),
136137
})
137138

138-
// Immediately dispatch any newly-unblocked tasks.
139-
t.manager.DispatchUnblockedTasks(ctx, team.ID)
139+
// Dependent tasks are dispatched by the consumer after this agent's turn ends (post-turn).
140140

141-
return NewResult(fmt.Sprintf("Task %s cancelled. Any running delegation has been stopped and dependent tasks unblocked.", taskID))
141+
return NewResult(fmt.Sprintf("Task %s cancelled. Dependent tasks will be unblocked after this turn ends.", taskID))
142142
}
143143

144144
func (t *TeamTasksTool) executeReview(ctx context.Context, args map[string]any) *Result {

0 commit comments

Comments
 (0)