How-To Guides

Practical recipes for common tasks with Pericarp.

Event Sourcing

How to subscribe to events using pattern matching

The EventDispatcher supports dot-separated pattern matching. Given an event type like user.created, handlers registered for any of these patterns will fire:

Pattern Matches
user.created Exact match
user.* All user events
*.created All creation events across entities
*.* Every event
dispatcher := domain.NewEventDispatcher()

// All order events (order.created, order.shipped, order.cancelled, ...)
domain.Subscribe(dispatcher, "order.*", func(ctx context.Context, env domain.EventEnvelope[any]) error {
    log.Printf("order event: %s for aggregate %s", env.EventType, env.AggregateID)
    return nil
})

// All creation events across the system
domain.Subscribe(dispatcher, "*.created", func(ctx context.Context, env domain.EventEnvelope[any]) error {
    log.Printf("something was created: %s", env.AggregateID)
    return nil
})

Use SubscribeWildcard for a true catch-all that doesn’t rely on dot structure:

dispatcher.SubscribeWildcard(func(ctx context.Context, env domain.EventEnvelope[any]) error {
    log.Printf("[audit] event=%s aggregate=%s", env.EventType, env.AggregateID)
    return nil
})

How to construct event type strings

Use EventTypeFor with the standard constants to build consistent type strings:

domain.EventTypeFor("user", domain.EventTypeCreate)  // "user.created"
domain.EventTypeFor("order", domain.EventTypeUpdate)  // "order.updated"
domain.EventTypeFor("product", domain.EventTypeDelete) // "product.deleted"
domain.EventTypeFor("graph", domain.EventTypeTriple)   // "graph.triple"

// Custom actions
domain.EventTypeFor("user", "deactivated")  // "user.deactivated"
domain.EventTypeFor("order", "shipped")     // "order.shipped"

How to persist events for multiple aggregates atomically

Track all aggregates in a single SimpleUnitOfWork:

uow := application.NewSimpleUnitOfWork(store, dispatcher)

uow.Track(user, order, invoice)

// All three aggregates' events are persisted.
// If any one fails (e.g. concurrency conflict), the commit fails.
err := uow.Commit(ctx)

How to handle concurrency conflicts

The EventStore uses optimistic concurrency control via expectedVersion. When a conflict is detected, reload and retry:

func updateUser(ctx context.Context, store domain.EventStore, id string) error {
    for retries := 0; retries < 3; retries++ {
        // Load current state
        user, err := LoadUser(ctx, store, id)
        if err != nil {
            return err
        }

        // Make changes
        user.ChangeEmail("new@example.com")

        // Attempt to persist
        events := user.GetUncommittedEvents()
        err = store.Append(ctx, id, user.GetSequenceNo(), events...)
        if errors.Is(err, domain.ErrConcurrencyConflict) {
            continue // Retry with fresh state
        }
        return err
    }
    return fmt.Errorf("failed after 3 retries")
}

How to retrieve events for a specific version range

// All events
events, _ := store.GetEvents(ctx, "user-1")

// Events from version 5 onwards
events, _ := store.GetEventsFromVersion(ctx, "user-1", 5)

// Events between versions 3 and 7 (inclusive)
events, _ := store.GetEventsRange(ctx, "user-1", 3, 7)

// From the start to version 5
events, _ := store.GetEventsRange(ctx, "user-1", -1, 5)

// From version 3 to the end
events, _ := store.GetEventsRange(ctx, "user-1", 3, -1)

How to use the FileStore for local development

store, err := infrastructure.NewFileStore("/tmp/my-events")
if err != nil {
    log.Fatal(err)
}
defer store.Close()

// Use exactly like MemoryStore — same EventStore interface.
// Events are persisted as JSON files, one per aggregate.

The FileStore creates the directory if it doesn’t exist and loads existing events from disk on startup.

How to marshal and unmarshal events as JSON

// Marshal a typed envelope to JSON
envelope := domain.NewEventEnvelope(myPayload, "agg-1", "user.created", 0)
data, err := domain.MarshalEventToJSON(envelope)

// Unmarshal back to a typed envelope
restored, err := domain.UnmarshalEventFromJSON[*MyPayloadType](data)
// restored.Payload is now *MyPayloadType — fully type-safe

How to add metadata to events

Attach arbitrary key-value metadata to any event envelope:

envelope := domain.NewEventEnvelope(payload, aggregateID, eventType, seqNo)
envelope.Metadata["correlation_id"] = "req-abc-123"
envelope.Metadata["user_agent"] = "web-client/1.0"
envelope.Metadata["source"] = "api"

Metadata is preserved through serialization and dispatch.


Commands (CQRS)

How to choose between async and queued dispatchers

Dispatcher Execution Best for
AsyncCommandDispatcher All receivers run concurrently Fan-out commands, independent side effects
QueuedCommandDispatcher Receivers run sequentially in registration order Ordered processing, deterministic results
// Concurrent execution
async := cqrs.NewAsyncCommandDispatcher()

// Sequential execution
queued := cqrs.NewQueuedCommandDispatcher()

Both implement the CommandDispatcher interface, so consuming code doesn’t need to change.

How to return early from a command dispatch

Use First() to get the first result and return an HTTP response while remaining receivers continue in the background:

func handleCreateUser(w http.ResponseWriter, r *http.Request) {
    envelope := cqrs.NewCommandEnvelope(cmd, "user.create")
    watchable := dispatcher.Dispatch(r.Context(), cqrs.ToAnyCommandEnvelope(envelope))

    result, ok := watchable.First()
    if !ok {
        http.Error(w, "no handler", 500)
        return
    }
    if result.Error != nil {
        http.Error(w, result.Error.Error(), 400)
        return
    }

    json.NewEncoder(w).Encode(result.Value)
    // Remaining receivers finish in the background — their results are buffered.
}

How to register a wildcard command receiver

// Audit logging for all commands
dispatcher.RegisterWildcardReceiver(func(ctx context.Context, env cqrs.CommandEnvelope[any]) (any, error) {
    log.Printf("[audit] command=%s id=%s", env.CommandType, env.ID)
    return nil, nil
})

Wildcard receivers fire for every command in addition to any pattern-matched receivers.


Authentication

How to set up the session manager with gorilla/sessions

import (
    "github.com/gorilla/sessions"
    authsession "github.com/akeemphilbert/pericarp/pkg/auth/infrastructure/session"
)

// Use a cookie store with a 32-byte key for encryption
store := sessions.NewCookieStore([]byte("your-32-byte-secret-key-here!!!"))

// Secure defaults: HttpOnly, Secure, SameSite=Lax
opts := authsession.DefaultSessionOptions()

sessionManager := authsession.NewGorillaSessionManager("myapp-session", store, opts)

For production, use a server-side store (Redis, database) instead of CookieStore so that session data isn’t stored in the cookie itself.

How to implement an OAuthProvider

The OAuthProvider interface is provider-agnostic. Implement it once per identity provider:

type OAuthProvider interface {
    Name() string
    AuthCodeURL(state, codeChallenge, nonce, redirectURI string) string
    Exchange(ctx context.Context, code, codeVerifier, redirectURI string) (*AuthResult, error)
    RefreshToken(ctx context.Context, refreshToken string) (*AuthResult, error)
    RevokeToken(ctx context.Context, token string) error
    ValidateIDToken(ctx context.Context, idToken, nonce string) (*UserInfo, error)
}

Register providers in the registry:

providers := authapp.OAuthProviderRegistry{
    "google": &GoogleProvider{clientID: "...", clientSecret: "..."},
    "github": &GitHubProvider{clientID: "...", clientSecret: "..."},
}

How to check authorization for an agent

Pericarp provides two implementations of AuthorizationChecker. Choose the one that fits your stack.

Option A: Casbin (batteries-included)

import casbinauth "github.com/akeemphilbert/pericarp/pkg/auth/infrastructure/casbin"

checker, err := casbinauth.NewCasbinAuthorizationChecker(gormAdapter)

allowed, err := checker.IsAuthorized(ctx, "agent-123", "odrl:read", "document-456")
allowed, err  = checker.IsAuthorizedInAccount(ctx, "agent-123", "account-789", "odrl:modify", "document-456")

Option B: PolicyDecisionPoint (bring your own store)

import authapp "github.com/akeemphilbert/pericarp/pkg/auth/application"

pdp := authapp.NewPolicyDecisionPoint(permissionStore) // you implement PermissionStore

allowed, err := pdp.IsAuthorized(ctx, "agent-123", "odrl:read", "document-456")
allowed, err  = pdp.IsAuthorizedInAccount(ctx, "agent-123", "account-789", "odrl:modify", "document-456")

Both follow the same evaluation order: prohibitions first (deny overrides), then permissions, then default deny.

How to check authorization within a session

The ValidateSession method returns permissions alongside session info:

info, err := authService.ValidateSession(ctx, sessionID)
if err != nil {
    // Handle ErrSessionNotFound, ErrSessionExpired, ErrSessionRevoked
}

// info.Permissions contains the agent's effective permissions
// info.AgentID, info.AccountID identify the authenticated context

How to revoke sessions

// Revoke a single session (e.g., user clicks "log out")
err := authService.RevokeSession(ctx, sessionID)

// Revoke all sessions for an agent (e.g., password change, security incident)
err := authService.RevokeAllSessions(ctx, agentID)

How to refresh OAuth tokens

Token refresh happens server-side. The user’s session continues seamlessly:

// Check if tokens need refreshing
needsRefresh, err := tokenStore.NeedsRefresh(ctx, credentialID)
if needsRefresh {
    result, err := authService.RefreshTokens(ctx, credentialID)
    // New tokens are stored automatically
}

How to scope a session to an account

For multi-tenant applications, scope a session to a specific account after login:

session, _ := authService.CreateSession(ctx, agentID, credentialID, ip, ua, 24*time.Hour)
session.ScopeToAccount("account-789")
sessionRepo.Save(ctx, session)

How to define roles and policies

import "github.com/akeemphilbert/pericarp/pkg/auth/domain/entities"

// Create a role
role, _ := new(entities.Role).With("role-editor", "Editor", "Can read and modify documents")

// Create an ODRL policy with permissions
policy, _ := new(entities.Policy).With("policy-1", "Editor Policy", entities.PolicyTypeSet)
policy.GrantPermission("role-editor", "odrl:read", "documents")
policy.GrantPermission("role-editor", "odrl:modify", "documents")
policy.SetProhibition("role-editor", "odrl:delete", "documents")

// Assign a role to an agent
agent, _ := new(entities.Agent).With("agent-1", "Alice", entities.AgentTypePerson)
agent.AssignRole("role-editor")

// Add an agent to an account with a role
account, _ := new(entities.Account).With("account-1", "Acme Corp")
account.AddMember("agent-1", "role-editor")

Authorization

How to set up the Casbin authorization checker

import (
    casbinauth "github.com/akeemphilbert/pericarp/pkg/auth/infrastructure/casbin"
    "github.com/casbin/casbin/v3/persist"
)

// Option A: With an adapter (e.g., GORM, file, PostgreSQL)
checker, err := casbinauth.NewCasbinAuthorizationChecker(gormAdapter)

// Option B: Without an adapter (in-memory only, useful for tests)
checker, err := casbinauth.NewCasbinAuthorizationChecker(nil)

If you already have a pre-configured Casbin enforcer with your own model:

import casbinlib "github.com/casbin/casbin/v3"

enforcer, _ := casbinlib.NewEnforcer("my_model.conf", "my_policy.csv")
checker := casbinauth.NewCasbinAuthorizationCheckerFromEnforcer(enforcer)

How to manage permissions and prohibitions

// Add a global permission (creates a Casbin policy: assignee, *, action, target, allow)
checker.AddPermission("editor", "odrl:read", "documents")

// Add a global prohibition (creates a Casbin policy: assignee, *, action, target, deny)
checker.AddProhibition("editor", "odrl:delete", "documents")

// Remove a permission or prohibition
checker.RemovePermission("editor", "odrl:read", "documents")
checker.RemoveProhibition("editor", "odrl:delete", "documents")

All convenience methods operate on global ("*") domain policies. For domain-specific policies, use NewCasbinAuthorizationCheckerFromEnforcer and interact with the enforcer directly.

How to assign global and account-scoped roles

// Global role — applies in all account contexts
checker.AssignRole("agent-1", "editor")

// Account-scoped role — applies only within a specific account
checker.AssignAccountRole("agent-1", "admin", "account-42")

// Revoke roles
checker.RevokeRole("agent-1", "editor")
checker.RevokeAccountRole("agent-1", "admin", "account-42")

Under the hood, global roles use domain "*" in Casbin grouping policies. A custom domain matching function ensures global roles match any request domain, while account-scoped roles match only their specific account.

How to query an agent’s effective permissions

// Get all effective permissions (eft=allow), including inherited via roles
permissions, err := checker.GetPermissions(ctx, "agent-1")
for _, p := range permissions {
    fmt.Printf("%s can %s on %s\n", p.Assignee, p.Action, p.Target)
}

// Get all effective prohibitions (eft=deny), including inherited via roles
prohibitions, err := checker.GetProhibitions(ctx, "agent-1")
for _, p := range prohibitions {
    fmt.Printf("%s cannot %s on %s\n", p.Assignee, p.Action, p.Target)
}

Both methods resolve role inheritance — if an agent holds a role, the role’s permissions/prohibitions are included in the result.

How to use the PolicyDecisionPoint with a custom store

For users who prefer full control over the data model instead of Casbin:

import authapp "github.com/akeemphilbert/pericarp/pkg/auth/application"

// 1. Implement PermissionStore against your storage layer
type myStore struct { /* your read model */ }

func (s *myStore) GetPermissionsForAssignee(ctx context.Context, assigneeID string) ([]authapp.Permission, error) { ... }
func (s *myStore) GetProhibitionsForAssignee(ctx context.Context, assigneeID string) ([]authapp.Permission, error) { ... }
func (s *myStore) GetRolesForAgent(ctx context.Context, agentID string) ([]string, error) { ... }
func (s *myStore) GetRolesForAgentInAccount(ctx context.Context, agentID, accountID string) ([]string, error) { ... }

// 2. Create the PDP
pdp := authapp.NewPolicyDecisionPoint(&myStore{})

// 3. Check authorization — same interface as CasbinAuthorizationChecker
allowed, err := pdp.IsAuthorized(ctx, "agent-1", "odrl:read", "documents")

The PDP collects all assignee IDs (agent + roles), checks prohibitions first (deny overrides), then permissions, then defaults to deny.

An agent can have multiple credentials from different providers:

// First login creates agent + credential
agent, googleCred, _ := authService.FindOrCreateAgent(ctx, authapp.UserInfo{
    Provider:       "google",
    ProviderUserID: "google-123",
    Email:          "alice@example.com",
    DisplayName:    "Alice",
})

// Later, link a GitHub credential to the same agent
githubCred, _ := new(entities.Credential).With(
    ksuid.New().String(), agent.GetID(),
    "github", "github-456", "alice@example.com", "alice",
)
credentialRepo.Save(ctx, githubCred)

Commands (CQRS)

How to use the Watchable in a select statement

watchable := dispatcher.Dispatch(ctx, envelope)

select {
case <-watchable.Done():
    results := collectRemaining(watchable)
    // All receivers finished
case <-time.After(5 * time.Second):
    // Timeout — receivers may still be running in the background
    log.Println("command timed out")
case <-ctx.Done():
    // Context cancelled
}

How to fire-and-forget a command

For scenarios where receivers must outlive the HTTP request, dispatch with a background context:

// Use background context so receivers aren't cancelled when the request ends
bgCtx := context.Background()
watchable := dispatcher.Dispatch(bgCtx, envelope)
// Don't call Wait() or First() — let receivers run entirely in the background
_ = watchable

How to register multiple typed receivers for the same command

// Validation receiver
cqrs.RegisterReceiver(dispatcher, "order.place", func(
    ctx context.Context, env cqrs.CommandEnvelope[PlaceOrderCommand],
) (any, error) {
    if env.Payload.Total <= 0 {
        return nil, fmt.Errorf("invalid total")
    }
    return "validated", nil
})

// Persistence receiver
cqrs.RegisterReceiver(dispatcher, "order.place", func(
    ctx context.Context, env cqrs.CommandEnvelope[PlaceOrderCommand],
) (any, error) {
    order := createOrder(env.Payload)
    return order.ID, nil
})

// Both receivers fire when "order.place" is dispatched.
// With AsyncCommandDispatcher they run concurrently.
// With QueuedCommandDispatcher they run in registration order.

Pericarp is distributed under the MIT License.

This site uses Just the Docs, a documentation theme for Jekyll.