Skip to content

Implementation Gap #1

@closedLoop

Description

@closedLoop

Okay, here are the proposals and corresponding GitHub issue drafts for the next two major points identified:

  1. Addressing the synchronous flow.run() call within the async Task Manager.
  2. Refining Pydantic models and input/output handling for full A2A compliance.

1. Proposal: Ensure Non-Blocking Flow Execution in Async Task Manager

1. Introduction & Problem Statement

The PocketFlowTaskManager.on_send_task method in a2aflow/tasks.py is defined as async def because it's called from the async FastAPI request handler. However, within this method, the core agent logic is invoked using the synchronous self.flow.run(shared).

If the underlying PocketFlow Flow (self.flow) or any of its constituent Nodes perform blocking operations (e.g., synchronous network calls, significant CPU computation, synchronous file I/O), this call will block the entire FastAPI/Uvicorn event loop. This negates the benefits of using an async web framework, severely limiting the server's concurrency and responsiveness under load.

2. Goals

  • Ensure that the execution of the PocketFlow Flow within on_send_task does not block the main async event loop.
  • Maintain compatibility with both synchronous and asynchronous PocketFlow Flow definitions provided by the user.
  • Improve the overall performance and scalability of the A2AServer.

3. Proposed Architecture & Solutions

We need to modify the invocation of the flow within PocketFlowTaskManager.on_send_task. The approach depends on the nature of the underlying self.flow:

  • Option A (Preferred if Flow is Async): Use run_async

    • If the self.flow provided by the user is an instance of pocketflow.AsyncFlow (or a subclass like A2AFlow which inherits from it), replace self.flow.run(shared) with await self.flow.run_async(shared).
    • This leverages PocketFlow's native async execution path, allowing async nodes within the flow to yield control correctly.
  • Option B (For Sync Flows): Use asyncio.to_thread

    • If the self.flow is a standard synchronous pocketflow.Flow, we cannot directly await run().
    • Replace self.flow.run(shared) with await asyncio.to_thread(self.flow.run, shared).
    • This executes the potentially blocking synchronous run method in a separate thread from asyncio's thread pool, preventing it from blocking the main event loop.
  • Combined Approach (Recommended for Flexibility):

    • Check the type of self.flow at runtime within on_send_task.
    • If it's an AsyncFlow, use Option A (await self.flow.run_async(shared)).
    • Otherwise (if it's a sync Flow), use Option B (await asyncio.to_thread(self.flow.run, shared)).
    • This allows users to provide either type of flow without needing different server configurations.

4. Implementation Steps

  1. Modify PocketFlowTaskManager.on_send_task:
    • Locate the self.flow.run(shared) call.
    • Implement the "Combined Approach" described above using an isinstance check:
      from pocketflow import AsyncFlow # Add import at the top
      import asyncio # Add import at the top
      # ... inside on_send_task ...
      if isinstance(self.flow, AsyncFlow):
          await self.flow.run_async(shared)
      else:
          await asyncio.to_thread(self.flow.run, shared)
      # ... rest of the method ...
  2. Update AsyncA2ANode: Ensure AsyncA2ANode is correctly defined and used if async flows are expected (already present in core.py, appears okay).
  3. Review Examples: Update examples like streaming_agent.py to ensure they provide an AsyncFlow to the server if they use AsyncNodes.
  4. Update Tests: Modify tests/test_server.py (specifically the integration tests using MockFlow or similar) to correctly await the on_send_task call and potentially mock run_async or asyncio.to_thread as needed.

5. Impact & Benefits

  • Performance: Prevents blocking of the async event loop, significantly improving server concurrency and throughput.
  • Responsiveness: Ensures the server remains responsive to other requests even while a long-running task is processing.
  • Correctness: Properly integrates PocketFlow execution (both sync and async) into the async server environment.
  • Flexibility: Allows users to build A2A agents using either synchronous or asynchronous PocketFlow graphs.

6. Potential Considerations

  • Running synchronous code via asyncio.to_thread has overhead. If performance is absolutely critical and the flow logic can be made fully async, using AsyncFlow and run_async is generally more efficient.
  • Error handling within the to_thread call needs to be robust.

GitHub Issue Draft (Issue 1: Sync in Async)

Title: Bug: Synchronous flow.run() blocks event loop in async PocketFlowTaskManager.on_send_task

Labels: bug, performance, async, server, task-manager

Body:

Problem Description:

The PocketFlowTaskManager.on_send_task method is an async def function, but it currently calls the synchronous self.flow.run(shared). If the underlying PocketFlow Flow or its nodes perform blocking operations, this call blocks the FastAPI/Uvicorn event loop, severely impacting server concurrency and responsiveness.

Location: a2aflow/tasks.py, within the PocketFlowTaskManager.on_send_task method.

Expected Behavior:

The invocation of the PocketFlow Flow should be non-blocking, allowing the async server to handle other requests concurrently.

Proposed Solution:

Modify PocketFlowTaskManager.on_send_task to execute the flow asynchronously:

  1. Check if self.flow is an instance of pocketflow.AsyncFlow.
  2. If yes, use await self.flow.run_async(shared).
  3. If no (it's a sync pocketflow.Flow), use await asyncio.to_thread(self.flow.run, shared).

Acceptance Criteria:

  • The call to execute the PocketFlow Flow within on_send_task no longer blocks the event loop.
  • The server remains responsive to other requests (e.g., tasks/get) while a task is being processed via tasks/send.
  • Both synchronous and asynchronous PocketFlow Flow definitions can be successfully executed by the A2AServer.
  • Tests are updated to reflect and verify the asynchronous execution pattern.

Impact:

Fixing this is critical for server performance, scalability, and correct async operation.


2. Proposal: Align Models and I/O Handling with A2A Specification

1. Introduction & Problem Statement

The current implementation has several discrepancies regarding A2A data structures and handling:

  • Incomplete Models: Pydantic models in a2aflow/models.py and a2aflow/tasks.py are simplified and do not fully represent the A2A specification (specification/json/a2a.json), notably lacking detailed Part types (TextPart, FilePart, DataPart) and the Artifact structure.
  • Fragile Input Parsing: A2ANode._get_user_query in core.py unsafely assumes the first part of the input message exists and is text, making it prone to errors with multi-part messages or non-text inputs.
  • Limited Output Formatting: The PocketFlowTaskManager currently assumes simple text output (a2a_output_parts = [{"type": "text", "text": result}]). There's no defined way for PocketFlow nodes to signal the creation of multi-modal Parts or Artifacts for inclusion in the A2A Task response.
  • Model Location: A2A-specific Pydantic models are split between models.py and tasks.py, leading to potential confusion.

2. Goals

  • Achieve full alignment of A2AFlow's data models with the a2a.json specification, particularly for Message, Part, and Artifact.
  • Implement robust parsing of multi-modal A2A input messages.
  • Define and implement a clear convention for PocketFlow nodes to produce multi-modal A2A outputs (multiple parts, artifacts).
  • Consolidate A2A-specific Pydantic models into a single location (a2aflow/models.py).
  • Enable A2AFlow agents to correctly handle and generate multi-modal content as per the A2A spec.

3. Proposed Architecture & Solutions

  1. Consolidate and Enhance Models:

    • Move all A2A-specific Pydantic models (including Task, TaskStatus, SendTaskRequest, SendTaskResponse currently in tasks.py) to a2aflow/models.py.
    • Define detailed models in models.py based on a2a.json:
      • TextPart, FileContent, FilePart, DataPart.
      • Part = Annotated[Union[TextPart, FilePart, DataPart], Field(discriminator="type")] (using Pydantic v2 features).
      • Message model updated to use List[Part] instead of simple content: str.
      • Artifact model with parts: List[Part].
      • Update Task model to use the revised Message and Artifact models and align other fields (status, error, etc.) precisely with the spec.
      • Update AgentCard and other relevant models for full field alignment.
  2. Robust Input Parsing:

    • Refactor A2ANode.prep (or introduce a new helper method called by prep).
    • Instead of just _get_user_query, parse the entire request.params.message.parts list.
    • Iterate through the parts and populate the shared store based on part type and the node's SUPPORTED_CONTENT_TYPES.
    • Convention Example: Populate shared['a2a_input_text'] (string, concatenated text parts), shared['a2a_input_files'] (list of FileContent objects), shared['a2a_input_data'] (list of DataPart dicts). Nodes would then access these specific keys in their exec.
  3. Structured Output Formatting:

    • Define a convention for nodes to write results back to shared.
    • Convention Example: Nodes write to shared['a2a_output_parts'] (a list of Part model instances) and/or shared['a2a_output_artifacts'] (a list of Artifact model instances).
    • Modify PocketFlowTaskManager.on_send_task (after flow.run/run_async) to:
      • Read shared['a2a_output_parts'] and shared['a2a_output_artifacts'].
      • Construct the A2A Task.status.message (using the parts) and Task.artifacts based on these shared variables.
      • If these keys aren't present, default to a simple text response based on shared['result'] as a fallback.

4. Implementation Steps

  1. Refactor models.py / tasks.py: Move models, implement detailed Part and Artifact structures, align Task, Message, AgentCard etc., with a2a.json.
  2. Refactor core.py: Update A2ANode.prep (or add a new parsing method) to robustly handle message.parts and populate shared according to the new input convention. Update MultiModalNode accordingly.
  3. Refactor tasks.py: Update PocketFlowTaskManager.on_send_task to read shared['a2a_output_parts'] and shared['a2a_output_artifacts'] and use them to construct the Task object's message and artifacts fields, falling back to shared['result'] if necessary.
  4. Update Examples: Modify examples (multi_turn_agent.py, etc.) to use the new conventions if they need to handle multi-modal input/output or rely on specific A2A structures.
  5. Update Tests: Adapt tests (test_core.py, test_server.py) to use the new detailed models and verify correct parsing and formatting of multi-part messages and artifacts.

5. Impact & Benefits

  • A2A Compliance: Achieves full compliance with the A2A specification for message, part, and artifact structures.
  • Multi-Modal Capability: Enables A2AFlow agents to robustly send and receive text, files (bytes/URI), and structured data.
  • Robustness: Makes input handling less prone to errors from unexpected message formats.
  • Clarity: Consolidates models and provides clear conventions for data exchange between A2A and PocketFlow.

GitHub Issue Draft (Issue 2: Models & I/O Handling)

Title: Enhance: Align Models & I/O Handling with Full A2A Specification

Labels: enhancement, a2a-compliance, models, multi-modal, parsing

Body:

Problem Description:

  1. Incomplete Models: Pydantic models in a2aflow/models.py and tasks.py are simplified and do not fully represent the A2A spec (a2a.json), missing detailed Part types (TextPart, FilePart, DataPart), Artifact, and precise fields in Task, Message, etc.
  2. Fragile Input Parsing: A2ANode._get_user_query in core.py assumes the first message part exists and is text, failing on multi-part or non-text inputs.
  3. Limited Output Formatting: No clear convention exists for PocketFlow nodes to produce multi-modal Parts or Artifacts that the PocketFlowTaskManager can format into the A2A Task response.
  4. Model Location: A2A models are currently split between models.py and tasks.py.

Expected Behavior:

A2AFlow should use Pydantic models that accurately reflect the a2a.json specification. Input parsing should handle various Part types robustly. A clear mechanism should exist for PocketFlow nodes to generate structured A2A output including multiple Parts and Artifacts.

Proposed Solution:

  1. Consolidate & Enhance Models: Move all A2A models to a2aflow/models.py and define detailed TextPart, FilePart, DataPart, Artifact, and update Message, Task, AgentCard to fully match a2a.json.
  2. Robust Input Parsing: Refactor A2ANode.prep (or add a helper) to iterate through message.parts, handle different types, and populate shared clearly (e.g., shared['a2a_input_text'], shared['a2a_input_files']).
  3. Structured Output Convention: Define a convention (e.g., nodes write lists of Part/Artifact instances to shared['a2a_output_parts']/shared['a2a_output_artifacts']). Update PocketFlowTaskManager to read these keys and format the Task response accordingly.

Acceptance Criteria:

  • Pydantic models in a2aflow/models.py accurately reflect the structures in specification/json/a2a.json.
  • A2ANode can successfully parse A2A input messages containing multiple parts of different types (text, file, data).
  • PocketFlow nodes can produce outputs that result in A2A Task responses containing multiple Parts and Artifacts.
  • PocketFlowTaskManager correctly formats the final Task object based on the conventions defined for the shared store.
  • Examples and tests are updated to reflect and verify these changes.

Impact:

Enables full A2A compliance for message structures, supports multi-modal agents, increases robustness, and clarifies data handling conventions.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions