Event Driven Architecture

A clean, flexible event bus abstraction for event-driven applications

The GoBetterAuth Event Bus lets you publish and subscribe to application events through a small set of interfaces, keeping your application code decoupled from infrastructure details such as the message transport.

Overview

  • Zero external dependencies in your application code: The domain layer is library-agnostic and only uses Go's standard library.
  • Pluggable transports: Use the default in-memory implementation, any Watermill transport (Kafka, Redis, RabbitMQ, etc.), or your own custom PubSub implementation.
  • Clean separation: Application code interacts with high-level Event and EventBus interfaces, while transport details are abstracted behind the PubSub interface.

Layered Architecture

┌─────────────────────────────────────────────────────────────────┐
│                      APPLICATION LAYER                          │
│  (Your code - handlers, services, plugins)                      │
│                                                                 │
│  Uses: models.Event (high-level, structured data)               │
│        models.EventBus interface (Publish/Subscribe)            │
└─────────────────────────────────────────────────────────────────┘
                              │
                              │ Clean interface
                              │ (No external deps)
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│                       DOMAIN LAYER                              │
│  models/events.go                                               │
│                                                                 │
│  • Event struct (application data)                              │
│  • Message struct (transport data)                              │
│  • EventBus interface                                           │
│  • PubSub interface ◄─── ABSTRACTION POINT                      │
└─────────────────────────────────────────────────────────────────┘
                              │
                              │ Implements models.PubSub
                              │
        ┌─────────────────────┼─────────────────────┐
        │                     │                     │
        ▼                     ▼                     ▼
┌───────────────┐  ┌───────────────────┐  ┌─────────────────┐
│ GoChannel     │  │ Watermill Adapter │  │ Custom PubSub   │
│ (Default)     │  │                   │  │ (Your impl)     │
│ In-memory     │  │ Wraps any:        │  │ Pure Go         │
│ No deps       │  │ • Kafka           │  │ No deps         │
│               │  │ • Redis Streams   │  │ Full control    │
│               │  │ • RabbitMQ        │  │                 │
│               │  │ • NATS            │  │                 │
│               │  │ • Cloud services  │  │                 │
└───────────────┘  └───────────────────┘  └─────────────────┘

Data Flow

Publishing an Event

  1. Application code creates a models.Event and calls eventBus.Publish(ctx, event).
  2. The EventBus implementation ensures the event has an ID and timestamp, serializes it to JSON, and wraps it in a models.Message.
  3. The message is published to the configured PubSub transport (in-memory, Kafka, Redis, etc.).
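
The publishing steps above can be sketched as follows. This is a minimal illustration, not the library's actual implementation: the Event and Message types are local copies of the documented structs, and toMessage and newID are hypothetical helpers (the real EventBus may generate IDs and wrap messages differently).

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"time"
)

// Local copies of the documented types so the sketch compiles on its own.
type Event struct {
	ID        string
	Type      string
	Timestamp time.Time
	Payload   json.RawMessage
	Metadata  map[string]string
}

type Message struct {
	UUID     string
	Payload  []byte
	Metadata map[string]string
}

// newID is a hypothetical helper that returns 16 random bytes as hex.
func newID() (string, error) {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		return "", err
	}
	return hex.EncodeToString(b), nil
}

// toMessage fills in a missing ID and timestamp, serializes the event to
// JSON, and wraps the result in a transport-level Message.
func toMessage(event Event) (*Message, error) {
	if event.ID == "" {
		id, err := newID()
		if err != nil {
			return nil, fmt.Errorf("generate event ID: %w", err)
		}
		event.ID = id
	}
	if event.Timestamp.IsZero() {
		event.Timestamp = time.Now().UTC()
	}
	body, err := json.Marshal(event)
	if err != nil {
		return nil, fmt.Errorf("marshal event: %w", err)
	}
	return &Message{UUID: event.ID, Payload: body, Metadata: event.Metadata}, nil
}

func main() {
	msg, err := toMessage(Event{
		Type:    "user.logged_in",
		Payload: json.RawMessage(`{"id":"u1"}`),
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(msg.UUID, string(msg.Payload))
}
```

The resulting Message is what would then be handed to PubSub.Publish for the topic matching the event type.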

Subscribing to Events

  1. Application code subscribes to an event type with eventBus.Subscribe("event.type", handler), which returns a SubscriptionID for later use with Unsubscribe.
  2. The EventBus subscribes to the topic in the PubSub transport and launches a goroutine to process messages.
  3. Each message is deserialized to a models.Event and passed to your handler.
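
The subscription steps above might look roughly like this inside an EventBus implementation. It is a sketch under assumptions: consume and demo are hypothetical names, the types are local copies of the documented structs, and the real implementation may handle malformed messages and handler errors differently.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"time"
)

// Local copies of the documented types so the sketch compiles on its own.
type Event struct {
	ID        string
	Type      string
	Timestamp time.Time
	Payload   json.RawMessage
	Metadata  map[string]string
}

type Message struct {
	UUID     string
	Payload  []byte
	Metadata map[string]string
}

type EventHandler func(ctx context.Context, event Event) error

// consume launches a goroutine that decodes each message into an Event and
// passes it to handler. The returned channel is closed when the loop exits,
// either because ctx was cancelled or because msgs was closed.
func consume(ctx context.Context, msgs <-chan *Message, handler EventHandler) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for {
			select {
			case <-ctx.Done():
				return
			case msg, ok := <-msgs:
				if !ok {
					return
				}
				var event Event
				if err := json.Unmarshal(msg.Payload, &event); err != nil {
					log.Printf("skipping malformed message %s: %v", msg.UUID, err)
					continue
				}
				if err := handler(ctx, event); err != nil {
					log.Printf("handler failed for event %s: %v", event.ID, err)
				}
			}
		}
	}()
	return done
}

// demo feeds two messages through consume and returns the handled event
// types in the order the handler saw them.
func demo() []string {
	msgs := make(chan *Message, 2)
	for _, t := range []string{"user.signed_up", "user.logged_in"} {
		body, err := json.Marshal(Event{ID: t, Type: t})
		if err != nil {
			panic(err)
		}
		msgs <- &Message{UUID: t, Payload: body}
	}
	close(msgs)

	var seen []string
	done := consume(context.Background(), msgs, func(_ context.Context, e Event) error {
		seen = append(seen, e.Type)
		return nil
	})
	<-done // wait for the goroutine to drain the channel
	return seen
}

func main() {
	fmt.Println(demo())
}
```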

Interface Contracts

High-Level (Application)

type Event struct {
	ID        string
	Type      string
	Timestamp time.Time
	Payload   json.RawMessage
	Metadata  map[string]string
}

type EventPublisher interface {
	Publish(ctx context.Context, event Event) error
	Close() error
}

type EventHandler func(ctx context.Context, event Event) error

type EventSubscriber interface {
	Subscribe(eventType string, handler EventHandler) (SubscriptionID, error)
	Unsubscribe(eventType string, id SubscriptionID)
	Close() error
}

type EventBus interface {
	EventPublisher
	EventSubscriber
}

Low-Level (Transport)

type Message struct {
	UUID     string
	Payload  []byte // Message payload (serialized data)
	Metadata map[string]string
}

type PubSub interface {
	Publish(ctx context.Context, topic string, msg *Message) error
	Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
	Close() error
}

Implementation Options

1. In-Memory (Default)

No configuration required. If no PubSub is provided, the default in-memory implementation based on Go channels is used:

import (
	gobetterauthconfig "github.com/GoBetterAuth/go-better-auth/config"
	gobetterauthmodels "github.com/GoBetterAuth/go-better-auth/models"
)

config := gobetterauthconfig.NewConfig(
	gobetterauthconfig.WithEventBus(gobetterauthmodels.EventBusConfig{
		Enabled: true,
	}),
)

2. Watermill Transports

Use any Watermill transport (Kafka, Redis, RabbitMQ, etc.) via the provided adapter:

import (
	"log"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-kafka/v3/pkg/kafka"

	gobetterauthconfig "github.com/GoBetterAuth/go-better-auth/config"
	gobetterauthevents "github.com/GoBetterAuth/go-better-auth/events"
	gobetterauthmodels "github.com/GoBetterAuth/go-better-auth/models"
)

logger := watermill.NewStdLogger(false, false)

publisher, err := kafka.NewPublisher(
	kafka.PublisherConfig{
		Brokers:   []string{"localhost:9092"},
		Marshaler: kafka.DefaultMarshaler{},
	},
	logger,
)
if err != nil {
	log.Fatalf("failed to create Kafka publisher: %v", err)
}

subscriber, err := kafka.NewSubscriber(
	kafka.SubscriberConfig{
		Brokers:       []string{"localhost:9092"},
		ConsumerGroup: "auth-service",
		Unmarshaler:   kafka.DefaultMarshaler{},
	},
	logger,
)
if err != nil {
	log.Fatalf("failed to create Kafka subscriber: %v", err)
}

// Wrap the Watermill publisher and subscriber in the PubSub adapter.
kafkaPubSub := gobetterauthevents.NewWatermillPubSub(publisher, subscriber)

config := gobetterauthconfig.NewConfig(
	gobetterauthconfig.WithEventBus(gobetterauthmodels.EventBusConfig{
		Enabled: true,
		PubSub:  kafkaPubSub,
	}),
)

3. Custom PubSub Implementation

Implement the models.PubSub interface for full control and zero dependencies.

import (
	gobetterauthconfig "github.com/GoBetterAuth/go-better-auth/config"
	gobetterauthmodels "github.com/GoBetterAuth/go-better-auth/models"
)

type myCustomPubSub struct {
	// ... your fields here
}

func NewMyCustomPubSub() gobetterauthmodels.PubSub {
	return &myCustomPubSub{
		// ... initialize your fields here
	}
}

// Usage
config := gobetterauthconfig.NewConfig(
	gobetterauthconfig.WithEventBus(gobetterauthmodels.EventBusConfig{
		Enabled: true,
		PubSub:  NewMyCustomPubSub(),
	}),
)
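
To make the custom option more concrete, here is one way the three PubSub methods could be filled in. It is a sketch only: memoryPubSub, NewMemoryPubSub, and roundTrip are hypothetical names, Message and PubSub are local copies of the documented types, and the buffer size and blocking behaviour are assumptions rather than requirements of the library.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// Local copies of the documented transport types.
type Message struct {
	UUID     string
	Payload  []byte
	Metadata map[string]string
}

type PubSub interface {
	Publish(ctx context.Context, topic string, msg *Message) error
	Subscribe(ctx context.Context, topic string) (<-chan *Message, error)
	Close() error
}

var errClosed = errors.New("pubsub is closed")

// memoryPubSub fans each published message out to every subscriber of a topic.
type memoryPubSub struct {
	mu     sync.RWMutex
	subs   map[string][]chan *Message
	closed bool
}

func NewMemoryPubSub() PubSub {
	return &memoryPubSub{subs: make(map[string][]chan *Message)}
}

// Publish delivers msg to all current subscribers. It blocks when a
// subscriber's buffer is full, until ctx is cancelled.
func (p *memoryPubSub) Publish(ctx context.Context, topic string, msg *Message) error {
	p.mu.RLock()
	defer p.mu.RUnlock()
	if p.closed {
		return errClosed
	}
	for _, ch := range p.subs[topic] {
		select {
		case ch <- msg:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}

// Subscribe registers a buffered channel for topic. This sketch ignores ctx;
// the channel is closed only by Close.
func (p *memoryPubSub) Subscribe(_ context.Context, topic string) (<-chan *Message, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.closed {
		return nil, errClosed
	}
	ch := make(chan *Message, 16)
	p.subs[topic] = append(p.subs[topic], ch)
	return ch, nil
}

// Close closes every subscriber channel. Calling it twice is a no-op.
func (p *memoryPubSub) Close() error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.closed {
		return nil
	}
	p.closed = true
	for _, chans := range p.subs {
		for _, ch := range chans {
			close(ch)
		}
	}
	p.subs = nil
	return nil
}

// roundTrip publishes one message and returns the UUID the subscriber received.
func roundTrip() (string, error) {
	ps := NewMemoryPubSub()
	defer ps.Close()
	ctx := context.Background()
	msgs, err := ps.Subscribe(ctx, "user.logged_in")
	if err != nil {
		return "", err
	}
	if err := ps.Publish(ctx, "user.logged_in", &Message{UUID: "m1", Payload: []byte(`{}`)}); err != nil {
		return "", err
	}
	return (<-msgs).UUID, nil
}

func main() {
	fmt.Println(roundTrip())
}
```

Because Publish holds a read lock while sending, Close waits for in-flight publishes to finish. A production transport would likely need a more careful shutdown strategy.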

Built-in Events

GoBetterAuth automatically publishes these events:

  • user.signed_up — User registration completed
  • user.logged_in — User successfully logged in
  • user.email_verified — Email verification completed
  • user.password_changed — Password changed
  • user.email_changed — Email address changed

Subscribing to Events

In Application Code

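// Subscribe returns a SubscriptionID alongside the error.
subscriptionID, err := goBetterAuth.EventBus.Subscribe(
	gobetterauthmodels.EventUserLoggedIn,
	func(ctx context.Context, event gobetterauthmodels.Event) error {
		// Do something...
		return nil
	},
)
if err != nil {
	return err
}
// Use the ID to remove the handler when it is no longer needed.
defer goBetterAuth.EventBus.Unsubscribe(gobetterauthmodels.EventUserLoggedIn, subscriptionID)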

In Plugins

func (plugin *MyPlugin) Init(ctx *gobetterauthmodels.PluginContext) error {
	if ctx.EventBus != nil {
		if _, err := ctx.EventBus.Subscribe(
			gobetterauthmodels.EventUserSignedUp,
			func(ctx context.Context, event gobetterauthmodels.Event) error {
				// Do something...
				return nil
			},
		); err != nil {
			return err
		}
	}
	return nil
}

Publishing Custom Events

data := map[string]any{
	"action": "payment_completed",
	"amount": 99.99,
}
payload, err := json.Marshal(data)
if err != nil {
	slog.Error("failed to marshal event payload",
		"error", err,
	)
	return err
}

event := gobetterauthmodels.Event{
	Type:      "custom.event",
	Timestamp: time.Now().UTC(),
	Payload:   payload,
	Metadata: map[string]string{
		"source": "payment-service",
	},
}

if err := goBetterAuth.EventBus.Publish(context.Background(), event); err != nil {
	slog.Error("failed to publish event",
		"error", err,
	)
	return err
}

Best Practices

  1. Error Handling: Return errors from handlers rather than panicking, and log enough context to diagnose failures.
  2. Idempotency: Handlers should be idempotent, so that processing the same event more than once is safe.
  3. Timeouts: Use context.WithTimeout in handlers that perform long-running operations.
  4. Cleanup: Always close PubSub resources when shutting down.
  5. Abstraction: Keep application code working with models.Event and models.EventBus rather than transport-level types.
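
// Example: robust event handler
func handleUserEvent(ctx context.Context, event gobetterauthmodels.Event) error {
	ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	// Payload is a json.RawMessage, so decode it before reading fields.
	var payload struct {
		ID string `json:"id"`
	}
	if err := json.Unmarshal(event.Payload, &payload); err != nil {
		return fmt.Errorf("invalid event payload: %w", err)
	}
	if payload.ID == "" {
		return fmt.Errorf("missing user ID in event payload")
	}

	// Pass ctx to any long-running work here so it respects the timeout.
	return nil
}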
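
The idempotency advice can be illustrated with a small wrapper that skips events it has already handled. This is only a sketch: Idempotent and deliverTwice are hypothetical names, Event is trimmed to two fields, and the in-memory map deduplicates within a single process only. A real deployment would more likely track handled IDs in a shared store.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Event is trimmed to the fields this sketch needs.
type Event struct {
	ID   string
	Type string
}

type EventHandler func(ctx context.Context, event Event) error

// Idempotent wraps next so that an event ID which has already been handled
// successfully is skipped. Failed events are not recorded, so a redelivery
// can retry them. Two concurrent deliveries of the same ID may both run.
func Idempotent(next EventHandler) EventHandler {
	var mu sync.Mutex
	handled := make(map[string]struct{})
	return func(ctx context.Context, event Event) error {
		mu.Lock()
		_, seen := handled[event.ID]
		mu.Unlock()
		if seen {
			return nil // duplicate delivery, nothing to do
		}
		if err := next(ctx, event); err != nil {
			return err
		}
		mu.Lock()
		handled[event.ID] = struct{}{}
		mu.Unlock()
		return nil
	}
}

// deliverTwice simulates a redelivery and reports how many times the wrapped
// handler actually ran.
func deliverTwice() int {
	calls := 0
	h := Idempotent(func(context.Context, Event) error {
		calls++
		return nil
	})
	event := Event{ID: "evt-1", Type: "user.signed_up"}
	for i := 0; i < 2; i++ {
		if err := h(context.Background(), event); err != nil {
			panic(err)
		}
	}
	return calls
}

func main() {
	fmt.Println("handler invocations:", deliverTwice())
}
```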

Benefits of This Design

  1. Zero External Dependencies: Models layer has no external deps
  2. Maximum Flexibility: Use Watermill, custom, or anything else
  3. Easy Testing: Swap implementations for tests
  4. Future-Proof: Not locked into any specific library
  5. Clean Architecture: Clear separation between models and infrastructure
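
As an illustration of the testing benefit, application code that depends only on the EventPublisher interface can be exercised against a recording fake instead of a real transport. The names recordingPublisher, announceSignup, and recordedTypes are hypothetical, and the types are local copies of the documented ones.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"sync"
	"time"
)

// Local copies of the documented types so the sketch compiles on its own.
type Event struct {
	ID        string
	Type      string
	Timestamp time.Time
	Payload   json.RawMessage
	Metadata  map[string]string
}

type EventPublisher interface {
	Publish(ctx context.Context, event Event) error
	Close() error
}

// recordingPublisher is a test double that stores every published event.
type recordingPublisher struct {
	mu     sync.Mutex
	events []Event
}

func (r *recordingPublisher) Publish(_ context.Context, event Event) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.events = append(r.events, event)
	return nil
}

func (r *recordingPublisher) Close() error { return nil }

// announceSignup stands in for application code under test.
func announceSignup(ctx context.Context, pub EventPublisher, userID string) error {
	payload, err := json.Marshal(map[string]string{"id": userID})
	if err != nil {
		return err
	}
	return pub.Publish(ctx, Event{
		Type:      "user.signed_up",
		Timestamp: time.Now().UTC(),
		Payload:   payload,
	})
}

// recordedTypes runs announceSignup against the fake and returns the types
// of the events it captured.
func recordedTypes() ([]string, error) {
	fake := &recordingPublisher{}
	if err := announceSignup(context.Background(), fake, "u1"); err != nil {
		return nil, err
	}
	types := make([]string, 0, len(fake.events))
	for _, e := range fake.events {
		types = append(types, e.Type)
	}
	return types, nil
}

func main() {
	types, err := recordedTypes()
	if err != nil {
		panic(err)
	}
	fmt.Println(types)
}
```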
