feat(pubsub/pulsar): register CloudEvents envelope Avro schema with Pulsar Schema Registry#4302
Open
javier-aliaga wants to merge 1 commit intodapr:mainfrom
Open
feat(pubsub/pulsar): register CloudEvents envelope Avro schema with Pulsar Schema Registry#4302javier-aliaga wants to merge 1 commit intodapr:mainfrom
javier-aliaga wants to merge 1 commit intodapr:mainfrom
Conversation
Contributor
There was a problem hiding this comment.
Pull request overview
The PR implements registration of CloudEvents envelope Avro schemas with Pulsar Schema Registry. Previously, when Dapr published to Pulsar with CloudEvents wrapping enabled (the default), the schema registered with Pulsar was only the inner domain event schema, creating a mismatch with the actual wire format. This fix wraps the inner schema in a CloudEvents envelope Avro schema before registration.
Changes:
- New CloudEvents schema wrapping functionality in
cloudevents_schema.gothat embeds inner Avro schemas as thedatafield of a CloudEvents envelope - Enhanced
schemaMetadatastructure to store both the inner schema codec and CloudEvents envelope codec - Updated producer and consumer registration to use the CloudEvents envelope schema when available
- Added explicit error handling to reject
rawPayload=trueon Avro schema topics using CloudEvents envelopes - Comprehensive test coverage for the new functionality
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated no comments.
Show a summary per file
| File | Description |
|---|---|
pubsub/pulsar/cloudevents_schema.go |
New file implementing CloudEvents Avro schema wrapping logic |
pubsub/pulsar/cloudevents_schema_test.go |
New comprehensive test file for schema wrapping with edge cases |
pubsub/pulsar/metadata.go |
Enhanced schemaMetadata struct with CE envelope fields |
pubsub/pulsar/pulsar.go |
Updated schema initialization, producer/consumer registration, and publish validation |
pubsub/pulsar/pulsar_test.go |
Added test helpers and new test cases for CE envelope handling |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
…ulsar Schema Registry When Dapr publishes to Pulsar with CloudEvents wrapping (the default), the actual wire format is a CloudEvents envelope containing the user's domain event in the `data` field. Previously, the schema registered with the Pulsar Schema Registry was the inner domain event schema, causing a mismatch that breaks external consumers (e.g., Apache Flink) and may cause Pulsar to reject messages when schema enforcement is enabled. This change wraps the user-provided Avro schema inside a CloudEvents envelope Avro schema before registering it with the broker. The envelope includes all standard CE attributes (id, source, specversion, type), optional attributes (datacontenttype, subject, time), Dapr extensions (topic, pubsubname, traceid, traceparent, tracestate, expiration), and the data/data_base64 fields. The inner domain schema is embedded as the type of the nullable `data` union field. Using rawPayload=true on a topic with an Avro schema now returns an explicit error, since the broker enforces a single schema per topic and mixing CE-wrapped and raw payloads is undefined behavior. Signed-off-by: Javier Aliaga <javier@diagrid.io>
c3b6b2e to
28f9a05
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Description
When Dapr publishes to Pulsar with CloudEvents wrapping enabled (the default), the wire format is a CloudEvents envelope containing the user's domain event in the
datafield. Previously, the Avro schema registered with the Pulsar Schema Registry was the inner domain event schema, not the CloudEvents envelope — causing a mismatch between the schema in the registry and the actual messages stored in the topic.This PR fixes the mismatch by wrapping the user-provided Avro schema inside a CloudEvents envelope Avro schema before registering it with the broker.
Note: explicit error for rawPayload on Avro schema topics
Pulsar enforces a single schema per topic. Previously, publishing with
rawPayload=trueto an Avro-schema topic would fail silently or produce confusing errors due to the schema/wire mismatch. This was never a working combination.This PR makes the constraint explicit. The component now returns a clear error:
rawPayload=true is not compatible with Avro schema topics using CloudEvents envelope; use a separate topic for raw payloads
If you need raw payloads with Avro schema validation, use a dedicated topic that is only ever published to with
rawPayload=true.Issue reference
We strive to have all PR being opened based on an issue, where the problem or feature have been discussed prior to implementation.
Please reference the issue this PR will close: #[issue number]
Checklist
Please make sure you've completed the relevant tasks for this PR, out of the following list:
Note: We expect contributors to open a corresponding documentation PR in the dapr/docs repository. As the implementer, you are the best person to document your work! Implementation PRs will not be merged until the documentation PR is opened and ready for review.