Guidelines for event messages

Introduction

Interoperability is the perennial headache of information systems: knowing what data means, how it is formatted, and what the semantics of the various fields are can be a daunting task. Micro-services are generally designed to process messages in potentially large quantities and, to do that, need to know what those messages mean. They are also generally designed to be de-coupled from each other so they can easily be replaced or updated. To construct a system with a micro-service-oriented architecture, then, you need the services to communicate with each other, and you need messages to do that. Those messages have to be well-defined and follow some ground rules.

Meta-data

Any message needs certain meta-data: a way to trace it through a multi-micro-service system and tie it back to the original event or command that caused it and the transaction it is part of, and information identifying the user, tenant, or service that authorized that event or command. This is important for several reasons:

  • Meta-data is used to trace a single request or event through the entire system: a purchase transaction, for example, will start in the UI and be assigned a random correlation ID, after which that correlation ID is used for every command and event that results from that transaction. If anything goes wrong for a particular transaction, the correlation ID is what you use to know what happened, and where.
  • Meta-data is used to observe the behavior of any micro-service, or any device: a producer ID is added to the message and used to order events on the bus, and to debug a micro-service if it's showing intermittent bad behavior.
  • Meta-data is used to provide authentication and authorization in a way that is consistent with zero-trust principles of cybersecurity, using an authorization token to allow an action only if that token is authentic and authorizes that action, and consistently generating an audit trail for observability.

Any message will also have a type and its own id. The type, along with the topic the message is published to, allows the message to be routed to its intended recipients and allows each recipient to filter for the messages it needs to handle. The id is used to make sure a single message is only processed once, especially if the bus used by the system does not provide an exactly-once delivery guarantee.

Micro-services also tend to be re-used, and should be designed for re-use and to be composable. They may be re-used on different bus implementations, such as an AMQP bus that has built-in routing, or a Kafka bus that is optimized for high volume and resiliency but leaves the routing to the user. To abstract this away from the user, and to allow messages to bridge buses through data pumps if needed, a few additional recommendations come to mind, which are discussed in this post.

The recommendations discussed in this post, to address the concerns outlined above, are:

  1. Use a common super-schema for all messages
  2. Generate events for anything that happens in the system and post them to an appropriate topic on the bus (they can always be ignored if not needed).
  3. Generate commands only if something needs to be done by a generic micro-service on the bus that should be de-coupled from the events generated by the other services (e.g. sending E-mail, running garbage collection tasks, etc.)
  4. If available, include the authorizing JWT token with the message.

Using a super-schema for all events

Different event bus implementations have different ways to handle meta-data. For micro-services to be both interoperable and agnostic of the bus they're used on, the schema should include any meta-data necessary to parse, track, trace, and authorize the event. Individual connectors can always copy the meta-data into header fields for the particular bus if needed.
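For example, a Kafka connector might copy the correlation and producer IDs into message headers so bus-level tooling can read them without parsing the payload. A minimal sketch, assuming the kafka-python package:

import json
from kafka import KafkaProducer

def get_header_copying_send(producer, topic):
    def send(event):
        # Copy tracing meta-data into Kafka message headers so bus-level
        # tooling can read it without deserializing the JSON payload.
        headers = [
            ('cid', event['metadata']['cid'].encode()),
            ('pid', event['metadata']['pid'].encode())
        ]
        producer.send(topic, value=json.dumps(event).encode(), headers=headers)
    return send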

Those requirements lead to a schema such as this one:

{
    "type": "object",
    "properties": {
        "id": { "type": "string", "format": "uuid" },
        "type": { "type": "string" },
        "metadata": {
            "type": "object",
            "properties": {
                "cid": { "type": "string", "format": "uuid" },
                "tid": { "type": "string", "format": "uuid" },
                "pid": { "type": "string", "format": "uuid" },
                "uid": { "type": "string", "format": "uuid" },
                "token": { "type": "string" }
            },
            "required": [ "cid", "pid" ]
        },
        "data": { "type": "object" }
    },
    "required": [ "id", "type", "metadata" ]
}

The value of the id field is a UUID generated by the micro-service for a specific message. If more than one event is generated for the same transaction, they may have the same id if (and only if) they are not of the same type, so de-duplication should be keyed on the (type, id) pair. In any case, the metadata.tid field (described below) will have the same value for all events generated for the same transaction.
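If the bus may deliver a message more than once, a consumer can wrap its dispatch function to de-duplicate on that (type, id) pair. A minimal in-memory sketch (a production version would use a shared store with a retention window):

def get_deduplicating_dispatch(dispatch):
    seen = set()
    def dispatch_once(event):
        # De-duplicate on the (type, id) pair, since events of different
        # types may legitimately share an id.
        key = (event['type'], event['id'])
        if key in seen:
            return
        seen.add(key)
        dispatch(event)
    return dispatch_once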

The value of the type field is the event type. This could be something like "PurchaseOrderReceived". The event type dictates the schema of the data field to be used when parsing (see below).

The metadata field contains five fields:

  • metadata.cid, a "correlation ID", tracks all events and transactions that result from an initial (user) action or other external event;
  • metadata.tid, a "transaction ID" (optional) tracks all events that result from the same transaction within the same micro-service or device;
  • metadata.pid, a "producer ID" is the unique ID of the producer of the event;
  • metadata.uid, a user ID (optional) indicates the user or tenant's unique ID, which may be required for some events;
  • metadata.token, an authorization token (optional), the JWT token authorizing the event and any actions that may result from it.

The schema of the data element is owned by, and documented with, the micro-service that generates the event. This schema is used to validate and parse the event when it is received, and contains the names and types of the data fields. Note that it does not contain the units and semantics of those data fields: those still need to be captured in application logic. Such application logic is ideally provided using a nuget, npm, or pypi package produced with the generating micro-service.
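For example, the orders micro-service could publish its schema and helper classes as a package its consumers import; the package and names below are hypothetical:

# Hypothetical package published alongside the orders micro-service,
# e.g. "pip install crassula-order-events"; the names are illustrative.
from crassula_order_events import purchase_order_request_schema, PurchaseOrder

from jsonschema import validators

# Consumers validate against the producer's own schema...
validator = validators.Draft202012Validator(purchase_order_request_schema)
# ...and use the producer's helpers for the semantics, e.g. that
# "discount" is a percentage and "price" is a unit price.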

As indicated above, the producer ID can be used, at least by some brokers, to ensure events from the same producer are delivered in-order.
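With Kafka, for example, per-producer ordering falls out of using the producer ID as the message key, because a key always maps to the same partition and Kafka preserves order within a partition. A sketch, with producer being a kafka-python KafkaProducer:

import json

def get_ordered_send(producer, topic):
    def send(event):
        # Keying on the producer ID maps all of one producer's events to the
        # same partition, and Kafka preserves order within a partition.
        producer.send(
            topic,
            key=event['metadata']['pid'].encode(),
            value=json.dumps(event).encode()
        )
    return send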

Generating events and commands

The observability of a system is the cornerstone of both its operational availability and its cybersecurity. To know whether a system is operating correctly and meeting its service level objectives, you need service level indicators, probes, and synthetic transactions, and to implement those, you need events that measure what the system is doing at any time. Micro-services should therefore announce what they do.

Events further help to de-couple micro-services from each other. For example, the orders micro-service in our example application (see below) listens for any events that would require it to do something, such as requests for purchases. It generates events depending on the business logic and configuration of the micro-service, which may affect the order's state, invoicing, and inventory. The aggregator micro-service aggregates all events, including heartbeats, and updates user-visible information (including SLIs).

De-coupling micro-services and announcing events on the bus, even if those events may be ignored, makes micro-services composable and modular: each can work in conjunction with the others or on its own, and encapsulates its logic in a component that can be individually tested, developed, updated, and deployed. New micro-services can be added without changing anything else in the system. In the example system, we can add a micro-service that automatically prepares an order for an upstream provider, reacting to events from the inventory micro-service when inventory is running low.

In some cases, some coupling is needed: when the system needs to send an E-mail, for example, a micro-service needs to pick up on the event(s) that cause the E-mail to be sent, format the E-mail with an appropriate template, and send it to the appropriate recipients. The micro-service that generates the event that sets all this in motion should provide all the information necessary to fill in the blanks in the E-mail's template, but should not need to know that an E-mail is being sent at all. Conversely, the micro-service that formats the E-mail and sends it to something like the AWS Simple Email Service should not have to know anything about flowers (our example application is a flower shop): all it should need is the identifier of the template to use, the identifier of the list to send it to, and the values to inject into the template. That coupling should be encapsulated in a "glue code" micro-service that receives the events from the bus (i.e. an aggregator), maps the appropriate events to the appropriate templates, and occasionally hits the appropriate database to fetch the information to fill out the template, as sketched below.
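Such glue code can be little more than a mapping from event types to templates, plus a handler that emits a SendEmail command using a send_event function like the one developed later in this post. The event types, template identifiers, and list identifiers here are hypothetical:

# Hypothetical mapping from flower-shop events to E-mail templates and
# recipient lists; the names are illustrative.
_templates = {
    'PurchaseOrderApproved': ('order-confirmation-v1', 'customer'),
    'InventoryLow': ('restock-reminder-v1', 'purchasing')
}

def on_mailable_event(err, event):
    if event['type'] not in _templates:
        return  # no E-mail for this event type
    template_id, recipient_list = _templates[event['type']]
    # The glue code knows about flowers; the E-mail micro-service only sees
    # a template, a recipient list, and values to inject into the template.
    send_event('SendEmail', {
        'template': template_id,
        'recipients': recipient_list,
        'values': event.get('data', {})
    }, cid=event['metadata']['cid'])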

In other cases, the things on the bus are just there to "poke" a micro-service into doing something, like garbage collection to get rid of stale data, change the storage tier of data, etc. In such cases, a simple cron job can put a message on the bus for the targeted micro-services to listen to and act upon.
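The poke itself can be a tiny cron-driven script that publishes a command conforming to the same super-schema; a sketch assuming kafka-python, with a hypothetical maintenance topic and command type:

import json
from uuid import uuid4 as uuid
from kafka import KafkaProducer

# Run from cron, e.g.: 0 3 * * * /usr/bin/python3 poke_gc.py
# The topic name and command type are hypothetical.
producer = KafkaProducer()
command_id = str(uuid())
producer.send('maintenance', json.dumps({
    'id': command_id,
    'type': 'CollectGarbage',
    'metadata': { 'cid': command_id, 'pid': str(uuid()) }
}).encode())
producer.flush()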

Authorization, authentication, and events

A zero-trust architecture is based on the idea that no actor in the system is trusted solely based on its location in the system. This means every action by every micro-service in the system has to be authorized and authenticated. To do that, the system needs a centralized identity and access management (IAM) system which can generate authentic authorization tokens with the appropriate claims for authenticated tenants. Those tokens can then be authenticated off-line given the IAM's public key or a shared secret.

The token metadata field is a JWT token that identifies the tenant and its roles. The uid metadata field identifies the user on whose behalf the event is being generated if that user is not the subject of the token. This can happen when the micro-service's tenant has privileges to do something on behalf of a user that the user can't do directly with their own privileges.

When a micro-service receives an event, it authenticates the token, validates that the claims in the token authorize the actions it needs to take, may log the token, and will log the uid if present. When a micro-service sends an event, it includes the authorizing token it received. When it sends a command, it includes its own token and the uid of the original event being acted on (the value of the uid field if present, or the sub claim of the authorizing token otherwise).
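Authenticating the token off-line might look like this, using the PyJWT package and the IAM's public key (the audience value is a placeholder):

import jwt  # the PyJWT package

def authenticate_token(token, iam_public_key):
    '''Return the token's claims if it is authentic, None otherwise.'''
    try:
        return jwt.decode(
            token,
            iam_public_key,
            algorithms=['RS256'],  # pin the algorithm; never trust the header
            audience='crassula'    # hypothetical audience value
        )
    except jwt.InvalidTokenError:
        return None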

In some environments, it may be necessary to sign the event, or even to encrypt it with an AEAD. To do that, at least a digital signature needs to be applied to the JSON object encoding the event. Note that it is not sufficient to only sign the payload: the meta-data contains the authorizing token for the information contained in the payload, so the two need to be authenticated together.
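A sketch of what that might look like with an Ed25519 key from the cryptography package, signing a canonical serialization of the whole event:

import json
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey

def sign_event(event, private_key):
    # Serialize canonically (sorted keys, no whitespace) so producer and
    # consumer sign and verify the exact same bytes. The signature covers
    # the whole event, meta-data (and token) included.
    canonical = json.dumps(event, sort_keys=True, separators=(',', ':')).encode()
    return private_key.sign(canonical)

private_key = Ed25519PrivateKey.generate()  # in practice, load from a KMS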

Putting it all together: a Python implementation

The remainder of this post presents a Python implementation of what was discussed above, and a re-usable Python module for the super-schema that you can use in your own micro-services.

Sending and receiving events

There are generally two things you can do with a message: you can send one, or you can receive one. Following a single super-schema for all messages certainly makes receiving messages easier: you can validate the received message against the schema and dispatch it to the appropriate handler by parsing only the common fields in the message. Hence, it becomes possible to provide a single, shared implementation for all your micro-services to receive messages from a bus, encapsulating the details of dispatching events to handlers, parsing messages, and even connecting to the bus itself. In this post, I will show this by implementing just such a module in Python, and making it open source to boot.

But receiving messages is not the only thing that is facilitated by a common schema: the semantics of most of the metadata fields are well-defined, so some default behaviors can be set up for sending messages as well. Again, with this post, I will provide a Python implementation of such default behavior.

Receiving events in Python

Any message received from a bus will eventually need to be handled by some event handler that semantically knows what to do with the data. Dispatching events to those handlers is a fairly generic job, so let's look at that first. This is the first place where the super-schema shines: in a few lines of code, it allows you to find the right event handler for the event, and to handle errors gracefully.

In [1]:
'''
Get the event dispatcher, which validates the event against the superschema and hands it to the
right handler.
'''
from jsonschema import validators
Validator = validators.Draft202012Validator

_super_schema = {
    "type": "object",
    "properties": {
        "id": { "type": "string", "format": "uuid" },
        "type": { "type": "string" },
        "metadata": {
            "type": "object",
            "properties": {
                "cid": { "type": "string", "format": "uuid" },
                "tid": { "type": "string", "format": "uuid" },
                "pid": { "type": "string", "format": "uuid" },
                "uid": { "type": "string", "format": "uuid" },
                "token": { "type": "string" }
            },
            "required": [ "cid", "pid" ]
        },
        "data": { "type": "object" }
    },
    "required": [ "id", "type", "metadata" ]
}
_super_schema_validator = Validator(_super_schema)

def get_event_dispatcher(err, handlers):
    def dispatch(event):
        is_valid = _super_schema_validator.is_valid(event)
        if not is_valid:
            err({ 'error': 'SchemaMismatchError', 'message': 'Event does not match event schema' })
            return
        base_event_name = event['type'].rsplit(':', 1)[0]
        if event['type'] in handlers:
            handlers[event['type']](err, event)
        elif base_event_name in handlers:
            handlers[base_event_name](err, event)
        elif '__default__' in handlers and handlers['__default__']:
            handlers['__default__'](err, event)
    return dispatch

Note that this also allows you to handle versioning of your schema: if you have two versions of the same event, but with schema changes between the two, you can append the version number of the schema you're using like this: MyEvent:1, MyEvent:2 and you can choose which one you want to use as the default handler. The dispatch function returned by get_event_dispatcher will take the best match first, but will fall back on the more generic handler otherwise. Events for which no handlers are found are sent to the default handler if one is provided, or ignored otherwise.
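For example (the handler functions here are placeholders):

dispatch = get_event_dispatcher(
    err=lambda e : logging.error(f"Error {e['error']}: {e['message']}"),
    handlers={
        'MyEvent:2': handle_my_event_v2,  # exact match wins for version 2
        'MyEvent': handle_my_event        # fallback for 'MyEvent' and 'MyEvent:1'
    }
    )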

An example event handler

[Figure: diagram of the Crassula application]

Let's look at an example of how this approach might be used: imagine a flower shop's on-line store (the "Crassula" application I wrote about earlier, here and here). Let's set the context a bit: the overall application looks like the image above. The micro-service we will be discussing here is the order service in that figure, which receives a "PurchaseOrderRequested" event. We will gloss over quite a few details here, but suffice it to say for now that a PurchaseOrderRequested event obviously requires data, and therefore a data schema. The schema in question is listed below.

{
    "type": "object",
    "properties": {
        "id": { "type": "string", "format": "uuid" },
        "requestDate": { "type": "string", "format": "date" },
        "items": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "name": { "type": "string" },
                    "code": { "type": "string" },
                    "qty": { "type": "integer", "minimum": 0 },
                    "price": { "type": "number", "minimum": 0 },
                    "discount": { "type": "number", "minimum": 0, "maximum": 100 }
                }
            }
        }
    },
    "required": [ "requestDate", "items" ]
}

As described in a previous post, when a purchase order request is received, the order is entered into the database, some business rules may be applied to determine whether it is automatically approved and invoiced later, or whether payment needs to be processed first (this would typically depend on the user's profile), inventory may need to be reserved, etc. For now, we'll just concern ourselves with dispatching and handling the message, which means the first thing we do is define the PO schema in code, set up a schema validator, and set up the minimal bit of boilerplate an event handler needs.

In [2]:
from jsonschema import validators
Validator = validators.Draft202012Validator

import logging
logging.getLogger().setLevel(logging.INFO)

_purchase_order_request_schema = {
    "type": "object",
    "properties": {
        "id": { "type": "string", "format": "uuid" },
        "requestDate": { "type": "string", "format": "date" },
        "items": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "name": { "type": "string" },
                    "code": { "type": "string" },
                    "qty": { "type": "integer", "minimum": 0 },
                    "price": { "type": "number", "minimum": 0 },
                    "discount": { "type": "number", "minimum": 0, "maximum": 100 }
                }
            }
        }
    },
    "required": [ "requestDate", "items" ]
}
purchase_order_request_schema_validator = Validator(_purchase_order_request_schema)

def on_purchase_order_requested_event(err, event):
    if (not event['metadata'].get('uid')
        and not event['metadata'].get('token')):
        err({ 'error': 'MissingUserError', 'message': 'Event does not have uid (required for this event)' })
        return
    # Check for authorization here:
    #   If the token is there, authenticate it and check whether the claims allow for this purchase order
    #   to be processed. The claims should at least contain the uid. If they don't business rules for PO
    #   authorization may require us to set it aside for now.
    #   If the token is not there, the PO is not pre-authorized. Again, business rules for PO authorization
    #   may require us to set it aside for now.
    # If we have a token but it's not authentic, this is where we bail out.
    # If we have an authentic token, or a valid uid, but no authorization, we will carry on but the business
    # logic below may set the PO aside for human intervention.
    logging.warning('Not checking for authorization!')
    if not event.get('data'):
        err({ 'error': 'MissingDataError', 'message': 'Event does not have data' })
        return
    is_valid = purchase_order_request_schema_validator.is_valid(event['data'])
    if not is_valid:
        err({ 'error': 'SchemaMismatchError', 'message': 'Event data does not match event schema' })
        return
    # Do something useful with the event. What that is depends on business logic, and on the outcome of
    # authorization logic above.
    logging.info("Reached the business logic!")

Note that, while the boilerplate in this case is boilerplate in that it is the same pattern for all event handlers, it is not as straightforward to parameterize this code and make it generic as it was for the dispatcher, so, for now, I won't bother. The comments in the code describe some of what needs to be done.

Dispatching and error handling

To get to the event handler, the event has to be dispatched. Let's walk through a few examples of what that might look like.

To know how to dispatch an event, we need a dispatch function. That function will need to know two things:

  1. how to handle errors, which we will do by calling a call-back function whenever one occurs, and
  2. where to dispatch the events to: which function to call for a specific type of event.

The snippet of code below calls the module's get_event_dispatcher function, which returns the dispatch function, and gives it both of those pieces of information. Whenever the dispatch function is called with an event, it will do two things: it will validate the event against the super-schema and, if valid, it will call the appropriate event handler function. If something goes wrong, the error handler is called. The error handler is also passed to the event handler as its first argument.

Of course, you already knew all of this because you read the code above. The code below just puts all of that into action:

In [3]:
dispatch = get_event_dispatcher(
    err=lambda e : logging.error(f"Error {e['error']}: {e['message']}"),
    handlers={ 'PurchaseOrderRequested': on_purchase_order_requested_event }
    )

Let's give this a try: events received that don't have all the information needed are reported as errors and otherwise ignored.

In [4]:
from uuid import uuid4 as uuid

# this one will fail
dispatch({ 'id': str(uuid()), 'type': 'PurchaseOrderRequested', 'metadata': { 'cid': str(uuid()) } })

# this one will fail later
dispatch({
    'id': str(uuid()),
    'type': 'PurchaseOrderRequested',
    'metadata': { 'cid': str(uuid()), 'pid': str(uuid()), 'uid': str(uuid()) }
})
ERROR:root:Error SchemaMismatchError: Event does not match event schema
WARNING:root:Not checking for authorization!
ERROR:root:Error MissingDataError: Event does not have data

An event that has all the necessary data reaches the event handler's business logic:

In [5]:
dispatch({
    'id': str(uuid()),
    'type': 'PurchaseOrderRequested',
    'metadata': { 'cid': str(uuid()), 'pid': str(uuid()), 'uid': str(uuid()) },
    'data': {
        'requestDate': '2024-05-29',
        'items': [
            {
                'name': 'Rose bouquet with 12 red roses',
                'code': 'ROSE12BQ1',
                'qty': 1,
                'price': 132.99,
                'discount': 0
            }
        ]
    }
})
WARNING:root:Not checking for authorization!
INFO:root:Reached the business logic!

This means we can create a client using any popular event bus and just call dispatch with the received events!

We'll show how to do this using a typical bus implementation shortly. Let's first look at what sending events would look like.

Sending events in Python

Ideally, all we ever want to do to send a message is

send(event_type, event_data)

but to get there, a few things need to happen. Namely, we need to at least identify the required parameters for the message, and we need to provide enough information to put it on the bus to allow the consumers to consume it in some meaningful way. So, the information we need is:

  • the topic to send the event on
  • the event's type
  • the CID (if this is part of a correlated set of events)
  • additional meta-data as needed (PID, TID, UID, token)
  • the event's payload, if any.

If we were to write a generic send function, it would look something like this:

In [6]:
def send_event(event_type, event_data=None, cid=None, pid=None, tid=None, uid=None, token=None):
    # do something meaningful here
    pass

The producer ID won't change from one call to another, so a slightly better version might look like this:

In [7]:
def get_send_event_function(pid=None):
    def send_event(event_type, event_data=None, cid=None, tid=None, uid=None, token=None):
        # do something meaningful here
        pass
    return send_event

This doesn't tell us how to format the event type and data into actual events we can put on a bus, so let's write a function to do that:

In [8]:
def _get_format_event_function(data_preprocessors=None):
    if not data_preprocessors:
        data_preprocessors = { '__default__': lambda a : a }
    def format_event(type, cid=None, id=None, pid=None, tid=None, uid=None, token=None, data=None):
        if data:
            if ((not type in data_preprocessors or not data_preprocessors[type])
                and (not '__default__' in data_preprocessors or not data_preprocessors['__default__'])):
                raise Exception(f'No data formatter for type {type}, and data was provided')
            elif not type in data_preprocessors or not data_preprocessors[type]:
                formatted_data = data_preprocessors['__default__'](data)
            else:
                formatted_data = data_preprocessors[type](data)
        else:
            formatted_data = None
        event_id = id if id else str(uuid())
        metadata = {}
        metadata['cid'] = cid if cid else event_id
        if uid:
            metadata['uid'] = uid
        if token:
            metadata['token'] = token
        if pid:
            metadata['pid'] = pid
        metadata['tid'] = tid if tid else event_id
        
        formatted_event = {
            'id': event_id,
            'type': type,
            'metadata': metadata
        }
        if formatted_data:
            formatted_event['data'] = formatted_data
        
        return formatted_event
    return format_event

You might wonder why we have a data pre-processor here. Sometimes, data isn't just presented as a dict we can serialize into JSON, so it may need some pre-processing: any time the data is encapsulated in a class of some sort, for example.

Take our purchase order for example: it could be encapsulated in a class:

In [9]:
from datetime import datetime

class PurchaseOrder:
    def __init__(self):
        self._id = str(uuid())
        self._request_date = datetime.today().strftime('%Y-%m-%d')
        self._items = []

    def add_item(self, code, qty, name=None, price=None, discount=None):
        item = { 'code': code, 'qty': qty }
        if name:
            item['name'] = name
        if price:
            item['price'] = price
        if discount:
            item['discount'] = discount
        self._items.append(item)

    def serialize(self):
        serialized = {
            'id': self._id,
            'requestDate': self._request_date,
            'items': self._items
        }
        return serialized

in which case our data preprocessor could be:

In [10]:
data_preprocessors={
    'PurchaseRequested': lambda a : a.serialize()
}

This brings our generic get_send_event_function to:

In [11]:
def get_send_event_function(data_preprocessors=None, pid=None):
    format_event = _get_format_event_function(data_preprocessors)
    if not pid:
        pid = str(uuid())
    def send_event(event_type, event_data=None, cid=None, tid=None, uid=None, token=None):
        formatted_event = format_event(
            type=event_type,
            cid=cid,
            pid=pid,
            tid=tid,
            uid=uid,
            token=token,
            data=event_data
        )
        # send the formatted_event
    return send_event

This function still doesn't actually send anything. To have it send something to a bus (any bus), we need to give it a way to actually send.

In [12]:
def get_send_event_function(send, data_preprocessors=None, pid=None):
    if not data_preprocessors:
        data_preprocessors = { '__default__': lambda a : a }
    format_event = _get_format_event_function(data_preprocessors)
    if not pid:
        pid = str(uuid())
    def send_event(event_type, event_data=None, cid=None, tid=None, uid=None, token=None):
        formatted_event = format_event(
            type=event_type,
            cid=cid,
            pid=pid,
            tid=tid,
            uid=uid,
            token=token,
            data=event_data
        )
        send(formatted_event)
    return send_event

Before we go much further, though, we need to decide how to organize messages on the bus. In this example, we will use topics as their name suggests: to group messages by what they are about. That means that purchase order requests and approved purchase orders go on the same topic (purchase-orders) but are consumed by different micro-services. Each micro-service may in turn have more than one instance running in parallel. We'll take the "order" micro-service as an example, using Kafka.

import json
import logging
from kafka import KafkaConsumer

consumer = KafkaConsumer('purchase-orders', group_id='services-order')
handlers = {'PurchaseRequested': on_purchase_order_requested_event}
dispatch = get_event_dispatcher(
    lambda err : logging.error(f'Error {err["error"]}: {err["message"]}'),
    handlers
    )
for event in consumer:
    try:
        event_dict = json.loads(str(event.value, encoding='utf-8'))
        dispatch(event_dict)
    except Exception:
        logging.exception('Failed to parse or dispatch event')

Sending on Kafka

Kafka organizes the bus into "topics", but does not route between topics and queues by itself (topics are essentially queues). However, you can use an ETL to do that, essentially having it function as a data pump.
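Such a data pump is itself just a consumer and a producer glued together; a minimal sketch, with hypothetical topic names:

import json
from kafka import KafkaConsumer, KafkaProducer

# A minimal data pump: copy events from one topic to another (the topic
# names are hypothetical), optionally filtering or transforming on the way.
consumer = KafkaConsumer('purchase-orders', group_id='pump-archive')
producer = KafkaProducer()
for message in consumer:
    event = json.loads(str(message.value, encoding='utf-8'))
    producer.send('purchase-orders-archive', json.dumps(event).encode())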

To send anything to a Kafka bus, you need a KafkaProducer.

Let's say we want to order two magnolias and five red roses:

import json
from kafka import KafkaProducer
from uuid import uuid4 as uuid

def get_send_to_kafka_function(producer, topic):
    def send_to_kafka(event):
        producer.send(topic, json.dumps(event).encode())
    return send_to_kafka

producer = KafkaProducer()

send_event = get_send_event_function(
    get_send_to_kafka_function(
        producer,
        'purchase-orders'
    ), data_preprocessors={
        'PurchaseRequested': lambda a : a.serialize()
    }
)

po = PurchaseOrder()
po.add_item('MAGNOLIA', 2)
po.add_item('RED_ROSE', 5)

send_event('PurchaseRequested', po, uid=str(uuid()))

producer.flush()

Conclusion

Interoperability is a solvable problem: providing sufficient meta-data in a standard format, providing the data schema in a standard way, and providing the semantics for that data as importable modules goes a long way toward solving it. The super-schema and supporting code to handle the meta-data and map to the data schema takes it another step.