How to extend OPAL with custom Fetch Providers
This tutorial explains how to write and use your own custom fetch providers, so that OPAL can pull state from a custom service into the authorization layer (i.e., OPA).
The guide has 3 main parts:
- Background - explains why we need custom fetch providers, gives examples for use cases and explains what fetch providers are.
- Writing your own fetch provider - a step-by-step explanation of how to write your own fetch provider.
- Using a custom fetch provider - given a custom fetch provider (either published by someone else or written by you), shows how to use the provider in your own OPAL setup.
TL;DR
This tutorial is long and detailed, but the gist of it is:
- All Fetch Providers are simply python classes that derive from BaseFetchProvider.
- You need to implement the fetching logic in _fetch_() and optionally _process_().
- Once you finish implementing your provider, you can publish it as a pip package. You can then tell OPAL to use it with the configuration env var OPAL_FETCH_PROVIDER_MODULES.
- We created a well-documented example fetch provider for Postgres SQL. If you prefer to learn from a real code example you can simply clone it and play with it.
Background
One of the core features of OPAL (besides realtime syncing of authorization state) is the ability to aggregate state from multiple data sources into OPA.
Use cases for fetching authorization state from external sources
- We might want to allow certain actions only for paying users. In order to know if the user is a paying user, the authorization layer needs to fetch billing data from a 3rd party service (e.g., Stripe).
- We might want to allow a customer success rep to impersonate a user belonging to one of our customers for demo purposes, but only if the customer success rep is assigned a ticket in Salesforce.
- In our architecture we have a microservice that manages our custom RBAC roles. We want to pull the list of roles and their permissions from the roles service into OPA.
What are OPAL fetch providers?
Fetch Providers are the (pluggable) components OPAL uses to fetch data from sources on demand. You can think about each provider as a plugin or a driver that can teach OPAL how to fetch data from a new data-source.
OPAL was designed to be extensible, and you can easily create more fetch providers to enable OPAL to fetch data from your own unique sources (e.g. a SaaS service, a new DB, your own proprietary solution, etc).
Writing your own fetch provider
This section is a step-by-step tutorial on how to write an OPAL fetch provider.
We already created a fully-functional fetch provider for Postgres SQL, that you may use if you need to fetch data from postgres. This fetcher is well documented and you can learn from it how to write your own fetch providers. We will also reference code examples from this fetch provider in our tutorial.
Step 1 - creating your project file hierarchy
All Fetch Providers are simply python classes that derive from BaseFetchProvider.
Fetch Providers are loaded into the fetcher-register from a list of python modules specified by the OPAL configuration env var OPAL_FETCH_PROVIDER_MODULES.
In order for OPAL to be able to load your fetch-provider python module (by load we mean import inside python), the module must be installed on your machine. The best way to install python modules is to publish them as a pip package.
Your minimum file tree should look like this:
.
├── LICENSE
├── README.md
├── opal_fetcher_postgres
├── requirements.txt
└── setup.py
It's pretty basic but we'll go through it anyway:
- LICENSE - an open-source license. OPAL itself uses the Apache 2.0 license.
- README.md - a readme that describes your package, typically including instructions on how to install and use your fetch provider (recommended).
- opal_fetcher_postgres - will probably have a different name in your own fetch-provider. This is the name of the package after it is installed by pip (in other words, the import inside python will look like this: import opal_fetcher_postgres).
- requirements.txt - other pip packages required by your fetch-provider. At minimum, you will need the following packages: opal-common and pydantic.
- setup.py - this file includes instructions on how to install your fetch-provider package. You can copy from us.
Step 2 - your provider module (general structure)
Under your module folder, you should typically have two files:
.
├── __init__.py
└── provider.py
The provider.py module should have the following structure:
- A class inheriting from FetcherConfig - your config class.
- A class inheriting from FetchEvent - your event class.
- A class inheriting from BaseFetchProvider - your fetch provider class.
This example code shows the class structure and some comments:
from pydantic import BaseModel, Field
from opal_common.fetcher.fetch_provider import BaseFetchProvider
from opal_common.fetcher.events import FetcherConfig, FetchEvent

class PostgresFetcherConfig(FetcherConfig):
    """
    Config for PostgresFetchProvider, inherits from `FetcherConfig`.
    * In your own class, you must set the value of the `fetcher` key to be your custom provider class name.
    """
    fetcher: str = "PostgresFetchProvider"

class PostgresFetchEvent(FetchEvent):
    """
    When writing a custom provider, you must create a custom FetchEvent subclass, just like this class.
    In your own class, you must:
    * set the value of the `fetcher` key to be your custom provider class name.
    * set the type of the `config` key to be your custom config class (the one just above).
    """
    fetcher: str = "PostgresFetchProvider"
    config: PostgresFetcherConfig = None

class PostgresFetchProvider(BaseFetchProvider):
    """
    The fetch-provider logic, must inherit from `BaseFetchProvider`.
    """
    ...
You may also reference the provider module from the postgres fetcher.
Step 3 - implementing your FetcherConfig and FetchEvent
Each fetch-provider might require specific values that will be passed to it as part of its configuration. The configuration is simply a Pydantic model that must derive from the FetcherConfig class.
Let's analyze the real code of the PostgresFetcherConfig class from the postgres fetcher.
from typing import Optional
from pydantic import BaseModel, Field
from opal_common.fetcher.events import FetcherConfig

class PostgresConnectionParams(BaseModel):
    """
    if one does not want to pass all postgres arguments in the dsn (in OPAL - the url is the dsn),
    one can also use this dict to pass specific arguments.
    """
    database: Optional[str] = Field(None, description="the database name")
    user: Optional[str] = Field(None, description="user name used to authenticate")
    password: Optional[str] = Field(None, description="password used to authenticate")
    host: Optional[str] = Field(None, description="database host address (defaults to UNIX socket if not provided)")
    port: Optional[str] = Field(None, description="connection port number (defaults to 5432 if not provided)")

class PostgresFetcherConfig(FetcherConfig):
    """
    Config for PostgresFetchProvider, instance of `FetcherConfig`.
    When an OPAL client receives an update, it contains a list of `DataSourceEntry` objects.
    Each `DataSourceEntry` has a `config` key - which is usually an instance of a subclass of `FetcherConfig`.
    When writing a custom provider, you must:
    - derive your class (inherit) from FetcherConfig
    - override the `fetcher` key with your fetcher class name
    - (optional): add any fields relevant to a data entry of your fetcher.
        - In this example: since we pull data from PostgreSQL - we added a `query` key to hold the SQL query.
    """
    fetcher: str = "PostgresFetchProvider"
    connection_params: Optional[PostgresConnectionParams] = Field(None, description="these params can override or complement parts of the dsn (connection string)")
    query: str = Field(..., description="the query to run against postgres in order to fetch the data")
    fetch_one: bool = Field(False, description="whether we fetch only one row from the results of the SELECT query")
- The PostgresConnectionParams class is simply a sub-model of the main pydantic model. You might not need such a structure in your own implementation.
- The PostgresFetcherConfig is our actual fetcher config class:
  - The fetcher attribute is mandatory. It must include the name of your fetch-provider class. This is the value that must later be included in your DataSourceEntry objects in order to indicate which fetcher must be used. You can forget about it for now, we will explain more when we get to the Using a custom fetch provider section.
  - The other attributes are specific to your fetcher. For example, in the postgres fetcher, the query attribute contains the SQL SELECT query that the fetcher should run against postgres to fetch the data.
Your FetchEvent derived class is more straightforward, simply:
- Rename the event class to whatever you want.
- Set the value of the fetcher key to be your custom provider class name.
- Set the type of the config key to be your custom config class.
class PostgresFetchEvent(FetchEvent):
    fetcher: str = "PostgresFetchProvider"
    config: PostgresFetcherConfig = None
Step 4 - implementing your FetchProvider class
Your fetch provider class implements the actual logic that is needed to fetch a DataSourceEntry object.
The structure of the provider class is as follows:
class PostgresFetchProvider(BaseFetchProvider):
    """
    The fetch-provider logic, must inherit from `BaseFetchProvider`.
    """
    ...
    def __init__(self, event: PostgresFetchEvent) -> None:
        """
        inits your provider class
        """

    def parse_event(self, event: FetchEvent) -> PostgresFetchEvent:
        """
        deserializes the fetch event type from the general `FetchEvent` to your derived fetch event (i.e: `PostgresFetchEvent`)
        """

    # if you require context to cleanup or guard resources, you can use __aenter__() and __aexit__()
    async def __aenter__(self): ...
    async def __aexit__(self, exc_type=None, exc_val=None, tb=None): ...

    async def _fetch_(self):
        """
        the actual logic that you need to implement to fetch the `DataSourceEntry`.
        Can reference your (derived) `FetcherConfig` object to access your fetcher attributes.
        """

    async def _process_(self, data):
        """
        optional processing of the data returned by _fetch_().
        must return a jsonable python object (i.e: an object that can be dumped to json,
        e.g: a list or a dict that contains only serializable objects).
        """
Let's reimplement the postgres provider step-by-step.
The constructor
The constructor must be initialized with the specific FetchEvent type you defined in the previous step, and the event should be passed on to the parent class with super(). You may also initialize class members.
def __init__(self, event: PostgresFetchEvent) -> None:
    ...
    super().__init__(event)
    self._connection: Optional[asyncpg.Connection] = None
    self._transaction: Optional[Transaction] = None
The super().__init__() call stores the event in self._event, and your fetcher configuration can be accessed in self._event.config.
The parse_event() method
Simply replace the custom FetchEvent type (in this example PostgresFetchEvent) with your own custom type. This method simply deserializes the event object from the generic FetchEvent type into the more specific custom event type (i.e., PostgresFetchEvent).
def parse_event(self, event: FetchEvent) -> PostgresFetchEvent:
    return PostgresFetchEvent(**event.dict(exclude={"config"}), config=event.config)
Manage a context with __aenter__ and __aexit__
Your fetch provider will typically access the network or disk and be I/O-bound, so it is best to use asyncio and follow the usual best practices for writing async python code. That includes:
- Preferring asyncio-ready libraries over blocking libraries to fetch the data.
  - For example in our postgres provider, we use asyncpg instead of the blocking psycopg2.
- Using __aenter__ and __aexit__ if you need to clean up resources or guard a context.
  - See more info on async context managers here.
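When no asyncio-ready client exists for your data source, one possible fallback is to push the blocking call onto a worker thread with asyncio.to_thread (Python 3.9+). This is an assumption of this sketch, not something the postgres fetcher does, and blocking_lookup is a hypothetical stand-in for a call into a blocking library.

```python
import asyncio
import time


def blocking_lookup(user_id: str) -> dict:
    """Hypothetical stand-in for a call into a blocking client library."""
    time.sleep(0.05)  # simulates network latency; blocks the calling thread
    return {"user": user_id, "plan": "pro"}


async def fetch_many(user_ids: list) -> list:
    # Each blocking call runs in a worker thread, so the event loop stays
    # free to serve other fetch tasks while the calls are in flight.
    tasks = [asyncio.to_thread(blocking_lookup, uid) for uid in user_ids]
    # gather() returns the results in the same order as the tasks
    return await asyncio.gather(*tasks)


results = asyncio.run(fetch_many(["alice", "bob"]))
```

A native async library is still preferable where one exists, since threads add overhead and do not make the underlying client cancellable.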
__aenter__ should typically be used to initialize a connection, create a transaction, etc.
In our postgres example you can see that we connect to the database and start a transaction inside __aenter__. Notice that the transaction itself is an async context manager so we await its own __aenter__:
async def __aenter__(self):
    # initializing parameters from the event/config
    self._event: PostgresFetchEvent
    dsn: str = self._event.url
    connection_params: dict = {} if self._event.config.connection_params is None else self._event.config.connection_params.dict(exclude_none=True)
    # connect to the postgres database
    self._connection: asyncpg.Connection = await asyncpg.connect(dsn, **connection_params)
    # start a readonly transaction (we don't want OPAL client writing data due to security!)
    self._transaction: Transaction = self._connection.transaction(readonly=True)
    await self._transaction.__aenter__()
    return self
Similarly, __aexit__ should typically be used to free resources that were allocated inside __aenter__.
In our postgres example:
async def __aexit__(self, exc_type=None, exc_val=None, tb=None):
    # End the transaction
    if self._transaction is not None:
        await self._transaction.__aexit__(exc_type, exc_val, tb)
    # Close the connection
    if self._connection is not None:
        await self._connection.close()
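To see why the cleanup belongs in __aexit__, here is a minimal standalone sketch showing that __aexit__ still runs when the body of the async with block raises. FakeConnection and GuardedProvider are hypothetical stand-ins written for this illustration; they are not part of OPAL or asyncpg.

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for a database connection."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


class GuardedProvider:
    """Hypothetical provider that only demonstrates the context protocol."""

    def __init__(self) -> None:
        self._connection = None

    async def __aenter__(self):
        # allocate the resource when the context is entered
        self._connection = FakeConnection()
        return self

    async def __aexit__(self, exc_type=None, exc_val=None, tb=None):
        # runs on normal exit and on error alike;
        # returning None lets any exception propagate to the caller
        if self._connection is not None:
            await self._connection.close()


async def run_failing_fetch() -> FakeConnection:
    provider = GuardedProvider()
    try:
        async with provider:
            raise RuntimeError("query failed")  # simulate a failing fetch
    except RuntimeError:
        pass
    return provider._connection


connection = asyncio.run(run_failing_fetch())
```

After the run, connection.closed is True even though the fetch failed, which is the guarantee you want for connections and transactions.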
Implementing _fetch_ and _process_
Providers implement a _fetch_() method to access and fetch data from the data-source. They also optionally implement a _process_() method to mutate the data before returning it (for example converting a JSON string to an actual object).
The _fetch_() and _process_() methods can access the fields available from self._event (the FetchEvent):
- The url we should fetch data from is available at self._event.url.
- The custom FetcherConfig (custom configuration) is available at self._event.config.
In our own example provider _fetch_() simply runs the SQL query and returns the results:
async def _fetch_(self):
    self._event: PostgresFetchEvent # type casting
    # ...
    # there was more code here, it's not very interesting for the tutorial ;)
    # ...
    if self._event.config.fetch_one:
        row = await self._connection.fetchrow(self._event.config.query)
        # fetchrow() returns None when the query matches no rows
        return [] if row is None else [row]
    else:
        return await self._connection.fetch(self._event.config.query)
Since asyncpg returns a list of asyncpg.Record objects, we must process them in _process_ and turn them into something jsonable (the reason is that we currently only support OPA as a policy store, and OPA can only store JSON).
Our _process_() method takes care of the conversion:
async def _process_(self, records: List[asyncpg.Record]):
    self._event: PostgresFetchEvent # type casting
    # when fetch_one is true, we want to return a dict (and not a list)
    if self._event.config.fetch_one:
        if records:
            # we transform the asyncpg record to a dict that can later be serialized to json
            return dict(records[0])
        else:
            return {}
    else:
        # we transform the asyncpg records to a list-of-dicts that can later be serialized to json
        return [dict(record) for record in records]
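Turning records into dicts is not always enough: rows coming from a database often hold values that Python's json module cannot serialize, such as Decimal or date objects. The helper below is a sketch of one way to handle this inside _process_(). The function name to_jsonable and the conversion choices (Decimal to string, dates to ISO 8601 strings) are assumptions made for this illustration, not behavior of the postgres fetcher.

```python
import json
from datetime import date, datetime
from decimal import Decimal


def to_jsonable(value):
    """Recursively convert common non-JSON types into JSON-friendly ones."""
    if isinstance(value, dict):
        return {str(k): to_jsonable(v) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return [to_jsonable(v) for v in value]
    if isinstance(value, (datetime, date)):
        return value.isoformat()
    if isinstance(value, Decimal):
        return str(value)  # a string keeps the full precision
    return value


# made-up sample rows, as they might look after dict(record)
rows = [{"city": "Paris", "area_km2": Decimal("105.4"), "updated": date(2021, 1, 1)}]

clean = to_jsonable(rows)
encoded = json.dumps(clean)  # json.dumps(rows) would raise TypeError
```

Whether a Decimal should become a string or a float depends on how your policy compares the value, so treat that line as a choice to revisit.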
Bonus: How the process of calling your fetch provider works:
- The fetch provider is called by the FetchingEngine's fetch_worker.
- The fetch_worker invokes a provider's .fetch() and .process() methods, which are simply proxies to its _fetch_() and _process_() methods.
- The fetcher-register loads the providers when OPAL client first loads and makes them available for fetch-workers.
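The call flow described above can be sketched with standard-library code only. MiniEvent, MiniBaseProvider, StaticRolesProvider and this fetch_worker are hypothetical stand-ins that mimic the shape of the real classes; the sketch also assumes the worker enters the provider as an async context manager. It is not OPAL's actual implementation.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class MiniEvent:
    """Hypothetical stand-in for a fetch event: a url plus a config dict."""
    url: str
    config: dict = field(default_factory=dict)


class MiniBaseProvider:
    """Hypothetical base class: the public methods proxy to the hooks."""

    def __init__(self, event: MiniEvent) -> None:
        self._event = event

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type=None, exc_val=None, tb=None):
        pass

    async def fetch(self):
        return await self._fetch_()

    async def process(self, data):
        return await self._process_(data)

    async def _fetch_(self):
        raise NotImplementedError

    async def _process_(self, data):
        return data  # default: pass the data through unchanged


class StaticRolesProvider(MiniBaseProvider):
    async def _fetch_(self):
        # a real provider would contact self._event.url here
        return [("admin", "write"), ("viewer", "read")]

    async def _process_(self, data):
        # turn the raw pairs into a jsonable dict
        return {role: permission for role, permission in data}


async def fetch_worker(event: MiniEvent):
    async with StaticRolesProvider(event) as provider:
        raw = await provider.fetch()
        return await provider.process(raw)


result = asyncio.run(fetch_worker(MiniEvent(url="https://roles.example/api")))
```

The takeaway is that your subclass only overrides the underscore-wrapped hooks, while the worker talks to the public fetch() and process() methods.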
Using a custom fetch provider
This section explains how to use a custom OPAL fetch provider in your OPAL setup.
Before we begin - How does OPAL find custom fetch providers?
As mentioned before, all FetchProviders are simply python classes that derive (inherit) from BaseFetchProvider. OPAL searches for fetch providers based on the env var OPAL_FETCH_PROVIDER_MODULES, defined here.
For example, if the env var is:
OPAL_FETCH_PROVIDER_MODULES=opal_common.fetcher.providers,opal_fetcher_postgres.provider
OPAL will parse this var as a comma-separated list, and for each item in the list OPAL will find that python module, import it and then look inside the imported module for subclasses of BaseFetchProvider.
In our example, OPAL will import two python modules:
- opal_common.fetcher.providers: there's a trick in the __init__.py file of the module that causes all classes in the directory to be added to __all__ and thus to be available directly under the module. Since both HttpFetchProvider and FastApiRpcFetchProvider inherit from BaseFetchProvider, both of them will be found by OPAL and added to the fetcher register.
- opal_fetcher_postgres.provider: no special tricks here. If you look inside that module, you will see that the class PostgresFetchProvider inherits from BaseFetchProvider.
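The discovery mechanism described above (split the list, import each module, keep the subclasses of a base class) can be illustrated with a short sketch. The function find_subclasses is hypothetical and written for this tutorial; it is not OPAL's loader. Standard-library modules and ValueError stand in for the provider modules and BaseFetchProvider so the example runs without OPAL installed.

```python
import importlib
import inspect


def find_subclasses(module_names: str, base: type) -> dict:
    """Import each module in a comma-separated list and collect subclasses of `base`."""
    found = {}
    for name in module_names.split(","):
        module = importlib.import_module(name.strip())
        # look at every class reachable as an attribute of the module
        for class_name, cls in inspect.getmembers(module, inspect.isclass):
            if issubclass(cls, base) and cls is not base:
                found[class_name] = cls
    return found


# json.decoder and json.encoder play the role of two provider modules,
# ValueError plays the role of BaseFetchProvider.
register = find_subclasses("json.decoder,json.encoder", ValueError)
```

Here register picks up JSONDecodeError (a ValueError subclass) and ignores unrelated classes such as JSONDecoder, in the same way only BaseFetchProvider subclasses would end up in the fetcher register.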
1) Create a custom docker image containing your fetch provider
In the official docker images of OPAL, no custom providers are installed. In order for OPAL to be able to load a custom provider's python module, the python module needs to be available on the docker image.
Therefore the first step is to create and build a custom OPAL-client Dockerfile.
Example Dockerfile (taken from the example fetcher repo) for a non-published python package:
# inherits all behavior defined in the official OPAL-client image
FROM permitio/opal-client:latest
WORKDIR /app/
# These two commands install the python package from source
COPY . ./
RUN python setup.py install
If your custom provider is published to PyPI (assuming its name is opal-fetcher-postgres), the docker image can be even simpler:
# inherits all behavior defined in the official OPAL-client image
FROM permitio/opal-client:latest
# installs the python package inside the container (from pip / PyPI)
RUN pip install --user opal-fetcher-postgres
2) Build your custom opal-client container
Say your special Dockerfile from step one is called custom_client.Dockerfile.
You must build a customized OPAL container from this Dockerfile, like so:
docker build -t yourcompany/opal-client -f custom_client.Dockerfile .
3) When running OPAL, set OPAL_FETCH_PROVIDER_MODULES
Pass a customized OPAL_FETCH_PROVIDER_MODULES env var to the OPAL client docker container (comma-separated provider modules):
OPAL_FETCH_PROVIDER_MODULES=opal_common.fetcher.providers,opal_fetcher_postgres.provider
Notice that OPAL receives a list of modules in which to search for fetch providers.
The list in our case includes the built-in providers (opal_common.fetcher.providers) as well as our custom postgres provider. Naturally, replace opal_fetcher_postgres.provider with your own custom provider if needed.
4) Using the custom provider in your DataSourceEntry objects
Fetchers are triggered when OPAL client is instructed to fetch Data Source Entries. Each entry is a directive stating what data to fetch, from where, how it should be fetched and how it should be saved into the policy store (i.e., OPA).
Your DataSourceEntry objects can be used either in OPAL_DATA_CONFIG_SOURCES as initial data sources fetched when OPAL client first loads, or in dynamic (realtime) updates sent via the OPAL publish API. There's a guide on how to trigger data updates here.
Each DataSourceEntry object has a config attribute which is a dict that matches the schema of FetcherConfig.
The dict included in config should contain:
- A fetcher attribute that indicates to OPAL that a custom provider should be used to fetch that DataSourceEntry. The fetcher attribute should contain the name of the custom FetchProvider class (the class that derives from BaseFetchProvider).
- Any custom attributes that your fetcher config type declares (the python class defined in your fetch-provider module that inherits from FetcherConfig). See how to write a custom config in the writing providers section above.
Example value of OPAL_DATA_CONFIG_SOURCES (formatted nicely here, but in the env var you should pack it into one line with no spaces):
{
    "config": {
        "entries": [
            {
                "url": "postgresql://postgres@example_db:5432/postgres",
                "config": {
                    "fetcher": "PostgresFetchProvider",
                    "query": "SELECT * from city;",
                    "connection_params": {
                        "password": "postgres"
                    }
                },
                "topics": ["policy_data"],
                "dst_path": "cities"
            }
        ]
    }
}
In the example OPAL_DATA_CONFIG_SOURCES we just showed:
- The fetcher attribute indicates that in order to fetch the entry, the provider PostgresFetchProvider must be used.
- The query and connection_params attributes are specific to the PostgresFetchProvider provider, and are defined by the config type PostgresFetcherConfig.
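Packing the JSON into a single line by hand is error-prone, so one convenient option is to build the value in Python and let json.dumps do the packing. This is only a sketch: the variable names are made up, and the dict simply mirrors the example value shown above.

```python
import json

# same structure as the OPAL_DATA_CONFIG_SOURCES example above
data_sources = {
    "config": {
        "entries": [
            {
                "url": "postgresql://postgres@example_db:5432/postgres",
                "config": {
                    "fetcher": "PostgresFetchProvider",
                    "query": "SELECT * from city;",
                    "connection_params": {"password": "postgres"},
                },
                "topics": ["policy_data"],
                "dst_path": "cities",
            }
        ]
    }
}

# separators=(",", ":") drops the whitespace json.dumps normally adds
# after commas and colons; spaces inside string values are kept
packed = json.dumps(data_sources, separators=(",", ":"))
env_line = f"OPAL_DATA_CONFIG_SOURCES={packed}"
```

Note that the SQL query still contains spaces because they are part of the string value, so quote the whole value appropriately when you paste env_line into a shell or a compose file.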
Wrapping this up - check out a docker compose example
This docker compose file contains:
- A custom opal client based on this Dockerfile. The only difference is that we install the custom fetch-provider python module into the container.
- The configuration necessary to use the custom fetch provider:
  - OPAL_FETCH_PROVIDER_MODULES is defined for the OPAL-client and tells OPAL to load opal_fetcher_postgres.provider into the fetcher register.
  - OPAL_DATA_CONFIG_SOURCES is defined for the OPAL-server with a DataSourceEntry that contains a fetcher override. The value of the fetcher key tells OPAL to use PostgresFetchProvider to fetch the entry.
You may run this compose file by cloning the example repo and running:
docker compose up