Skip to content

feat: support external schema registry (PIP-420)#399

Open
codelipenghui wants to merge 24 commits intomasterfrom
feat/external-schema-registry
Open

feat: support external schema registry (PIP-420)#399
codelipenghui wants to merge 24 commits intomasterfrom
feat/external-schema-registry

Conversation

@codelipenghui
Copy link
Copy Markdown
Contributor

Summary

  • Implement PIP-420 external schema registry support for pulsar-rs
  • Add PulsarSchema<T> async trait as the unified schema interface for encode/decode with external registries
  • Wire schema_id propagation end-to-end: PulsarSchema::encode()EncodeDataProducerMessageMessageMetadata → wire → consumer decode
  • Add DefaultPulsarSchema<T> bridging existing SerializeMessage/DeserializeMessage traits for backward compatibility
  • Add KeyValueSchema<K, V> composing two inner schemas with magic-byte framing (0xFE)
  • Add pulsar-schema-confluent workspace crate as a Confluent Schema Registry reference implementation

Changes

Protocol & Core (3 commits)

  • Sync PulsarApi.proto from upstream with SchemaType::External = 22 and schema_id in MessageMetadata
  • Add SchemaRegistry error variant
  • Add schema_id_util module with magic-byte framing helpers

Schema Framework (4 commits)

  • PulsarSchema<T> trait: schema_info(), encode(), decode(), close()
  • EncodeData struct with payload and optional schema_id
  • DefaultPulsarSchema<T> and KeyValueSchema<K, V> implementations
  • Re-exports from pulsar::schema

Consumer Integration (4 commits)

  • Message<T> gains decoded: Option<T> field with value() accessor
  • DecodedMessageReceiver<T> type alias for schema-decoded channel
  • ConsumerBuilder::with_schema() for type-erased schema injection
  • Async decode task spawned between ConsumerEngine and TopicConsumer

Producer Integration (1 commit)

  • ProducerBuilder::with_schema() and Producer::send_schema_non_blocking()
  • schema_id propagated through batching pipeline (BatchItem 3-tuple)
  • connection.rs wires schema_id into MessageMetadata

Confluent Reference Implementation (1 commit)

  • pulsar-schema-confluent workspace crate with ConfluentJsonSchema<T>
  • ConfluentConfig, ConfluentAuth, SubjectNameStrategy types
  • ConfluentSchemaFactory convenience builder
  • Schema registration is placeholder for initial release

Test plan

  • All 53 integration tests pass with live Pulsar broker
  • All 17 doc-tests pass
  • 5 confluent crate unit tests pass (1 ignored — requires live schema registry)
  • Manual test with Confluent Schema Registry (future iteration)

🤖 Generated with Claude Code

codelipenghui and others added 17 commits March 12, 2026 19:08
Sync the protobuf definition from apache/pulsar master to pick up:
- External = 22 in Schema.Type enum
- optional bytes schema_id field in MessageMetadata

Also fix compilation errors from new fields added to
CommandCloseProducer, CommandCloseConsumer, and CommandUnsubscribe
by using ..Default::default() / .. in struct literals and patterns.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…a_id propagation

- Add with_schema() to ProducerBuilder (type-erased via Box<dyn Any>)
- Add schema_object field to Producer for schema-aware encoding
- send_schema_non_blocking() uses PulsarSchema::encode() before SerializeMessage
- Add schema_id field to Message, ProducerMessage, and BatchItem
- Propagate schema_id through batching pipeline to MessageMetadata
- Wire schema_id into connection.rs MessageMetadata construction

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Move the Confluent Schema Registry integration crate into the pulsar-rs
repo as a Cargo workspace member. This keeps the reference implementation
co-located with the core library for easier development and CI.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
C1: Propagate schema through Consumer/MultiTopicConsumer for
    reconnect and seek. Store Arc<dyn PulsarSchema<T>> in both
    structs and pass to all TopicConsumer::new() calls.

C2: Return explicit error on schema downcast type mismatch in
    ConsumerBuilder::build(), into_reader(), and
    Producer::send_schema_non_blocking() instead of silently
    falling through.

C3: ConfluentJsonSchema::new() now returns Result<Self, Error>
    instead of panicking on SrSettings build failure.

C4: Schema decode errors now forward (id, payload, None) with
    a warning log instead of discarding the MessageIdData,
    allowing consumers to ack/nack failed messages.

C5: send_schema_non_blocking() accepts optional Message metadata
    to preserve properties, partition_key, and event_time when
    using external schemas.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
I1: Add debug_assert! guards in ConsumerBuilder build()/into_reader()
    to catch accidental use of cloned builders (schema_object lost).
I2: Replace pass-through adapter task with MessageReceiver enum that
    maps raw messages inline in poll_next — no extra task or channel
    hop when no schema is attached.
I3: Validate batch schema_id consistency — reject entire batch when
    messages have conflicting schema_ids (PIP-420 requirement).
I4: Mark producer::Message as #[non_exhaustive] so adding schema_id
    field is not a semver-breaking change. Add Message::new() constructor.
I5: Change strip_magic_header() return type to Result<Option<...>> so
    corrupt data returns Err instead of being indistinguishable from
    absent framing. Add corruption test cases.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1. Replace debug_assert! with runtime Result error in ConsumerBuilder
   and ProducerBuilder — debug_assert! is stripped in release builds.
2. Add decode_error field to consumer Message<T> so callers can
   distinguish "no schema attached" from "decode failed" without
   relying on logs alone. Expose via message.decode_error() accessor.
3. Clear schema_info in Clone impls (both builders) to prevent clones
   from negotiating External schema they cannot decode/encode.
4. Log warning when KV decode receives Single schema_id framing —
   indicates protocol/configuration mismatch.
5. Use std::sync::Once guard for confluent encode placeholder warning
   to prevent per-message log flooding in high-throughput producers.
6. Fix with_schema() doc to reference send_schema_non_blocking()
   instead of send_non_blocking().

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- build_multi_topic() now propagates schema_info and schema_object
  from ProducerBuilder into MultiTopicProducer, so multi-topic and
  partitioned-topic producers created via with_schema() correctly
  negotiate schemas with the broker and encode messages with schema_id.
- Changed build_multi_topic() return type to Result for consistency
  with build() and to guard against inconsistent schema state.
- Added send_schema_non_blocking() to MultiTopicProducer for schema-
  aware message sending across topics.
- Extracted get_or_create_producer() helper to reduce duplication.
- Added 7 unit tests covering schema propagation in multi-topic and
  partitioned-topic producer flows.
- Fixed clippy warnings: redundant returns in oauth2.rs, redundant
  field names in schema_id_util.rs, manual_is_multiple_of in examples.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@codelipenghui codelipenghui self-assigned this Mar 13, 2026
codelipenghui and others added 4 commits March 12, 2026 23:24
C2: Fix KV encode double-framing of magic bytes. Inner schemas return
    schema IDs with a 0xFF magic prefix. KeyValueSchema::encode() now
    strips that prefix via strip_single_magic_prefix() before wrapping
    in the 0xFE KV frame, preventing double-framing on the wire. Added
    a round-trip test with a RecordingSchema mock that verifies inner
    decode() receives raw IDs without the 0xFF prefix.

I1: KV decode now returns an error when it encounters Single (0xFF)
    framing on a KeyValue message, instead of silently falling back
    to (None, None). A Single schema_id on a KV message indicates a
    real configuration problem that should surface immediately.

I2: Confluent placeholder encode() now logs a warning on every call
    instead of only once. The once-guarded warning was insufficient
    because after the first message, subsequent encodes silently
    returned schema_id: None with no indication.

I3: Fixed MessageReceiver doc comment: Raw maps to 4-tuple
    (id, payload, None, None), not 3-tuple (id, payload, None).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Remove #[non_exhaustive] from producer::Message. This was a
  source-breaking change: downstream SerializeMessage impls that
  construct `Message { payload, ..Default::default() }` would stop
  compiling. The `Message::new()` constructor remains available as
  a convenience but is no longer required.

- Revert build_multi_topic() to return MultiTopicProducer directly
  instead of Result<MultiTopicProducer, Error>. The only error path
  was a defensive guard for an inconsistent state that cannot occur
  through normal API usage (Clone clears both schema fields together).
  Replaced with debug_assert! for development-time safety without
  breaking every existing caller.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
I1: Replace per-call log::warn! in ConfluentJsonSchema::encode() with
    AtomicBool-gated warn-once + log::debug! for subsequent calls.
    Prevents log flooding at high throughput.

I2: Replace debug_assert! in build_multi_topic() with log::error! +
    schema drop.  Same invariant that build() catches via Result::Err,
    but without breaking the backward-compatible return type.  Prevents
    silent corruption in release builds where debug_assert is stripped.

I3: Update MultiTopicProducer::send_schema_non_blocking doc to document
    the type-mismatch error case, matching the single-topic variant.

Backward compat: Remove schema_id from public producer::Message.
    Move it to internal ProducerMessage only.  Both send_schema_non_blocking
    variants now construct ProducerMessage directly and call send_raw(),
    keeping schema_id off the public API surface.  Add Producer::send_raw()
    helper for ProducerMessage dispatch with partition routing.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@codelipenghui codelipenghui marked this pull request as ready for review March 13, 2026 17:12
codelipenghui and others added 2 commits March 13, 2026 12:16
Restore master's proto and add only the two fields needed:
- Schema.Type.External = 22
- MessageMetadata.schema_id = 32

Removes unrelated upstream formatting and feature additions that
were polluting the PR diff.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
After minimizing PulsarApi.proto, CommandCloseProducer,
CommandCloseConsumer, and CommandUnsubscribe have only the
fields already specified — the struct update syntax is redundant.
Rust 1.94.0 clippy flags this as an error.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@BewareMyPower
Copy link
Copy Markdown
Contributor

You might need to modify the AGENTS.md like:

Don't modify existing code unless necessary

Comment thread src/producer.rs
/// Used by [`send_schema_non_blocking`](Self::send_schema_non_blocking) to
/// encode messages at the multi-topic level before delegating to per-topic
/// producers.
schema_object: Option<Box<dyn Any + Send + Sync>>,
Copy link
Copy Markdown
Contributor

@PookieBuns PookieBuns Mar 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @codelipenghui, may I provide a recommendation to a separate approach than storing a Box<dyn Any + Send + Sync>?

The current implementation has a drawback in which types that aren't supported by the Schema are not invalidated at compile time but instead will produce a runtime error everytime send_schema_non_blocking is called when it tries to downcast into the Schema object. This is not ideal since if we stored the actual trait object we would have compile time guaranteed correctness of schema supported T's.

Assuming we would not want to make any breaking changes, and can't just force Producer to suddenly require type annotations, I have 2 recommendations that we could potentially take.

  1. Introduce a new type of producer like SchemaProducer, which owns the actual trait object and gets its own version of send_non_blocking. It would be initialized with something like Producer::with_schema which returns a SchemaProducerBuilder, or we could have the with_schema method on the ProducerBuilder create a type change.

The downside of this approach is the cardinality, since then there would be 3 extra structs to maintain (SchemaProducer,SchemaProducerBuilder,MultitopicSchemaProducer, and potentially TypedMessageBuilder).

  1. Add a generic type parameter T which defaults to Vec<u8> to the producer type, and the default producer, will hold <Box<dyn Schema<Vec<u8> + Send + Sync>>. The default_schema would only implement Schema for Vec<u8> (or introduce a bytes_schema). Then, send_schema by default will only take in Vec<u8>'s.

This is actually more similar to the java approach since initializing a producer without a schema will default to bytes schema.

However, I'm not 100% sure if this is non-breaking since it technically attaches a generic to the existing Producer (although with a default)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The primary reason is compatibility. The first option seems like a viable solution that avoids breaking changes, though it would increase the maintenance burden. I will give it some thought.

Comment thread src/producer.rs
/// The `T: 'static` bound is required for Rust's `Any`-based runtime
/// type-checking inside the type-erased schema dispatch.
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
pub async fn send_schema_non_blocking<T: SerializeMessage + Sized + Send + 'static>(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also introduce a create_schema_message or create_message_with_schema that gets something like a TypedMessageBuilder, parallel to MessageBuilder which allows building a message that utilizes the schema? This is similar to TypedMessageBuilder in java

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can follow up with a separate PR, keep this PR to focus on the schema abstraction and external schema support.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment makes sense. I think the main concern is it introduces a separated public API (send_schema_non_blocking) to send messages. However, like other client implementation, we should reuse the existing send_non_blocking method to send a message with custom schema.

Comment thread src/schema/key_value.rs
use super::schema_id_util::SchemaIdInfo;

/// Composes two inner schemas for key-value message types.
pub struct KeyValueSchema<K, V> {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to support SEPARATED ? Java implements this by doing a setSeparateKeyValue

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can follow up with a separate PR, keep this PR to focus on the schema abstraction and external schema support.

- Revert is_multiple_of() back to % operator in examples
- Restore original doc comment style in consumer/message.rs
- Remove unnecessary .. on CommandCloseConsumer destructure

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Comment thread src/producer.rs
/// The `T: 'static` bound is required for Rust's `Any`-based runtime
/// type-checking inside the type-erased schema dispatch.
#[cfg_attr(feature = "telemetry", tracing::instrument(skip_all))]
pub async fn send_schema_non_blocking<T: SerializeMessage + Sized + Send + 'static>(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment makes sense. I think the main concern is it introduces a separated public API (send_schema_non_blocking) to send messages. However, like other client implementation, we should reuse the existing send_non_blocking method to send a message with custom schema.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants