Python + Celery: Creating asynchronous tasks

Hey, everyone! This is part II of our "Asynchronous tasks with Python + Flask + Celery + Docker" series, and today we are going to actually create a few tasks. If you haven't read part one yet, it is here; it is the basis for the next steps.

First, we are going to create the most basic task: an asynchronous call to a service. This is common, for example, when we have a function that has to send an email and we don't want the caller to wait for it.

1. Creating our task function

We import the celery instance we defined in our worker module and decorate a function with "@celery.task". That's all we need to do to define this function as a task. Because we pass bind=True, the task instance itself is passed to the function as its first argument.

app/src/business/hard_work.py

from ..worker import celery
from celery.utils.log import get_task_logger


logger = get_task_logger(__name__)


@celery.task(bind=True)
def send_email(task):
    logger.info('Send email')
    # implement the process that sends the email here

2. Adding this module to our Celery configuration

We need to tell Celery which module contains the task so it knows where to find it. We do this by passing the parameter include=(module_name,) when we initialize Celery. Note the trailing comma: include expects a sequence of module names.

from celery import Celery
import os

# get the Redis URL (used for both the message broker and the result backend)
redis_url = os.environ.get("REDIS_URL", 'redis://localhost:6379')

# the include parameter lists the modules where Celery should look for tasks
celery = Celery(__name__, backend="%s/0" % redis_url, broker="%s/1" % redis_url,
                include=('app.src.business.hard_work',))


def create_app(app):
    celery.conf.update(app.config)
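
The two format strings in the configuration above point the result backend and the broker at different Redis databases (0 and 1) on the same server. The following is a minimal sketch of just that string formatting, so you can check what Celery will receive. The helper name redis_urls is hypothetical, and it assumes REDIS_URL has no trailing slash or database number of its own.

```python
import os


def redis_urls(base_url):
    """Return (backend, broker) URLs formatted as in the worker configuration.

    Hypothetical helper for illustration; assumes base_url has no trailing
    slash and no database number.
    """
    backend = "%s/0" % base_url  # task results are stored in database 0
    broker = "%s/1" % base_url   # task messages are queued in database 1
    return backend, broker


# Same default as the worker module when REDIS_URL is not set.
base = os.environ.get("REDIS_URL", "redis://localhost:6379")
backend, broker = redis_urls(base)
print("backend:", backend)
print("broker: ", broker)
```

Keeping results and queued messages in separate databases makes it easier to inspect or flush one without touching the other.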

3.Creating a service to call our task

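Now we create a Flask endpoint that calls our task. Calling delay() only puts the task on the queue and returns immediately; the worker runs it in the background.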

from flask import Flask, request
from .worker import create_app
from .business.hard_work import send_email

app = Flask(__name__)

create_app(app)

@app.route('/send_email', methods=['POST'])
def email_notification():

    send_email.delay()

    return {'message': 'Email sent.'}

4. Let's test!

To check if it worked, first we have to restart our celery worker:

 celery -A app.src.worker worker -l info

You should see the name of our new task in the Celery log, under [tasks]:

[tasks]
  . app.src.business.hard_work.send_email

Let's call our service:

 curl -X POST http://localhost:5000/send_email
>> {"message":"Email sent."}

In the Celery log we should see the message we logged, along with some additional information:

[2021-04-25 10:04:45,463: INFO/MainProcess] Received task: app.src.business.hard_work.send_email[cb1bb665-63b2-4416-ad9f-5300aa5e415c]
[2021-04-25 10:04:45,465: INFO/ForkPoolWorker-8] app.src.business.hard_work.send_email[cb1bb665-63b2-4416-ad9f-5300aa5e415c]: Send email
[2021-04-25 10:04:45,473: INFO/ForkPoolWorker-8] Task app.src.business.hard_work.send_email[cb1bb665-63b2-4416-ad9f-5300aa5e415c] succeeded in 0.008024199996725656s: None

Other possibilities

Celery offers many other possibilities. Another example is to divide our work into separate tasks and then join the results.

1. Grouping task results

Let's say you want to divide your work into separate tasks and then join the results. In this case we can use the group function. In this example, we receive a text and split it on the periods, and each part of the text is processed independently by its own task.

from ..worker import celery
from celery.utils.log import get_task_logger
from celery import group


logger = get_task_logger(__name__)


def strings_to_upper(text):

    # calling the group sends all the tasks to the workers;
    # get() waits for them and returns a list with the task results
    upper_text = group(strings_to_upper_task.s(line)
                    for line in text.split('.'))().get()

    return '.'.join(upper_text)


@celery.task(bind=True)
def strings_to_upper_task(task, text):
    logger.info('To upper: {0}'.format(text))
    return text.upper()
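
To make the split, process and join flow easier to follow without a running worker, here is a local analogy. It is not Celery: it swaps the group for a thread pool from the standard library's concurrent.futures, and the names to_upper and strings_to_upper_local are hypothetical. What it shares with the code above is the shape of the work: one unit per piece of text, results collected in the original order, then joined.

```python
from concurrent.futures import ThreadPoolExecutor


def to_upper(part):
    # Stand-in for strings_to_upper_task: the work done on one piece.
    return part.upper()


def strings_to_upper_local(text):
    parts = text.split('.')  # fan out: one piece per unit of work
    with ThreadPoolExecutor() as pool:
        # map() returns the results in the same order as the inputs
        results = list(pool.map(to_upper, parts))
    return '.'.join(results)  # fan in: join the processed pieces


# A text that ends with a period produces a trailing empty piece.
print("One.Two.Three.Four.".split('.'))
print(strings_to_upper_local("One.Two.Three.Four."))
```

The trailing empty piece matters for the Celery version too: a text that ends with a period creates one extra task whose input is an empty string.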

2. Our endpoint to call it:

from flask import Flask, request
from .worker import create_app
from .business.hard_work import strings_to_upper

app = Flask(__name__)

create_app(app)

@app.route('/upperfy', methods=['POST'])
def notification():
    text = request.json['text']
    upper_text = strings_to_upper(text)

    return {'upper_text': upper_text}

3. Let's test it!

We should see our new task listed in the Celery log when we restart the worker:

[tasks]
  . app.src.business.hard_work.send_email
  . app.src.business.hard_work.strings_to_upper_task

When we call the service we should see the joined result:

curl -X POST http://localhost:5000/upperfy --data '{"text":"One.Two.Three.Four."}'  --header "Content-Type: application/json"
>>{"upper_text":"ONE.TWO.THREE.FOUR."}
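
If you would rather call the service from Python than from curl, the same request can be sketched with the standard library. The helper name build_upperfy_request is hypothetical, and the base URL assumes the Flask app is listening on localhost:5000 as in the curl call. The block only builds the request, so it runs even when the app is down.

```python
import json
import urllib.request


def build_upperfy_request(text, base_url="http://localhost:5000"):
    """Build the same POST request as the curl command (hypothetical helper)."""
    body = json.dumps({"text": text}).encode("utf-8")
    return urllib.request.Request(
        base_url + "/upperfy",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_upperfy_request("One.Two.Three.Four.")
print(req.get_method(), req.full_url)
print(req.data.decode("utf-8"))
```

With the app and the worker running, passing req to urllib.request.urlopen and decoding the response body with json.load should give the same JSON that curl prints.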

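And in the Celery log we should see the task logs. There are five tasks rather than four because the text ends with a period, so the split produces a trailing empty string: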

[2021-04-25 10:27:43,982: INFO/ForkPoolWorker-8] app.src.business.hard_work.strings_to_upper_task[1823436a-42fa-4d0d-ab0a-564803ba834c]: To upper: One
[2021-04-25 10:27:43,984: INFO/MainProcess] Received task: app.src.business.hard_work.strings_to_upper_task[700c9e4a-7b87-4773-b700-5833ea5859ab]
[2021-04-25 10:27:43,985: INFO/MainProcess] Received task: app.src.business.hard_work.strings_to_upper_task[3843f878-cff2-4662-9c31-44ae71ba2cfc]
[2021-04-25 10:27:43,987: INFO/ForkPoolWorker-2] app.src.business.hard_work.strings_to_upper_task[3843f878-cff2-4662-9c31-44ae71ba2cfc]: To upper: Three
[2021-04-25 10:27:43,986: INFO/ForkPoolWorker-1] app.src.business.hard_work.strings_to_upper_task[700c9e4a-7b87-4773-b700-5833ea5859ab]: To upper: Two
[2021-04-25 10:27:43,988: INFO/ForkPoolWorker-8] Task app.src.business.hard_work.strings_to_upper_task[1823436a-42fa-4d0d-ab0a-564803ba834c] succeeded in 0.006808200007071719s: 'ONE'
[2021-04-25 10:27:43,989: INFO/MainProcess] Received task: app.src.business.hard_work.strings_to_upper_task[444a68a6-cd0d-4b79-8c32-ad9b1160f237]
[2021-04-25 10:27:43,992: INFO/ForkPoolWorker-2] Task app.src.business.hard_work.strings_to_upper_task[3843f878-cff2-4662-9c31-44ae71ba2cfc] succeeded in 0.005920700001297519s: 'THREE'
[2021-04-25 10:27:43,993: INFO/MainProcess] Received task: app.src.business.hard_work.strings_to_upper_task[a8e66b21-03e3-4cd0-b332-548ee1210b67]
[2021-04-25 10:27:43,993: INFO/ForkPoolWorker-1] Task app.src.business.hard_work.strings_to_upper_task[700c9e4a-7b87-4773-b700-5833ea5859ab] succeeded in 0.006537200009915978s: 'TWO'
[2021-04-25 10:27:43,994: INFO/ForkPoolWorker-8] app.src.business.hard_work.strings_to_upper_task[444a68a6-cd0d-4b79-8c32-ad9b1160f237]: To upper: Four
[2021-04-25 10:27:43,995: INFO/ForkPoolWorker-3] app.src.business.hard_work.strings_to_upper_task[a8e66b21-03e3-4cd0-b332-548ee1210b67]: To upper:
[2021-04-25 10:27:43,995: INFO/ForkPoolWorker-8] Task app.src.business.hard_work.strings_to_upper_task[444a68a6-cd0d-4b79-8c32-ad9b1160f237] succeeded in 0.001226200009114109s: 'FOUR'
[2021-04-25 10:27:44,001: INFO/ForkPoolWorker-3] Task app.src.business.hard_work.strings_to_upper_task[a8e66b21-03e3-4cd0-b332-548ee1210b67] succeeded in 0.00622500000463333s: ''

I hope this was useful! The resulting code is here, and we still have a couple more posts to come!