By Paweł Markowski, IT System Architect and Development Director - CloudFerro

The effective calculations and analysis of Earth Observation data need a substantial allocation of computing resources. To achieve optimal efficiency, we can rely on a cloud infrastructure, such as the one provided by CloudFerro. This infrastructure allows users to spawn robust virtual machines or Kubernetes clusters. Additionally, leveraging this infrastructure in conjunction with tools such as the EO Data Catalog is of utmost significance in accelerating our computational operations. Efficient and well-considered enquiries that we use might help our script speed up the whole process.

Slow count operations on PostgreSQL DB

PostgreSQL database slow counting number of records is a well-known problem that has been described on various articles and official PostgreSQL web pages like: https://wiki.postgresql.org/wiki/Slow_Counting

We will develop an efficient application compatible with the Catalogue API(OData) and highly reliable in handling various error scenarios. These errors may include network-related issues, such as problems with the network connection, encountering limits on the API (e.g., HTTP 429 - Too Many Requests), timeouts, etc.

Workflow implementation

To enhance the IT perspective of this article, we will utilise Temporal.io workflows. Temporal is a programming model that enables the division of specific code logic into actions that can be automatically retried, enhancing productivity and efficiency in the workplace. An intriguing aspect will be the invocation of a Python activity (https://github.com/CloudFerro/temporal-workflow-tutorial/blob/main/activities/count_products_activity.py) from a Golang code (https://github.com/CloudFerro/temporal-workflow-tutorial/blob/main/sample-workflow/main.go.

Now, let us go into the code.

The image below illustrates the schematic representation of the workflow. We have modified the code in order to execute a simple query: count all SENTINEL-2 online products where the observation date is between 2023-03-01 and 2023-08-31

It is important to note that the ODATA API allows users to construct queries with various conditions or even nested structures. Our script assumes basic query structure like:

https://datahub.creodias.eu/odata/v1/Products?$filter=((ContentDate/Start ge 2023-03-01T00:00:00.000Z and ContentDate/Start lt 2023-08-31T23:59:59.999Z) and (Online eq true) and (((((Collection/Name eq ‘SENTINEL-2’))))))&$expand=Attributes&$expand=Assets&$count=True&$orderby=ContentDate/Start asc

Let’s dive into the technical intricacies of our workflow code, crafted in Go. This code is the backbone of our data management system, designed to streamline queries and drive efficient operations, you can find full source here: https://github.com/CloudFerro/temporal-workflow-tutorial/blob/main/query-workflow/query-workflow.go

At the core of this code lies a remarkably straightforward main function: c, err := client.Dial(client.Options{})

The line above establishes a connection between our application and the Temporal server. If the Temporal server is not accessible, an error message will be displayed.

2023/11/07 13:20:50 Failed creating client: failed reaching server: last connection error: connection error

For the comprehensive installation guide, refer to the documentation provided at https://docs.temporal.io/kb/all-the-ways-to-run-a-cluster.

To set your local Temporal server in motion, simply execute the following command within your terminal: temporal server start-dev
With this, your local Temporal server springs into action, awaiting workflow executions.

Once the local Temporal server is up and running, you can activate the worker by executing:

go run main.go
2023/11/07 13:28:30 INFO  No logger configured for temporal client.
Created default one.
2023/11/07 13:28:30 Starting worker (ctrl+c to exit)
2023/11/07 13:28:30 INFO  Started Worker Namespace default TaskQueue
catalogue-count-queue WorkerID 3114@XXX

Worker implements Workflow that simply divides date ranges into manageable five-day timeframes. This approach ensures that our queries to the EO Catalog will be not only swift but also effective. Armed with these segmented timeframes, we embark on fresh queries, optimizing the process of counting products and advancing toward our ultimate goal.

The optimized process of counting products will help us to get the correct number of specified products.

Run WorkflowWorker: cd sample-workflow/ && go mod tidy && go run main.go

Progress monitoring

In our workflow definition, we have not only segmented dates but also incorporated monitoring functionality, enabling us to query the current state / progress.

currentState := “started”
      queryType := “current_state”
      err := workflow.SetQueryHandler(ctx, queryType, func() (string, error) {
            return currentState, nil
      })
      if err != nil {
            currentState = “failed to register query handler”
            return -1, err
      }

These queries can be invoked through an API or accessed via the Temporal server Web UI, which is available at the following address: https://localhost:8233/namespaces/default/workflows. You can use the Web UI to check the status, as demonstrated in the image below.

Activity implementation

Last but not least, part of our short demo is the Activity definition.

We can find the code in /activities/count_products_activity.py. That code executes received query with a static 40sec timeout. Before you start running that activity process, please remember about installing requirements.txt in your python env.

A short reminder how to do that:

python3 -m venv activities/.venv
source activities/.venv/bin/activate
pip install -r activities/requirements.txt

Start activity worker: python activities/count_products_activity.py
To execute an example workflow with a sample query, please execute:
python activities/run_workflow.py

CloudFerro’s GitHub repository can be found on: https://github.com/CloudFerro/temporal-workflow-tutorial/