fix(pubsub/pulsar): decode Avro binary payload on subscribe path #4266
nelson-parente wants to merge 9 commits into dapr:main
Can we also add a certification test for Pulsar with Avro? See the equivalent one for JSON schema here: https://github.com/dapr/components-contrib/blob/main/tests/certification/pubsub/pulsar/components/auth-none/consumer_four/pulsar.yaml
PR dapr#4244 added Avro schema validation on the publish path but did not update handleMessage on the subscribe side. The Pulsar Go client returns raw Avro binary bytes from msg.Payload() even when a schema consumer is configured; the Dapr runtime then tries json.Unmarshal on those bytes and fails with:

invalid character '\x06' looking for beginning of value

Fix: when an Avro schema is registered for the incoming topic, decode the binary payload to native Go types using the cached goavro codec (already compiled at init by parsePulsarMetadata), then re-encode as JSON before passing to the handler.

This mirrors the publish path added in dapr#4244:

- Publish: NativeFromTextual (JSON → native) → msg.Value (Avro binary via SDK)
- Subscribe (this fix): NativeFromBinary (Avro binary → native) → TextualFromNative (→ JSON)

No new dependencies: the goavro codec is already available in p.metadata.internalTopicSchemas[topic].codec.

Adds three unit tests for the subscribe decode path:

- round-trip: Avro binary → handleMessage → JSON CloudEvent
- decode error: truncated payload → Nack + error returned
- no schema: raw payload passed through unchanged (existing behaviour)

Signed-off-by: Nelson Parente <nelson_parente@live.com.pt>
Signed-off-by: Javier Aliaga <javier@diagrid.io>
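The binary → native → JSON flow described in this commit can be sketched with the standard library alone. This is not the component's code: decodeStringRecord is a hypothetical, hand-rolled stand-in for goavro's NativeFromBinary, and it assumes a record whose fields are all Avro strings (the field names testId and testName are borrowed from the certification test, their types are an assumption).

```go
package main

import (
	"encoding/binary"
	"encoding/json"
	"errors"
	"fmt"
)

// decodeStringRecord is a hypothetical stand-in for goavro's NativeFromBinary,
// limited to records whose fields are all Avro strings.
// fields lists the field names in schema order.
func decodeStringRecord(fields []string, payload []byte) (map[string]string, error) {
	native := make(map[string]string, len(fields))
	for _, name := range fields {
		// An Avro string is a zigzag varint length followed by UTF-8 bytes.
		length, n := binary.Varint(payload)
		if n <= 0 || length < 0 || int64(len(payload)-n) < length {
			return nil, errors.New("truncated Avro payload at field " + name)
		}
		end := n + int(length)
		native[name] = string(payload[n:end])
		payload = payload[end:]
	}
	return native, nil
}

// avroToJSON mirrors the subscribe-side flow: Avro binary -> native -> JSON.
func avroToJSON(fields []string, payload []byte) ([]byte, error) {
	native, err := decodeStringRecord(fields, payload)
	if err != nil {
		return nil, err
	}
	return json.Marshal(native)
}

func main() {
	// 0x02 is the zigzag varint for length 1, 0x06 for length 3.
	payload := []byte("\x021\x06foo")
	out, err := avroToJSON([]string{"testId", "testName"}, payload)
	fmt.Println(string(out), err)
}
```

A truncated payload (for example, dropping the last byte) makes avroToJSON return an error instead of JSON, which corresponds to the decode-error case the unit tests cover.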
- Fix import ordering (stdlib group before external)
- TestHandleMessageHandlerErrorNacks: handler error → Nack, not Ack
- TestHandleMessageAvroPropertiesPreserved: message properties survive Avro decode
- TestHandleMessageNonAvroSchemaPassthrough: JSON-schema topic bypasses Avro decode

Signed-off-by: Nelson Parente <nelson_parente@live.com.pt>
Signed-off-by: Javier Aliaga <javier@diagrid.io>
Signed-off-by: Javier Aliaga <javier@diagrid.io>
Add TestPulsarAvroSchema that verifies the full round trip: JSON messages are Avro-encoded on publish, then decoded back to JSON on the subscribe path. Extracts publishSchemaMessages as a shared helper and adds the consumer_nine component config with .avroschema support. Signed-off-by: Javier Aliaga <javier@diagrid.io>
…ma validation Use rawPayload on both publish and subscribe to bypass CloudEvent wrapping, which is incompatible with strict Avro schema validation. Rename schema fields from id/name to testId/testName to avoid collisions with CloudEvent envelope fields. Pre-register the Avro schema on the broker before the sidecar subscribes. Follows the same pattern used by the Kafka Avro certification test. Signed-off-by: Javier Aliaga <javier@diagrid.io>
Add consumer_nine component config under auth-oauth2 so TestPulsarAvroSchema runs for both auth types, not just auth-none. Signed-off-by: Javier Aliaga <javier@diagrid.io>
Pull request overview
This PR fixes Pulsar pubsub subscriptions for topics configured with Avro schemas by decoding Avro-binary payloads back into JSON in the subscribe (handleMessage) path, aligning behavior with the Avro validation/encoding added on the publish path in #4244.
Changes:
- Decode Avro binary payloads to JSON in pubsub/pulsar subscribe handling when an Avro schema is configured for the topic.
- Add unit tests covering Avro decode success/failure and passthrough behavior when no (or non-Avro) schema is present.
- Extend Pulsar certification tests + component manifests to validate Avro schema publish/subscribe round-trip.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| pubsub/pulsar/pulsar.go | Decode Avro-binary payloads to JSON before handing messages to the runtime when topic schema protocol is Avro. |
| pubsub/pulsar/pulsar_test.go | Add unit tests for Avro decode behavior in handleMessage plus ack/nack expectations. |
| tests/certification/pubsub/pulsar/pulsar_test.go | Add a certification scenario for Avro schema topics and refactor schema-message publishing helpers. |
| tests/certification/pubsub/pulsar/components/auth-oauth2/consumer_nine/pulsar.yml.tmpl | Add .avroschema metadata for the new certification consumer configuration (OAuth2). |
| tests/certification/pubsub/pulsar/components/auth-none/consumer_nine/pulsar.yaml | Add .avroschema metadata for the new certification consumer configuration (no auth). |
| tests/certification/pubsub/pulsar/README.md | Document the added Avro schema encode/decode round-trip certification coverage. |
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Signed-off-by: Javier Aliaga <javier@diagrid.io>
Pull request overview
This PR fixes Pulsar pubsub subscriptions for topics configured with .avroschema by decoding Avro binary payloads on the subscribe path before handing data to the Dapr runtime, preventing JSON deserialization failures and retry loops.
Changes:
- Decode Avro binary payloads in handleMessage using the cached goavro.Codec, then convert to JSON bytes before invoking the handler.
- Add unit tests covering the Avro decode path (success, decode failure, passthrough for non-Avro/no-schema topics, metadata preservation).
- Extend Pulsar certification tests/components to validate Avro encode/decode round-trip behavior end-to-end.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
| pubsub/pulsar/pulsar.go | Decode Avro binary payloads to JSON in handleMessage when an Avro schema is configured for the topic. |
| pubsub/pulsar/pulsar_test.go | Adds unit tests for subscribe-side Avro decoding and related behavior (nack on decode error, passthrough, metadata). |
| tests/certification/pubsub/pulsar/pulsar_test.go | Adds a certification test and helpers to validate Avro schema round-trip on publish/subscribe. |
| tests/certification/pubsub/pulsar/components/auth-oauth2/consumer_nine/pulsar.yml.tmpl | Adds a new OAuth2 component variant configured with .avroschema metadata for certification. |
| tests/certification/pubsub/pulsar/components/auth-none/consumer_nine/pulsar.yaml | Adds a new no-auth component variant configured with .avroschema metadata for certification. |
| tests/certification/pubsub/pulsar/README.md | Documents the new Avro schema encode/decode certification scenario. |
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Signed-off-by: Javier Aliaga <javier@diagrid.io>
Summary
PR #4244 added Avro schema validation on the publish path but did not update handleMessage on the subscribe side.

The Pulsar Go client returns raw Avro binary bytes from msg.Payload() even when a schema consumer is configured. The Dapr runtime then tries json.Unmarshal on those bytes and fails:

invalid character '\x06' looking for beginning of value

\x06 is the first byte of the Avro binary-encoded payload (string length 3 for specversion="1.0", encoded as a zigzag varint). Every message on an Avro-enforced topic gets stuck in a permanent retry loop and is never delivered to the application.

Root Cause
handleMessage always passes msg.Payload() (raw bytes) directly to the handler.

The goavro.Codec required for decoding is already compiled and cached in p.metadata.internalTopicSchemas[topic].codec (added by #4244). It is simply never used on the receive path.

Fix
When an Avro schema is registered for the incoming topic, decode the binary payload to native Go types using the cached codec, then re-encode as JSON before passing to the handler. This mirrors the publish path:

- Publish: NativeFromTextual (JSON → native) → msg.Value (Avro binary via SDK)
- Subscribe (this fix): NativeFromBinary (Avro binary → native) → TextualFromNative (→ JSON)

No new dependencies. The fix is ~15 lines in handleMessage only.

Tests
Adds three unit tests for the subscribe decode path:

- Round-trip: Avro binary → handleMessage → JSON delivered to handler ✅
- Decode error: truncated payload → Nack + error returned
- No schema: raw payload passed through unchanged (existing behaviour)

Affected Files
- pubsub/pulsar/pulsar.go: handleMessage only
- pubsub/pulsar/pulsar_test.go: new tests

Fixes the subscribe-side gap introduced by #4244.
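The \x06 failure described in the Summary can be reproduced without Pulsar or goavro. The sketch below leans on the fact that Go's encoding/binary signed varint uses the same zigzag varint scheme as Avro's long encoding; avroString and unmarshalErr are hypothetical helpers written for this illustration, not functions from the component.

```go
package main

import (
	"encoding/binary"
	"encoding/json"
	"fmt"
)

// avroString encodes s the way Avro encodes a string value:
// a zigzag varint length followed by the raw UTF-8 bytes.
func avroString(s string) []byte {
	buf := make([]byte, binary.MaxVarintLen64)
	n := binary.PutVarint(buf, int64(len(s)))
	return append(buf[:n], s...)
}

// unmarshalErr returns the error json.Unmarshal reports for payload, if any.
func unmarshalErr(payload []byte) error {
	var v interface{}
	return json.Unmarshal(payload, &v)
}

func main() {
	// A record whose first field is the string "1.0" starts with these bytes.
	payload := avroString("1.0")
	fmt.Printf("first byte: 0x%02x\n", payload[0])
	fmt.Println("json error:", unmarshalErr(payload))
}
```

Length 3 zigzag-encodes to 6, so the payload starts with byte 0x06, and json.Unmarshal rejects it with the same "invalid character" error quoted in the Summary.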