Skip to main content

Run OPAL-server with a Kafka backbone pub/sub

Introduction

What do we mean by backbone pub/sub or Broadcast-channel ?

OPAL-server can scale-out both in number of worker processes per server and in multiple servers. While OPAL provides a lightweight websocket pub/sub for OPAL-clients, the multiple servers are linked together by a more heavyweight messaging solution - e.g. Kafka, Redis, or Postgres Listen/Notify.

Broadcaster module

Support for multiple backbone is provided by the Python Broadcaster package. To use it with Kafka we need to install the broadcaster[kafka] module - with: pip install broadcaster[kafka]

Starting with OPAL 0.1.21, it is no longer needed to install the broadcaster[kafka] package - it already comes installed with OPAL.

Running with Kafka

When you run the OPAL-server you can choose which backend it should use with the OPAL_BROADCAST_URI option default is Postgres but running with Kafka is as simple as OPAL_BROADCAST_URI=kafka://kafka-host-name:9092 notice the "kafka://" prefix, that's how we tell OPAL-server to use Kafka.

Setting a Kafka topic (aka Backbone channel)

Be sure to configure the topic in your Kafka server that will act as a channel between all servers - the default name for it is EventNotifier. But (in version OPAL 0.1.21 and later) you can also use the OPAL_BROADCAST_CHANNEL_NAME option to specify the name of the channel.

  • Don't confuse the Kafka topic with the OPAL-server topics.
    • a Kafka topic is used to control which servers share clients and events.
    • OPAL topics control which clients receive which policy or data events.

Docker-compose Example

Check out docker/docker-compose-with-kafka-example.yml for running docker compose with OPAL-server, OPAL-client, Zookeeper, and Kafka.

Run this example docker config with this command:

docker compose -f docker/docker-compose-with-kafka-example.yml up --force-recreate

Give KafKa and OPAL a few seconds to start up and then run the event update (see triggering updates) to check for connectivity.

For example run an update with the OPAL cli:

opal-client publish-data-update --src-url https://api.country.is/23.54.6.78 -t policy_data --dst-path /users/bob/location

You should see the effect in:

  • OPAL-server - you should see "Broadcasting incoming event" in the logs
  • OPAL-client - should receive and act on the event
  • Kafka - should see the event and it's data in the topic something like:
    Partition: 0
    Offset: 3
    Headers:
    Value:
    {"notifier_id": "9a9a97df1da64486a1a56a070f1c3db3", "topics": ["policy_data"], "data": {"id": null, "entries": [{"url": "https://api.country.is/23.54.6.78", "config": {}, "topics": ["policy_data"], "dst_path": "/users/bob/location", "save_method": "PUT"}], "reason": "", "callback": {"callbacks": []}}}

The example docker compose also runs Kafka UI on http://localhost:8080 and you can see the message sent on the kafka topic EventNotifier.

Triggering events directly from Kafka

OPAL-server has a specific Schema for backbone events - BroadcastNotification by writing JSON objects in the schema to the shared Kafka topic we can trigger events directly from Kafka.

Schema

Structure

  • 'notifier_id': A random UUID identifying the source of the message (e.g. the OPAL-server sending it)- you can just make up one.

  • data the event content of type [DataUpdate](https://github.com/permitio/opal/blob/ 8e1e63d585999902b9882633369cba5dcfe7ad3f/opal_common/schemas/data.py#L80) - 'id': UUID for the event itself (random / can be null) - 'reason': A human readable reason for the event (optional) - 'entires': a list of DataSourceEntry - 'url': the url the clients should connect to in-order to get the data - 'config': the configuration for the data fetcher source (any object, optional) - 'topics': A list of OPAL topics to which the message is to be sent (for clients). - 'dst_path': The path in OPA to which the data should be saved. - 'save_method': The HTTP method to use when saving the data in OPA (PUT/ PATCH)

    - 'callback': an [UpdateCallback](https://github.com/permitio/opal/blob/8e1e63d585999902b9882633369cba5dcfe7ad3f/opal_common/schemas/data.py#L71) - Configuration for how to notify other services on the status of Update

example object

{"notifier_id": "9a9a97df1da64486a1a56a070f1c3db3", "topics": ["policy_data"], "data": {"id": null, "entries": [{"url": "https://api.country.is/23.54.6.78", "config": {}, "topics": ["policy_data"], "dst_path": "/users/bob/location", "save_method": "PUT"}], "reason": "User reconnected from new IP", "callback": {"callbacks": []}}}