PolarSPARC

Distributed Task Processing using Celery


Bhaskar S 09/18/2020


Overview

Recently, two new crypto currencies have entered the market and are all the rage - the BLU coin and the RED coin. There are only three exchanges where these popular coins are being traded - the CAT, DOG, and FOX exchanges respectively. There is an API exposed, through which buyers can query the current best favorable rate provided by one of the 3 exchanges. Given the popularity of these coins, the API is being bombarded by the consumers. How can one handle this situation ? What if there is an easy way to distribute the processing, in an asynchronous way, on a single host (or multiple hosts) using message passing, and with an ability to retry automatically on failures ???

Welcome Celery - the distributed task processing framework for Python !!!

Celery is a simple, flexible, and reliable distributed task queue processing framework for Python, with the following features:

The following illustration depicts the high-level architecture of the Celery task queue processing:

Architecture
Figure.1

Before we proceed further the following are some of the terminology used in the context of Celery:

Term Description
Task A job that needs to be dispatched for execution
Broker The messaging middleware such as RabbitMQ
Worker The entity that executes task(s)
Backend The store where the worker(s) persist the result of an execution

Now, we can explain the high-level flow using the architecture diagram from Figure.1 above. When client(s) (also referred to as Producer(s)) invoke a Python method (annotated with a special Celery task decorator), the decorated task sends a message (with the name of the method along with its arguments) to a designated task queue in the messaging Broker and returns an asynchronous result object to the caller. There are task Worker(s) waiting on the designated task queue (also referred to as Consumer(s)). When a message arrives on a queue, a task Worker executes the specified Python method with the method arguments (from the message) and sends the result (of the task execution) to the configured Backend store. The caller can check if the task execution has completed using the asynchronous result object. Once the task execution completes, the caller can get the result from the asynchronous result object, which in-turn fetches the result from the Backend store.

Installation

We can install, setup, and demonstrate Celery on a single host with VMs or on multiple host(s). For my setup, will leverage a 6-node cluster consisting of 5 ODroid XU4's and a ODroid C2. The following illustration shows the 6-node cluster:

Cluster
Figure.2

Ensure each of the nodes have the Armbian OS installed. Next, launch 6 terminal windows and login to each of the nodes (assuming the user-id is alice). We will refer to these terminals as my-xu4-1 thru my-xu4-5 and my-c2-1 respectively.

We need to find the machine architecture for both the ODroid XU4 and the ODroid C2 SBCs. In the terminal my-xu4-1, execute the following command:

$ dpkg --print-architecture

The following would be the typical output:

Output.1

armhf

Similarly, in the terminal my-c2-1, execute the above command.

The following would be the typical output:

Output.2

arm64

In each of the 6 terminals, execute the following command:

$ sudo apt-get remove docker docker-engine docker.io containerd runc

The following would be the typical output:

Output.3

Reading package lists... Done
Building dependency tree       
Reading state information... Done
E: Unable to locate package docker-engine

In each of the 6 terminals, execute the following command:

$ sudo apt-get update

The following would be the typical output:

Output.4

Hit:2 http://ports.ubuntu.com focal InRelease
Hit:3 http://ports.ubuntu.com focal-security InRelease
Hit:4 http://ports.ubuntu.com focal-updates InRelease
Hit:5 http://ports.ubuntu.com focal-backports InRelease
Hit:1 https://armbian.systemonachip.net/apt focal InRelease
Reading package lists... Done

In each of the 6 terminals, execute the following command:

$ sudo apt-get install apt-transport-https ca-certificates curl gnupg-agent software-properties-common

The following would be the typical output:

Output.5

Reading package lists... Done
Building dependency tree       
Reading state information... Done
ca-certificates is already the newest version (20190110ubuntu1.1).
curl is already the newest version (7.68.0-1ubuntu2.2).
software-properties-common is already the newest version (0.98.9.2).
apt-transport-https is already the newest version (2.0.2ubuntu0.1).
The following NEW packages will be installed:
  gnupg-agent
0 upgraded, 1 newly installed, 0 to remove and 0 not upgraded.
Need to get 5,236 B of archives.
After this operation, 46.1 kB of additional disk space will be used.
Do you want to continue? [Y/n] Y
Get:1 http://ports.ubuntu.com focal/universe arm64 gnupg-agent all 2.2.19-3ubuntu2 [5,236 B]
Fetched 5,236 B in 0s (18.1 kB/s)       
Selecting previously unselected package gnupg-agent.
(Reading database ... 84665 files and directories currently installed.)
Preparing to unpack .../gnupg-agent_2.2.19-3ubuntu2_all.deb ...
Unpacking gnupg-agent (2.2.19-3ubuntu2) ...
Setting up gnupg-agent (2.2.19-3ubuntu2) ...

In each of the 6 terminals, execute the following command:

$ curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo apt-key add -

The following would be the typical output:

Output.6

OK

In each of the 5 terminals my-xu4-1 thru my-xu4-5, execute the following command:

$ sudo add-apt-repository "deb [arch=armhf] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable"

The following would be the typical output:

Output.7

Get:1 https://download.docker.com/linux/ubuntu focal InRelease [36.2 kB]
Hit:3 http://ports.ubuntu.com focal InRelease                              
Hit:4 http://ports.ubuntu.com focal-security InRelease     
Hit:5 http://ports.ubuntu.com focal-updates InRelease
Hit:6 http://ports.ubuntu.com focal-backports InRelease
Get:7 https://download.docker.com/linux/ubuntu focal/stable armhf Packages [2,743 B]
Hit:2 https://mirrors.netix.net/armbian/apt focal InRelease
Fetched 38.9 kB in 2s (17.8 kB/s)
Reading package lists... Done

In the terminal my-c2-1, execute the following command:

$ sudo add-apt-repository "deb [arch=arm64] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable"

The following would be the typical output:

Output.8

Get:1 https://download.docker.com/linux/ubuntu focal InRelease [36.2 kB]
Hit:3 http://ports.ubuntu.com focal InRelease                              
Hit:4 http://ports.ubuntu.com focal-security InRelease     
Hit:5 http://ports.ubuntu.com focal-updates InRelease
Hit:6 http://ports.ubuntu.com focal-backports InRelease
Get:7 https://download.docker.com/linux/ubuntu focal/stable arm64 Packages [2,743 B]
Hit:2 https://mirrors.netix.net/armbian/apt focal InRelease
Fetched 38.9 kB in 2s (17.8 kB/s)
Reading package lists... Done

In each of the 6 terminals, execute the following command:

$ sudo apt-get update

The following would be the typical output:

Output.9

Hit:1 https://download.docker.com/linux/ubuntu focal InRelease
Hit:3 http://ports.ubuntu.com focal InRelease 
Hit:4 http://ports.ubuntu.com focal-security InRelease
Hit:5 http://ports.ubuntu.com focal-updates InRelease
Hit:6 http://ports.ubuntu.com focal-backports InRelease
Hit:2 https://imola.armbian.com/apt focal InRelease
Reading package lists... Done

In each of the 5 terminals my-xu4-1 thru my-xu4-5, execute the following command:

$ sudo apt-get install docker-ce docker-ce-cli containerd.io

The following would be the typical output:

Output.10

Reading package lists... Done
Building dependency tree       
Reading state information... Done
Recommended packages:
  cgroupfs-mount | cgroup-lite pigz apparmor
The following NEW packages will be installed:
  containerd.io docker-ce docker-ce-cli
0 upgraded, 3 newly installed, 0 to remove and 0 not upgraded.
Need to get 60.2 MB of archives.
After this operation, 303 MB of additional disk space will be used.
Get:1 https://download.docker.com/linux/ubuntu focal/stable armhf containerd.io armhf 1.2.13-2 [16.2 MB]
Get:2 https://download.docker.com/linux/ubuntu focal/stable armhf docker-ce-cli armhf 5:19.03.12~3-0~ubuntu-focal [28.8 MB]
Get:3 https://download.docker.com/linux/ubuntu focal/stable armhf docker-ce armhf 5:19.03.12~3-0~ubuntu-focal [15.2 MB]            
Fetched 60.2 MB in 8s (7,619 kB/s)                                                                                                 
Selecting previously unselected package containerd.io.
(Reading database ... 84669 files and directories currently installed.)
Preparing to unpack .../containerd.io_1.2.13-2_armhf.deb ...
Unpacking containerd.io (1.2.13-2) ...
Selecting previously unselected package docker-ce-cli.
Preparing to unpack .../docker-ce-cli_5%3a19.03.12~3-0~ubuntu-focal_armhf.deb ...
Unpacking docker-ce-cli (5:19.03.12~3-0~ubuntu-focal) ...
Selecting previously unselected package docker-ce.
Preparing to unpack .../docker-ce_5%3a19.03.12~3-0~ubuntu-focal_armhf.deb ...
Unpacking docker-ce (5:19.03.12~3-0~ubuntu-focal) ...
Setting up containerd.io (1.2.13-2) ...
Created symlink /etc/systemd/system/multi-user.target.wants/containerd.service → /lib/systemd/system/containerd.service.
Setting up docker-ce-cli (5:19.03.12~3-0~ubuntu-focal) ...
Setting up docker-ce (5:19.03.12~3-0~ubuntu-focal) ...
Created symlink /etc/systemd/system/multi-user.target.wants/docker.service → /lib/systemd/system/docker.service.
Created symlink /etc/systemd/system/sockets.target.wants/docker.socket → /lib/systemd/system/docker.socket.
Processing triggers for man-db (2.9.1-1) ...
Processing triggers for systemd (245.4-4ubuntu3.2) ...

In the terminal my-c2-1, execute the following command:

$ sudo apt-get install docker-ce docker-ce-cli containerd.io

The following would be the typical output:

Output.11

Reading package lists... Done
Building dependency tree       
Reading state information... Done
Recommended packages:
  cgroupfs-mount | cgroup-lite pigz apparmor
The following NEW packages will be installed:
  containerd.io docker-ce docker-ce-cli
0 upgraded, 3 newly installed, 0 to remove and 0 not upgraded.
Need to get 60.2 MB of archives.
After this operation, 303 MB of additional disk space will be used.
Get:1 https://download.docker.com/linux/ubuntu focal/stable arm64 containerd.io arm64 1.2.13-2 [16.2 MB]
Get:2 https://download.docker.com/linux/ubuntu focal/stable arm64 docker-ce-cli arm64 5:19.03.12~3-0~ubuntu-focal [28.8 MB]
Get:3 https://download.docker.com/linux/ubuntu focal/stable arm64 docker-ce arm64 5:19.03.12~3-0~ubuntu-focal [15.2 MB]            
Fetched 60.2 MB in 8s (7,619 kB/s)                                                                                                 
Selecting previously unselected package containerd.io.
(Reading database ... 84669 files and directories currently installed.)
Preparing to unpack .../containerd.io_1.2.13-2_arm64.deb ...
Unpacking containerd.io (1.2.13-2) ...
Selecting previously unselected package docker-ce-cli.
Preparing to unpack .../docker-ce-cli_5%3a19.03.12~3-0~ubuntu-focal_arm64.deb ...
Unpacking docker-ce-cli (5:19.03.12~3-0~ubuntu-focal) ...
Selecting previously unselected package docker-ce.
Preparing to unpack .../docker-ce_5%3a19.03.12~3-0~ubuntu-focal_arm64.deb ...
Unpacking docker-ce (5:19.03.12~3-0~ubuntu-focal) ...
Setting up containerd.io (1.2.13-2) ...
Created symlink /etc/systemd/system/multi-user.target.wants/containerd.service → /lib/systemd/system/containerd.service.
Setting up docker-ce-cli (5:19.03.12~3-0~ubuntu-focal) ...
Setting up docker-ce (5:19.03.12~3-0~ubuntu-focal) ...
Created symlink /etc/systemd/system/multi-user.target.wants/docker.service → /lib/systemd/system/docker.service.
Created symlink /etc/systemd/system/sockets.target.wants/docker.socket → /lib/systemd/system/docker.socket.
Processing triggers for man-db (2.9.1-1) ...
Processing triggers for systemd (245.4-4ubuntu3.2) ...

In each of the 6 terminals, execute the following commands:

$ sudo usermod -aG docker $USER

$ sudo shutdown -r now

Since the nodes were rebooted, we need to login to each of the nodes one more time.

For our Celery Broker, we will use RabbitMQ as our messaging Broker.

On Docker Hub, check for the current stable version for RabbitMQ. At the time of this article, the current stable version was rabbitmq:3.8.8.

In the terminal my-xu4-3, execute the following command:

$ docker pull rabbitmq:3.8.8-management

The following would be the typical output:

Output.12

3.8.8-management: Pulling from library/rabbitmq
854ab59e811f: Pull complete 
996b7ca18b13: Pull complete 
50a08dcf8afc: Pull complete 
d34a2e7cb38e: Pull complete 
593701df30b6: Pull complete 
030c2f3628ad: Pull complete 
ad381249c522: Pull complete 
577e81d7995c: Pull complete 
e54080d86cca: Pull complete 
7733ca7770b9: Pull complete 
d08f278275c3: Pull complete 
04f09e774af8: Pull complete 
596b7f2fc950: Pull complete 
93da642e6c0b: Pull complete 
Digest: sha256:e18eb565a40a6c03156d76ee2597115e19d5cf632a0aa2cdf87724d94e86cfea
Status: Downloaded newer image for rabbitmq:3.8.8-management
docker.io/library/rabbitmq:3.8.8-management

For our Celery Backend, we will use Redis as our Backend store.

On Docker Hub, check for the current stable version for Redis. At the time of this article, the current stable version was redis:6.0.8.

In the terminal my-c2-1, execute the following command:

$ docker pull redis:6.0.8

The following would be the typical output:

Output.13

6.0.8: Pulling from library/redis
a6d76de28f58: Pull complete 
3573263a91cd: Pull complete 
74acfbcef883: Pull complete 
720e1be7fe14: Pull complete 
bcb81e952db9: Pull complete 
fa95093de04d: Pull complete 
Digest: sha256:1cfb205a988a9dae5f025c57b92e9643ec0e7ccff6e66bc639d8a5f95bba928c
Status: Downloaded newer image for redis:6.0.8
docker.io/library/redis:6.0.8

In the terminals my-xu4-1, my-xu4-2, my-xu4-4, and my-xu4-5, execute the following command:

$ sudo apt install virtualenv -y

The following would be the typical output:

Output.14

Reading package lists... Done
Building dependency tree       
Reading state information... Done
The following additional packages will be installed:
  python3-virtualenv
The following NEW packages will be installed:
  python3-virtualenv virtualenv
0 upgraded, 2 newly installed, 0 to remove and 0 not upgraded.
Need to get 47.8 kB of archives.
After this operation, 171 kB of additional disk space will be used.
Get:1 http://ports.ubuntu.com focal/universe armhf python3-virtualenv all 15.1.0+ds-1.1 [43.4 kB]
Get:2 http://ports.ubuntu.com focal/universe armhf virtualenv all 15.1.0+ds-1.1 [4,476 B]
Fetched 47.8 kB in 0s (107 kB/s)
Selecting previously unselected package python3-virtualenv.
(Reading database ... 80113 files and directories currently installed.)
Preparing to unpack .../python3-virtualenv_15.1.0+ds-1.1_all.deb ...
Unpacking python3-virtualenv (15.1.0+ds-1.1) ...
Selecting previously unselected package virtualenv.
Preparing to unpack .../virtualenv_15.1.0+ds-1.1_all.deb ...
Unpacking virtualenv (15.1.0+ds-1.1) ...
Setting up python3-virtualenv (15.1.0+ds-1.1) ...
Setting up virtualenv (15.1.0+ds-1.1) ...
Processing triggers for man-db (2.8.3-2ubuntu0.1) ...

In the terminals my-xu4-1, my-xu4-2, my-xu4-4, and my-xu4-5, execute the following command:

$ virtualenv -p /usr/bin/python3 celery-venv

The following would be the typical output:

Output.15

Already using interpreter /usr/bin/python3
Using base prefix '/usr'
New python executable in /home/alice/celery-venv/bin/python3
Also creating executable in /home/alice/celery-venv/bin/python
Installing setuptools, pkg_resources, pip, wheel...done.

In the terminals my-xu4-1, my-xu4-2, my-xu4-4, and my-xu4-5, execute the following command:

$ source celery-venv/bin/activate

The following would be the typical output:

Output.16

(celery-venv) $

In the terminals my-xu4-1, my-xu4-2, my-xu4-4, and my-xu4-5, execute the following command:

(celery-venv) $ pip3 install celery[redis]

The following would be the typical output:

Output.17

Collecting celery[redis]
Downloading https://files.pythonhosted.org/packages/c8/0c/609e3611d20c9f8d883852d1be5516671f630fb08c8c1e56911567dfba7b/celery-4.4.7-py2.py3-none-any.whl (427kB)
  100% |--------------------------------| 430kB 1.1MB/s 
Collecting kombu<4.7,>=4.6.10 (from celery[redis])
Downloading https://files.pythonhosted.org/packages/9e/34/3eea6a3a9ff81b0c7ddbdceb22a1ffc1b5907d863f27ca19a68777d2211d/kombu-4.6.11-py2.py3-none-any.whl (184kB)
  100% |--------------------------------| 194kB 2.5MB/s 
Collecting vine==1.3.0 (from celery[redis])
Downloading https://files.pythonhosted.org/packages/7f/60/82c03047396126c8331ceb64da1dc52d4f1317209f32e8fe286d0c07365a/vine-1.3.0-py2.py3-none-any.whl
Collecting billiard<4.0,>=3.6.3.0 (from celery[redis])
Downloading https://files.pythonhosted.org/packages/e8/5e/7591866ff45b370354bd20291cb6f87ddb2eef1f1c88c890a38412037e11/billiard-3.6.3.0-py3-none-any.whl (89kB)
  100% |--------------------------------| 92kB 4.2MB/s 
Collecting pytz>dev (from celery[redis])
Downloading https://files.pythonhosted.org/packages/4f/a4/879454d49688e2fad93e59d7d4efda580b783c745fd2ec2a3adf87b0808d/pytz-2020.1-py2.py3-none-any.whl (510kB)
  100% |--------------------------------| 512kB 1.0MB/s 
Collecting redis>=3.2.0; extra == "redis" (from celery[redis])
Downloading https://files.pythonhosted.org/packages/a7/7c/24fb0511df653cf1a5d938d8f5d19802a88cef255706fdda242ff97e91b7/redis-3.5.3-py2.py3-none-any.whl (72kB)
  100% |--------------------------------| 81kB 4.4MB/s 
Collecting importlib-metadata>=0.18; python_version < "3.8" (from kombu<4.7,>=4.6.10->celery[redis])
Downloading https://files.pythonhosted.org/packages/8e/58/cdea07eb51fc2b906db0968a94700866fc46249bdc75cac23f9d13168929/importlib_metadata-1.7.0-py2.py3-none-any.whl
Collecting amqp<2.7,>=2.6.0 (from kombu<4.7,>=4.6.10->celery[redis])
Downloading https://files.pythonhosted.org/packages/bc/90/bb5ce93521772f083cb2d7a413bb82eda5afc62b4192adb7ea4c7b4858b9/amqp-2.6.1-py2.py3-none-any.whl (48kB)
  100% |--------------------------------| 51kB 4.2MB/s 
Collecting zipp>=0.5 (from importlib-metadata>=0.18; python_version < "3.8"->kombu<4.7,>=4.6.10->celery[redis])
Downloading https://files.pythonhosted.org/packages/b2/34/bfcb43cc0ba81f527bc4f40ef41ba2ff4080e047acb0586b56b3d017ace4/zipp-3.1.0-py3-none-any.whl
Installing collected packages: billiard, pytz, vine, zipp, importlib-metadata, amqp, kombu, redis, celery
Successfully installed amqp-2.6.1 billiard-3.6.3.0 celery-4.4.7 importlib-metadata-1.7.0 kombu-4.6.11 pytz-2020.1 redis-3.5.3 vine-1.3.0 zipp-3.1.0

In the terminal my-xu4-2, execute the following command:

(celery-venv) $ pip3 install flower

The following would be the typical output:

Output.18

Collecting flower
Downloading flower-0.9.5-py2.py3-none-any.whl (459 kB)
    |--------------------------------| 459 kB 1.9 MB/s 
Requirement already satisfied: celery>=3.1.0; python_version < "3.7" in ./celery-venv/lib/python3.6/site-packages (from flower) (4.4.7)
Collecting tornado<7.0.0,>=5.0.0; python_version >= "3.5.2"
Downloading tornado-6.0.4.tar.gz (496 kB)
    |--------------------------------| 496 kB 16.7 MB/s 
Collecting prometheus-client==0.8.0
Downloading prometheus_client-0.8.0-py2.py3-none-any.whl (53 kB)
    |--------------------------------| 53 kB 386 kB/s 
Requirement already satisfied: pytz in ./celery-venv/lib/python3.6/site-packages (from flower) (2020.1)
Collecting humanize
Downloading humanize-2.6.0-py3-none-any.whl (68 kB)
    |--------------------------------| 68 kB 1.5 MB/s 
Requirement already satisfied: vine==1.3.0 in ./celery-venv/lib/python3.6/site-packages (from celery>=3.1.0; python_version < "3.7"->flower) (1.3.0)
Requirement already satisfied: kombu<4.7,>=4.6.10 in ./celery-venv/lib/python3.6/site-packages (from celery>=3.1.0; python_version < "3.7"->flower) (4.6.11)
Requirement already satisfied: billiard<4.0,>=3.6.3.0 in ./celery-venv/lib/python3.6/site-packages (from celery>=3.1.0; python_version < "3.7"->flower) (3.6.3.0)
Requirement already satisfied: importlib-metadata>=0.18; python_version < "3.8" in ./celery-venv/lib/python3.6/site-packages (from kombu<4.7,>=4.6.10->celery>=3.1.0; python_version < "3.7"->flower) (1.7.0)
Requirement already satisfied: amqp<2.7,>=2.6.0 in ./celery-venv/lib/python3.6/site-packages (from kombu<4.7,>=4.6.10->celery>=3.1.0; python_version < "3.7"->flower) (2.6.1)
Requirement already satisfied: zipp>=0.5 in ./celery-venv/lib/python3.6/site-packages (from importlib-metadata>=0.18; python_version < "3.8"->kombu<4.7,>=4.6.10->celery>=3.1.0; python_version < "3.7"->flower) (3.1.0)
Building wheels for collected packages: tornado
Building wheel for tornado (setup.py) ... done
Created wheel for tornado: filename=tornado-6.0.4-cp36-cp36m-linux_armv7l.whl size=415151 sha256=13730a4e10029a5f778cf9c661c6eb841a4fb99a5df9540ddcacbede6ca92413
Stored in directory: /home/bswamina/.cache/pip/wheels/37/a7/db/2d592e44029ef817f3ef63ea991db34191cebaef087a96f505
Successfully built tornado
Installing collected packages: tornado, prometheus-client, humanize, flower
Successfully installed flower-0.9.5 humanize-2.6.0 prometheus-client-0.8.0 tornado-6.0.4

Setup

Now that we have completed the necessary installation, time to setup the nodes in our cluster for Celery.

We will run the Celery Broker (RabbitMQ) on the host my-xu4-3.

In the terminal my-xu4-3, execute the following commands:

$ mkdir rabbitmq

$ docker run -d --hostname celery-mq --name celery-mq -v /home/bswamina/rabbitmq:/var/lib/rabbitmq -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_VHOST=celery_vhost -e RABBITMQ_DEFAULT_USER=celery -e RABBITMQ_DEFAULT_PASS=s3cr3t rabbitmq:3.8.8-management

The following would be the typical output:

Output.19

6542cb6e3ebfabb42fcfe9649065e40dfa0ae2ba144ac489c3143bac93e97ea9

In the terminal my-xu4-3, execute the following command:

$ docker ps -a

The following would be the typical output:

Output.20

CONTAINER ID  IMAGE                      COMMAND                   CREATED          STATUS         PORTS                                    NAMES
6542cb6e3ebf  rabbitmq:3.8.8-management  "docker-entrypoint.s..."  10 seconds ago   Up 6 seconds   4369/tcp, 5671/tcp,
                                                                                                   0.0.0.0:5672->5672/tcp,
                                                                                                   15671/tcp, 15691-15692/tcp, 25672/tcp,
                                                                                                   0.0.0.0:15672->15672/tcp                 celery-mq

Open a web browser for the URL http://my-xu4-3:15672. Enter the user-id celery and password s3cr3t. The following illustration shows the RabbitMQ management screen :

RabbitMQ
Figure.3

In the terminal my-c2-1, execute the following command:

$ docker run -d --hostname celery-rd --name celery-rd -p 6379:6379 redis:6.0.8

The following would be the typical output:

Output.21

337cb95c4bbf3bfb1534ce68911ec9747fdce24440f5c865d6838ecb74088214

In the terminal my-c2-1, execute the following command:

$ docker ps -a

The following would be the typical output:

Output.22

CONTAINER ID   IMAGE          COMMAND                    CREATED         STATUS          PORTS                    NAMES
337cb95c4bbf   redis:6.0.8    "docker-entrypoint.s..."   7 seconds ago   Up 5 seconds    0.0.0.0:6379->6379/tcp   celery-rd

Hands-on Celery

Our Celery Python module is called MyCelery. The following illustration shows the directory contents of the module:

MyCelery
Figure.4

The following is Python script defines the Celery configuration options:

celery_config.py
#
# @Author: Bhaskar S
# @Blog:   https://www.polarsparc.com
# @Date:   17 Sep 2020
#

BROKER_URL = 'amqp://celery:s3cr3t@192.168.1.23:5672/celery_vhost'
CELERY_RESULT_BACKEND = 'redis://192.168.1.31:6379/0'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
CELERY_INCLUDE = ['MyCelery.celery_tasks']
CELERY_ACKS_LATE = True
CELERY_TASK_ROUTES = {
    'MyCelery.celery_tasks.test_task': {'queue': 'test_task.Q', },
    'MyCelery.celery_tasks.get_rate': {'queue': 'get_rate.Q', },
    'MyCelery.celery_tasks.get_exch_rate': {'queue': 'get_rate.Q', },
    'MyCelery.celery_tasks.get_avg_exch_rate': {'queue': 'get_rate.Q', },
}

For our setup, we desire using the JSON format to serialize the task as well as the task execution result. This is what is being specified in CELERY_ACCEPT_CONTENT, CELERY_TASK_SERIALIZER, and CELERY_RESULT_SERIALIZER.

All our Celery tasks are defined in the Python script celery_tasks.py. The module (MyCelery) dot the script name (celery_tasks) is what is specified in CELERY_INCLUDE. In other words, this configuration option is a list all the Celery task modules/scripts.

By default, the Celery Worker will acknowledge immediately (before executing) once it gets a task message. We want to change that behavior to acknowledge after the task execution. This is what is specified in CELERY_ACKS_LATE.

The default Celery queue is called celery and this is where all the task messages end up if not customized. For our demonstration, we have chosen two queues, namely, test_task.Q and get_rate.Q. The task test_task will go to the queue test_task.Q, while the tasks get_rate, get_exch_rate, and get_avg_exch_rate will be sent to the queue get_rate.Q. This is what is specified in CELERY_TASK_ROUTES.

The following is Python script creates an instance of the Celery object and configures it using the options defined in the Python script celery_config:

celery_base.py
#
# @Author: Bhaskar S
# @Blog:   https://www.polarsparc.com
# @Date:   17 Sep 2020
#

from celery import Celery
from MyCelery import celery_config

celery_app = Celery('celery_tasks')
celery_app.config_from_object(celery_config)

The following is Python script defines all the Celery tasks that we desire to distribute and execute in a task Worker:

celery_tasks.py
#
# @Author: Bhaskar S
# @Blog:   https://www.polarsparc.com
# @Date:   17 Sep 2020
#

import random
import time

from datetime import datetime
from celery.utils.serialization import jsonify

from MyCelery.celery_base import celery_app

crypto_fx = {'BLU': 0.0095,
              'RED': 0.0028}

exchange_diff = {'CAT-BLU': [0.0001, 0.0002, 0.0003, 0.0004, 0.0005],
                  'CAT-RED': [0.0001, 0.0002, 0.0003, 0.0004, 0.0005, 0.0006, 0.0007, 0.0008],
                  'DOG-BLU': [0.0002, 0.0004],
                  'DOG-RED': [0.0002, 0.0004, 0.0006, 0.0008],
                  'FOX-BLU': [0.0001, 0.0003, 0.0005],
                  'FOX-RED': [0.0001, 0.0003, 0.0005, 0.0007]}

random.seed()


@celery_app.task(name='MyCelery.celery_tasks.test_task')
def test_task():
    """
    Task used for testing the setup
        :return: json string with a timestamp field
    """
    time.sleep(0.1)
    return jsonify({"timestamp": datetime.now()})


@celery_app.task(name='MyCelery.celery_tasks.get_rate')
def get_rate(crypto):
    """
    Task used to get the lowest rate of two fictitious crypto coins - BLU and RED from the fictitious market
    providers - CAT, DOG, and FOX
        :param crypto: string with value 'BLU' or 'RED'
        :return: json string with the rate from the provider
    """
    rb = crypto_fx[crypto] - random.choice(exchange_diff['CAT-' + crypto])
    time.sleep(0.05)
    rf = crypto_fx[crypto] - random.choice(exchange_diff['DOG-' + crypto])
    time.sleep(0.05)
    rl = crypto_fx[crypto] - random.choice(exchange_diff['FOX-' + crypto])
    time.sleep(0.05)

    val, idx = min((val, idx) for (idx, val) in enumerate([rb, rf, rl]))

    return jsonify({'crypto': crypto, 'rate': val, 'provider': ['CAT', 'DOG', 'FOX'][idx]})


@celery_app.task(name='MyCelery.celery_tasks.get_exch_rate')
def get_exch_rate(exch, crypto):
    """
    Task to get the current rate for a given fictitious exchange and fictitious crypto coin
        :param exch: string with one of the values 'CAT', 'DOG', or 'FOX'
        :param crypto: string with value 'BLU' or 'RED'
        :return: json string with the rate from the provider
    """
    if datetime.today().second % 2 == 0:
        rate = crypto_fx[crypto] - random.choice(exchange_diff[exch + '-' + crypto])
    else:
        rate = crypto_fx[crypto] + random.choice(exchange_diff[exch + '-' + crypto])
    time.sleep(0.5)

    return jsonify({'provider': exch, 'crypto': crypto, 'rate': rate})


@celery_app.task(name='MyCelery.celery_tasks.get_avg_exch_rate')
def get_avg_exch_rate(exch_rates):
    """
    Task that takes in an json object with array of rates from a provider and computes the average rate
        :param exch_rates: json object containing an array of rates from a provider
        :return: json string with the average rate from the provider
    """
    acc = 0.0
    for elem in exch_rates:
        acc += elem['rate']
    avg = acc / len(exch_rates)
    time.sleep(0.5)

    return jsonify({'provider': exch_rates[0]['provider'], 'crypto': exch_rates[0]['crypto'], 'average': avg})

The following are brief descriptions for some of the keyword(s) and method(s) used in the Python script celery_tasks.py above:

The following is Python script defines a Celery client:

celery_client.py
#
# @Author: Bhaskar S
# @Blog:   https://www.polarsparc.com
# @Date:   17 Sep 2020
#

import sys

from MyCelery.celery_tasks import test_task, get_rate


def print_details(name, res):
    """
    Prints the details of the response object of the submitted task
        :param name: name of the approach used to submit the task (delay or apply_async)
        :param res: response object of type AsyncResult
        :return: None
    """
    print(f'{name} res type: {type(res)}')
    print(f'{name} test_task ready ?: {res.ready()}')
    print(f'{name} test_task status: {res.status}')
    print(f'{name} response: {res.get()}')
    print(f'{name} test_task ready ?: {res.ready()}')
    print(f'{name} test_task status: {res.status}')


"""
Main program - if the specified argument is 'test', invoke the test_task. Else, if the specified argument is 'rate'
invoke the get_rate task
"""
if __name__ == '__main__':
    if len(sys.argv) != 2 or sys.argv[1] not in ['test', 'rate']:
        print('Usage: python celery_client.py ')
        sys.exit(1)

    if sys.argv[1] == 'test':
        print('1. test_task via delay ...')

        res1 = test_task.delay()

        print_details('delay', res1)

        print('2. test_task via apply_async ...')

        res2 = test_task.apply_async(queue='test_task.Q')

        print_details('apply_async', res2)
    else:
        for i in range(1, 6):
            print(f'Loop - # {i}')

            res1 = get_rate.apply_async(queue='get_rate.Q', args=('BLU',))
            json1 = res1.get(timeout=5.0)

            print(f'{i} BLU :: {json1}')

            res2 = get_rate.apply_async(queue='get_rate.Q', args=('RED',))
            json2 = res2.get(timeout=5.0)

            print(f'{i} RED :: {json2}')

The following are brief descriptions for some of the keyword(s) and method(s) used in the Python script celery_client.py above:

In the terminals my-xu4-1, my-xu4-2, my-xu4-4, and my-xu4-5, execute the following commands:

(celery-venv) $ mkdir MyCelery

(celery-venv) $ cd MyCelery

Copy the above menthioned Python scripts into the MyCelery on each of the hosts my-xu4-1, my-xu4-2, my-xu4-4, and my-xu4-5 respectively.

In the terminal my-xu4-4, execute the following command to start a Worker, listening for messages on the two queues named celery and test_task.Q:

$ PYTHONPATH=$HOME celery -A celery_tasks worker -l info -Q celery,test_task.Q -c 2

The following would be the typical output:

Output.23

 -------------- celery@my-xu4-4 v4.4.7 (cliffs)
--- ***** ----- 
-- ******* ---- Linux-5.4.61-odroidxu4-armv7l-with-Ubuntu-20.04-focal 2020-09-18 13:23:36
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         celery_tasks:0xb5ed18f0
- ** ---------- .> transport:   amqp://celery:**@192.168.1.23:5672/celery_vhost
- ** ---------- .> results:     redis://192.168.1.31:6379/0
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
  -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
                .> test_task.Q      exchange=test_task.Q(direct) key=test_task.Q

[tasks]
  . MyCelery.celery_tasks.get_avg_exch_rate
  . MyCelery.celery_tasks.get_exch_rate
  . MyCelery.celery_tasks.get_rate
  . MyCelery.celery_tasks.test_task

[2020-09-18 13:23:36,671: INFO/MainProcess] Connected to amqp://celery:**@192.168.1.23:5672/celery_vhost
[2020-09-18 13:23:36,699: INFO/MainProcess] mingle: searching for neighbors
[2020-09-18 13:23:37,783: INFO/MainProcess] mingle: sync with 1 nodes
[2020-09-18 13:23:37,784: INFO/MainProcess] mingle: sync complete
[2020-09-18 13:23:37,832: INFO/MainProcess] celery@my-xu4-4 ready.

The following are brief descriptions of the command-line options used above:

In the terminal my-xu4-5, execute the following command to start a Worker, listening for messages on the queue named get_rate.Q:

$ PYTHONPATH=$HOME celery -A celery_tasks worker -l info -Q get_rate.Q -c 2

The following would be the typical output:

Output.24

 -------------- celery@my-xu4-5 v4.4.7 (cliffs)
--- ***** ----- 
-- ******* ---- Linux-5.4.61-odroidxu4-armv7l-with-Ubuntu-20.04-focal 2020-09-18 13:23:07
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         celery_tasks:0xb5e8e7d0
- ** ---------- .> transport:   amqp://celery:**@192.168.1.23:5672/celery_vhost
- ** ---------- .> results:     redis://192.168.1.31:6379/0
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
  -------------- [queues]
                .> get_rate.Q       exchange=get_rate.Q(direct) key=get_rate.Q
                

[tasks]
  . MyCelery.celery_tasks.get_avg_exch_rate
  . MyCelery.celery_tasks.get_exch_rate
  . MyCelery.celery_tasks.get_rate
  . MyCelery.celery_tasks.test_task

[2020-09-18 13:23:07,679: INFO/MainProcess] Connected to amqp://celery:**@192.168.1.23:5672/celery_vhost
[2020-09-18 13:23:07,706: INFO/MainProcess] mingle: searching for neighbors
[2020-09-18 13:23:08,793: INFO/MainProcess] mingle: all alone
[2020-09-18 13:23:08,871: INFO/MainProcess] celery@my-xu4-5 ready.

The following illustration shows the RabbitMQ management screen after the screen refresh:

With Workers
Figure.5

In the terminal my-xu4-2, execute the following command to start the Celery monitoring tool called Flower:

$ PYTHONPATH=$HOME celery -A celery_tasks flower -l info -c 2

The following would be the typical output:

Output.25

[I 200918 13:24:21 command:140] Visit me at http://localhost:5555
[I 200918 13:24:21 command:145] Broker: amqp://celery:**@192.168.1.23:5672/celery_vhost
[I 200918 13:24:21 command:148] Registered tasks: 
    ['MyCelery.celery_tasks.get_avg_exch_rate',
      'MyCelery.celery_tasks.get_exch_rate',
      'MyCelery.celery_tasks.get_rate',
      'MyCelery.celery_tasks.test_task',
      'celery.accumulate',
      'celery.backend_cleanup',
      'celery.chain',
      'celery.chord',
      'celery.chord_unlock',
      'celery.chunks',
      'celery.group',
      'celery.map',
      'celery.starmap']
[I 200918 13:24:22 mixins:229] Connected to amqp://celery:**@192.168.1.23:5672/celery_vhost

Open a web browser for the URL http://my-xu4-2:5555. The following illustration shows the Flower monitoring tool screen:

Flower
Figure.6

In the terminal my-xu4-1, execute the following command to start the Celery client to test our setup:

$ PYTHONPATH=$HOME python celery_client.py rate

The following would be the typical output:

Output.26

Loop - # 1
1 BLU :: {'crypto': 'BLU', 'rate': 0.0092, 'provider': 'CAT'}
1 RED :: {'crypto': 'RED', 'rate': 0.002, 'provider': 'DOG'}
Loop - # 2
2 BLU :: {'crypto': 'BLU', 'rate': 0.0091, 'provider': 'CAT'}
2 RED :: {'crypto': 'RED', 'rate': 0.0022, 'provider': 'CAT'}
Loop - # 3
3 BLU :: {'crypto': 'BLU', 'rate': 0.0093, 'provider': 'DOG'}
3 RED :: {'crypto': 'RED', 'rate': 0.002, 'provider': 'DOG'}
Loop - # 4
4 BLU :: {'crypto': 'BLU', 'rate': 0.0091, 'provider': 'CAT'}
4 RED :: {'crypto': 'RED', 'rate': 0.0021, 'provider': 'FOX'}
Loop - # 5
5 BLU :: {'crypto': 'BLU', 'rate': 0.009, 'provider': 'CAT'}
5 RED :: {'crypto': 'RED', 'rate': 0.002, 'provider': 'DOG'}

The following illustration shows the Flower monitoring tool screen after the screen refresh:

After Execution
Figure.7

In the terminal my-xu4-5, which is running a Celery task Worker, the following would be the typical output:

Output.27

[2020-09-17 20:27:13,184: INFO/MainProcess] Received task: MyCelery.celery_tasks.get_rate[ade1c95c-de97-4e53-8a36-b9b85a7f984e]  
[2020-09-17 20:27:13,380: INFO/ForkPoolWorker-2] Task MyCelery.celery_tasks.get_rate[ade1c95c-de97-4e53-8a36-b9b85a7f984e] succeeded in 0.18701653799871565s: {'crypto': 'BLU', 'rate': 0.0092, 'provider': 'CAT'}
[2020-09-17 20:27:13,394: INFO/MainProcess] Received task: MyCelery.celery_tasks.get_rate[4c2ba9ee-4de1-40d6-b471-51de7439a8f3]  
[2020-09-17 20:27:13,556: INFO/ForkPoolWorker-2] Task MyCelery.celery_tasks.get_rate[4c2ba9ee-4de1-40d6-b471-51de7439a8f3] succeeded in 0.15665612700104248s: {'crypto': 'RED', 'rate': 0.002, 'provider': 'DOG'}
[2020-09-17 20:27:13,562: INFO/MainProcess] Received task: MyCelery.celery_tasks.get_rate[dfd36f77-d59d-4f96-ba06-0072195ed57d]  
[2020-09-17 20:27:13,723: INFO/ForkPoolWorker-2] Task MyCelery.celery_tasks.get_rate[dfd36f77-d59d-4f96-ba06-0072195ed57d] succeeded in 0.1563869599995087s: {'crypto': 'BLU', 'rate': 0.0091, 'provider': 'CAT'}
[2020-09-17 20:27:13,730: INFO/MainProcess] Received task: MyCelery.celery_tasks.get_rate[509816a9-4c74-4b39-8e4b-0a6ab4263c19]  
[2020-09-17 20:27:13,891: INFO/ForkPoolWorker-2] Task MyCelery.celery_tasks.get_rate[509816a9-4c74-4b39-8e4b-0a6ab4263c19] succeeded in 0.1563130859976809s: {'crypto': 'RED', 'rate': 0.0022, 'provider': 'CAT'}
[2020-09-17 20:27:13,897: INFO/MainProcess] Received task: MyCelery.celery_tasks.get_rate[6fce83fc-88fb-4d4b-86ea-a583a78edabb]  
[2020-09-17 20:27:14,058: INFO/ForkPoolWorker-2] Task MyCelery.celery_tasks.get_rate[6fce83fc-88fb-4d4b-86ea-a583a78edabb] succeeded in 0.15644379399964237s: {'crypto': 'BLU', 'rate': 0.0093, 'provider': 'DOG'}
[2020-09-17 20:27:14,064: INFO/MainProcess] Received task: MyCelery.celery_tasks.get_rate[78141f9b-ccf8-4d5a-94dd-05b283edbcd0]  
[2020-09-17 20:27:14,225: INFO/ForkPoolWorker-2] Task MyCelery.celery_tasks.get_rate[78141f9b-ccf8-4d5a-94dd-05b283edbcd0] succeeded in 0.15650325199749204s: {'crypto': 'RED', 'rate': 0.002, 'provider': 'DOG'}
[2020-09-17 20:27:14,231: INFO/MainProcess] Received task: MyCelery.celery_tasks.get_rate[d69dac38-de56-4344-961a-7f65a4191946]  
[2020-09-17 20:27:14,392: INFO/ForkPoolWorker-2] Task MyCelery.celery_tasks.get_rate[d69dac38-de56-4344-961a-7f65a4191946] succeeded in 0.1563110860006418s: {'crypto': 'BLU', 'rate': 0.0091, 'provider': 'CAT'}
[2020-09-17 20:27:14,398: INFO/MainProcess] Received task: MyCelery.celery_tasks.get_rate[5b452c53-3c38-479b-85db-fc4a81de8bfd]  
[2020-09-17 20:27:14,559: INFO/ForkPoolWorker-2] Task MyCelery.celery_tasks.get_rate[5b452c53-3c38-479b-85db-fc4a81de8bfd] succeeded in 0.15641754399985075s: {'crypto': 'RED', 'rate': 0.0021, 'provider': 'FOX'}
[2020-09-17 20:27:14,570: INFO/MainProcess] Received task: MyCelery.celery_tasks.get_rate[2da5c239-3e54-40f3-9e10-27d63c03ea8c]  
[2020-09-17 20:27:14,731: INFO/ForkPoolWorker-2] Task MyCelery.celery_tasks.get_rate[2da5c239-3e54-40f3-9e10-27d63c03ea8c] succeeded in 0.15627242000118713s: {'crypto': 'BLU', 'rate': 0.009, 'provider': 'CAT'}
[2020-09-17 20:27:14,739: INFO/MainProcess] Received task: MyCelery.celery_tasks.get_rate[98dd6972-1494-4677-98ce-f4207e4985a3]  
[2020-09-17 20:27:14,900: INFO/ForkPoolWorker-2] Task MyCelery.celery_tasks.get_rate[98dd6972-1494-4677-98ce-f4207e4985a3] succeeded in 0.15628821100108325s: {'crypto': 'RED', 'rate': 0.002, 'provider': 'DOG'}

The following illustration shows the executed tasks in the Flower monitoring tool screen after clicking on the navigation bar option Tasks:

Executed Tasks
Figure.8

Moving on to the next example, the following is Python script defines a Celery client with two task pipelines:

celery_client2.py
#
# @Author: Bhaskar S
# @Blog:   https://www.polarsparc.com
# @Date:   17 Sep 2020
#

from celery import chain, group

from MyCelery.celery_tasks import get_exch_rate, get_avg_exch_rate

"""
Main program that demonstrates how one could create complex pipelines using the group and chain tasks
"""
if __name__ == '__main__':
    group_inst = group(get_exch_rate.s('FOX', 'RED'),
                        get_exch_rate.s('FOX', 'RED'),
                        get_exch_rate.s('FOX', 'RED'))

    print(f'group_inst = {group_inst}')
    res = group_inst.apply_async(queue='get_rate.Q')
    print(f'group res type: {type(res)}')
    print(f'group ready ?: {res.ready()}')
    print(f'response: {res.get()}')
    print(f'group ready ?: {res.ready()}')

    chain_inst = chain(group(get_exch_rate.s('DOG', 'BLU'),
                              get_exch_rate.s('DOG', 'BLU'),
                              get_exch_rate.s('DOG', 'BLU')),
                        get_avg_exch_rate.s())

    print(f'chain_inst = {chain_inst}')
    res2 = chain_inst.apply_async(queue='get_rate.Q')
    print(f'chain res type: {type(res2)}')
    print(f'chain ready ?: {res2.ready()}')
    print(f'response: {res2.get()}')
    print(f'chain ready ?: {res2.ready()}')

The following are brief descriptions for some of the keyword(s) and method(s) used in the Python script celery_client2.py above:

The Celery group and/or chain primitives support execution of complex sequence of task processing pipelines.

In the terminal my-xu4-1, execute the following command to start the Celery client to test our complex task pipelines:

$ PYTHONPATH=$HOME python celery_client2.py

The following would be the typical output:

Output.28

group_inst = group((MyCelery.celery_tasks.get_exch_rate('FOX', 'RED'), get_exch_rate('FOX', 'RED'), get_exch_rate('FOX', 'RED')))
group res type: <class 'celery.result.GroupResult'>
group ready ?: False
response: [{'provider': 'FOX', 'crypto': 'RED', 'rate': 0.0023}, {'provider': 'FOX', 'crypto': 'RED', 'rate': 0.0025}, {'provider': 'FOX', 'crypto': 'RED', 'rate': 0.0025}]
group ready ?: True
chain_inst = %MyCelery.celery_tasks.get_avg_exch_rate((get_exch_rate('DOG', 'BLU'), get_exch_rate('DOG', 'BLU'), get_exch_rate('DOG', 'BLU')))
chain res type: <class 'celery.result.AsyncResult'>
chain ready ?: False
response: {'provider': 'DOG', 'crypto': 'BLU', 'average': 0.009766666666666667}
chain ready ?: True

In the terminal my-xu4-5, which is running a Celery task Worker, the following would be the typical output:

Output.29

[2020-09-18 13:24:48,082: INFO/MainProcess] Received task: MyCelery.celery_tasks.get_exch_rate[cf21d167-ab11-44ad-84e5-2fb75d044fb6]  
[2020-09-18 13:24:48,088: INFO/MainProcess] Received task: MyCelery.celery_tasks.get_exch_rate[745d22bf-4991-40f4-85aa-9d18f3b8ce7f]  
[2020-09-18 13:24:48,093: INFO/MainProcess] Received task: MyCelery.celery_tasks.get_exch_rate[b3c81594-5f20-44b3-ae77-9111329df7eb]  
[2020-09-18 13:24:48,634: INFO/ForkPoolWorker-1] Task MyCelery.celery_tasks.get_exch_rate[745d22bf-4991-40f4-85aa-9d18f3b8ce7f] succeeded in 0.5376590699997905s: {'provider': 'FOX', 'crypto': 'RED', 'rate': 0.0025}
[2020-09-18 13:24:48,634: INFO/ForkPoolWorker-2] Task MyCelery.celery_tasks.get_exch_rate[cf21d167-ab11-44ad-84e5-2fb75d044fb6] succeeded in 0.534990074000234s: {'provider': 'FOX', 'crypto': 'RED', 'rate': 0.0023}
[2020-09-18 13:24:49,149: INFO/ForkPoolWorker-2] Task MyCelery.celery_tasks.get_exch_rate[b3c81594-5f20-44b3-ae77-9111329df7eb] succeeded in 0.5070336179996957s: {'provider': 'FOX', 'crypto': 'RED', 'rate': 0.0025}
[2020-09-18 13:24:49,159: INFO/MainProcess] Received task: MyCelery.celery_tasks.get_exch_rate[af761c51-3e76-4ddf-8997-c1eefc161f56]  
[2020-09-18 13:24:49,163: INFO/MainProcess] Received task: MyCelery.celery_tasks.get_exch_rate[b8d5d4d2-7fa2-41d4-95e4-5bafca9a0840]  
[2020-09-18 13:24:49,168: INFO/MainProcess] Received task: MyCelery.celery_tasks.get_exch_rate[8d853764-a866-46cc-a10f-19d555940d9b]  
[2020-09-18 13:24:49,685: INFO/ForkPoolWorker-2] Task MyCelery.celery_tasks.get_exch_rate[af761c51-3e76-4ddf-8997-c1eefc161f56] succeeded in 0.5117993189996923s: {'provider': 'DOG', 'crypto': 'BLU', 'rate': 0.0097}
[2020-09-18 13:24:49,685: INFO/ForkPoolWorker-1] Task MyCelery.celery_tasks.get_exch_rate[b8d5d4d2-7fa2-41d4-95e4-5bafca9a0840] succeeded in 0.5109045700010029s: {'provider': 'DOG', 'crypto': 'BLU', 'rate': 0.009899999999999999}
[2020-09-18 13:24:50,316: INFO/ForkPoolWorker-2] Task MyCelery.celery_tasks.get_exch_rate[8d853764-a866-46cc-a10f-19d555940d9b] succeeded in 0.6247811409994029s: {'provider': 'DOG', 'crypto': 'BLU', 'rate': 0.0097}
[2020-09-18 13:24:50,318: INFO/MainProcess] Received task: MyCelery.celery_tasks.get_avg_exch_rate[fd3b5850-7ac6-43d0-872f-99a2ca2cd90d]  
[2020-09-18 13:24:50,833: INFO/ForkPoolWorker-2] Task MyCelery.celery_tasks.get_avg_exch_rate[fd3b5850-7ac6-43d0-872f-99a2ca2cd90d] succeeded in 0.5043644559991662s: {'provider': 'DOG', 'crypto': 'BLU', 'average': 0.009766666666666667}

This completes our demonstration of the Celery distributed task queue processing system using Python.

!!! DISCLAIMER !!!

The crypto coins and exchanges mentioned above in this article are purely *FICTITIOUS* and are for illustrative purposes only.

References

Celery - Distributed Task Queue

Celery - Full Stack Python

GitHub - MyCelery Source Code


© PolarSPARC