Python non-blocking, parallel request handling with Flask/Celery/Gunicorn

If you’re looking to deploy a production-ready web application in Python, Flask is a great choice for its simplicity and flexibility. When it comes to deploying Flask apps, Gunicorn is a widely used, reliable WSGI server built for production traffic. In this post, I will cover how to deploy a Flask app with Gunicorn, how its worker processes let it handle requests in parallel, and how integrating Celery moves slow jobs out of the request cycle so the app stays non-blocking and scales further.

Scenario

I have a Raspberry Pi that I use for a lot of applications, and logging the internet bandwidth/performance of my ISP is one of them.

Until now, a single Python script did all the tasks, running on the Flask development server, which in my setup did not handle multiple requests in parallel. So if I issued a command to run a speedtest (using a Telegram message), it would start the test, but any further requests, e.g. fetching the real-time weather in a province in Canada, would be queued, and I had to wait for the speedtest to complete before getting the reply to my weather message.

This was less a real problem than an inconvenience, but I saw room to fix this teething problem. One option was to build a web app using Dart[1], but as I was already comfortable with that, I wanted to learn how to do it with Python. I came up with a solution using Flask, Celery and Gunicorn, so here we go!

Details

Gunicorn (short for “Green Unicorn”) is a Python WSGI (Web Server Gateway Interface) HTTP server for deploying Python web applications, including Flask apps. It uses a pre-fork worker model: a master process starts a set of worker processes, and the workers accept and handle requests themselves. With the default sync worker type, each worker handles one request at a time, so the number of workers sets how many requests can be served in parallel. Gunicorn is widely used in production environments, and many web hosting providers support it out of the box.
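As a sketch of how the worker count is usually set, Gunicorn can read its settings from a plain Python file. The values below are assumptions for illustration: the bind address mirrors the gunicorn command at the end of the script in this post, and the worker count follows the (2 x CPU cores) + 1 starting point suggested in Gunicorn's documentation rather than the fixed 4 used here.

```python
# gunicorn.conf.py: a sketch; Gunicorn reads these module-level names as settings.
import multiprocessing

# Same address/port as the command line used in this post (-b 0.0.0.0:5000).
bind = "0.0.0.0:5000"

# Starting point suggested by the Gunicorn docs: (2 x CPU cores) + 1 workers.
# Each sync worker serves one request at a time, so this caps the parallelism.
workers = multiprocessing.cpu_count() * 2 + 1

# "sync" is Gunicorn's default worker type (one request per process at a time).
worker_class = "sync"

# A worker that stays silent for longer than this many seconds is killed and restarted.
timeout = 30

# Write the access log to stdout ("-").
accesslog = "-"
```

You would start it with gunicorn -c gunicorn.conf.py stestWebhook:app. On a Raspberry Pi with little RAM, a smaller fixed number such as the 4 used in this post may be the better choice, since each worker is a separate process with its own copy of the app.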

Celery is a distributed task queue framework that allows you to offload time-consuming tasks to background workers. It is designed to be simple and easy to use, yet powerful and flexible enough to handle a wide variety of use cases.

With Celery, you define tasks as Python functions and execute them asynchronously in the background using one or more worker processes. A task can run as soon as a worker is free, after a delay or at a specific time (the countdown and eta options of apply_async), or periodically with celery beat. Celery also supports task retries, task prioritization, and task chaining, allowing you to build complex workflows and pipelines.

Celery uses a message broker to communicate between the task producer (here, the Flask app) and the worker processes. The broker stores the queued tasks and distributes them to available workers. I use Redis here, both as the broker and as the result backend, because I already run it for other tasks on the Raspberry Pi.
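To make the producer/broker/worker split concrete, here is a stdlib-only illustration of the pattern. It is not Celery: a queue.Queue stands in for Redis, a dict stands in for the result backend, and two threads stand in for the worker processes. slow_speedtest, enqueue and worker are hypothetical names.

```python
import queue
import threading
import time
import uuid

# Stand-in for the broker (Redis in this post): a thread-safe FIFO of pending tasks
broker = queue.Queue()
# Stand-in for the result backend: task id -> return value
results = {}
results_lock = threading.Lock()


def slow_speedtest(chat_id):
    """Hypothetical long job standing in for the real speedtest."""
    time.sleep(0.5)
    return f"speedtest done for {chat_id}"


def enqueue(func, *args):
    """Producer: put a task on the queue and return its id without waiting."""
    task_id = str(uuid.uuid4())
    broker.put((task_id, func, args))
    return task_id


def worker():
    """Consumer: take tasks off the queue forever and store their results."""
    while True:
        task_id, func, args = broker.get()
        try:
            value = func(*args)
            with results_lock:
                results[task_id] = value
        finally:
            broker.task_done()


# Two worker threads, loosely like a Celery worker started with -c 2
for _ in range(2):
    threading.Thread(target=worker, daemon=True).start()

start = time.perf_counter()
ids = [enqueue(slow_speedtest, chat_id) for chat_id in (1, 2)]
enqueue_seconds = time.perf_counter() - start  # returns long before the jobs finish
broker.join()  # block only so the demo can print the results
print(f"enqueued in {enqueue_seconds:.4f}s")
print([results[task_id] for task_id in ids])  # ['speedtest done for 1', 'speedtest done for 2']
```

enqueue() returns as soon as the task is on the queue, which is the same reason the webhook view can answer Telegram before the speedtest finishes. Celery adds what this sketch lacks: separate worker processes (possibly on other machines), a broker shared between processes, retries and scheduling.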

To use Celery in your Python application, create a Celery instance configured with your message broker and any other settings. Then define your tasks as Python functions decorated with @celery.task (where celery is the name of your Celery instance), and call them with apply_async (or its shortcut delay), which queues the task and returns immediately. The web process is then free to handle other requests while the workers do the slow work, which is where the improved responsiveness and scalability come from.

Installation

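```shell
$ pip3 install flask gunicorn "celery[redis]" requests beautifulsoup4 google-api-python-client google-auth-oauthlib
```

The script below also expects a Redis server listening on localhost:6379 (the Celery broker) and Ookla's speedtest CLI on the PATH.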

Code

As of now, these are the main things I use this web server for, each triggered by a command to my Telegram bot:

  • Reboot the modem (/reboot)
  • Fetch the ARP table (/arp)
  • Run a speedtest to log the internet bandwidth/performance (/start)
  • Look up the public IP address (/ip)

Nginx reverse proxy config is not covered in this article.

Straight to the code:

```python
# Install the dependencies beforehand (see Installation)

from flask import Flask, jsonify, request, abort
from celery import Celery
import requests
import json
import subprocess
from bs4 import BeautifulSoup
import re
import os
from datetime import datetime

from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

# Telegram chat ids allowed to use the bot (replace mychatid with your chat id)
AUTHORIZED_SENDERS = [mychatid]

# Google Sheets settings used to log the internet bandwidth/performance data
TOKEN_FILE = '/token.json'
SCOPES = ["https://www.googleapis.com/auth/spreadsheets", "https://www.googleapis.com/auth/drive.file"]
spreadsheet_id = 'id of the google spreadsheet'

# Headers for the modem's web interface, shared by the modem helpers below
MODEM_HEADERS = {'Authorization': 'Basic <base64encodedcreds>', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36'}

# Flask app
app = Flask(__name__)

# Celery app: Redis serves as both the message broker and the result backend
celery = Celery(app.name, broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')


def send_telegram_message(message_text, chat_id):
    # Bot API endpoint (token redacted); message_text must be valid MarkdownV2
    api_endpoint = 'https://api.telegram.org/bot<my_bot_token>/sendMessage'
    data = {'chat_id': chat_id, 'text': message_text, 'parse_mode': 'MarkdownV2', 'disable_web_page_preview': True}
    response = requests.post(api_endpoint, data=data, timeout=30)
    if response.status_code != 200:
        raise Exception(f'Failed to send Telegram message: {response.content}')


def esc(value):
    # re.escape() backslash-escapes characters such as '.', '-' and '(' that MarkdownV2 reserves
    return re.escape(str(value))


def stest(chat_id):
    # Run the speedtest CLI; the JSON result is on the last line of its output
    output = subprocess.getoutput("speedtest --selection-details -u MB/s -f json")
    try:
        output = json.loads(output.splitlines()[-1])
    except (IndexError, ValueError) as e:
        app.logger.error(f'Could not parse speedtest output: {e}')
        output = {}

    if 'download' not in output:
        send_telegram_message('Something went wrong with the test results, sorry, but the only option is to try again later as I am too lazy to troubleshoot and build fallback mechanisms for exceptions caused by connectivity etc', chat_id)
        return

    # bandwidth is in bytes per second, so dividing by 1,000,000 gives MB/s
    download = round(output["download"]["bandwidth"] / 1000000, 1)
    upload = round(output["upload"]["bandwidth"] / 1000000, 1)
    jitter = round(output["ping"]["jitter"], 1)
    latency = round(output["ping"]["latency"], 1)
    server = output["server"]
    url = output["result"]["url"]
    message_for_telegram = (
        f'Download: *{esc(download)} MBps*\nUpload: *{esc(upload)} MBps*\n\n'
        f'Jitter: *{esc(jitter)}*\nLatency: *{esc(latency)}*\n\n'
        f'Server: *{esc(server["name"])}*\nLocation: *{esc(server["location"])}*\n'
        f'IP: *{esc(server["ip"])}*\n\n[View on Web]({esc(url)})'
    )

    # Before sending to Telegram, append the result as a row in the spreadsheet
    try:
        creds = None
        if os.path.exists(TOKEN_FILE):
            creds = Credentials.from_authorized_user_file(TOKEN_FILE, SCOPES)
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
                # Save the refreshed token for the next run
                with open(TOKEN_FILE, 'w') as token:
                    token.write(creds.to_json())
            else:
                raise RuntimeError(f'No usable token in {TOKEN_FILE}; generate one with the OAuth flow first')
        service = build('sheets', 'v4', credentials=creds)
        now_time = datetime.now().strftime("%d/%m/%Y %H:%M")
        row = [[now_time, download, upload, jitter, latency,
                server["name"], server["location"], server["ip"], url]]
        service.spreadsheets().values().append(
            spreadsheetId=spreadsheet_id,
            range="stest2!A:Z",
            body={"majorDimension": "ROWS", "values": row},
            valueInputOption="USER_ENTERED",
        ).execute()
    except Exception as e:
        # Logging to the sheet is best effort; still report the result below
        app.logger.error(f'Could not append to the spreadsheet: {e}')

    send_telegram_message(message_for_telegram, chat_id)


def reboot_modem():
    url = 'http://172.16.0.1/boaform/admin/formReboot'
    payload = 'save=Commit+and+Reboot'
    try:
        requests.post(url, headers=MODEM_HEADERS, data=payload, timeout=10)
    except requests.RequestException as e:
        # The modem may drop the connection while rebooting, so just log it
        app.logger.warning(f'Reboot request ended with: {e}')


def detect_pub_ip(chat_id):
    url = 'http://172.16.0.1/boaform/admin/formStatus'
    payload = 'submitppp0=Connect&submit-url=%2Fadmin%2Fstatus.asp'
    response = requests.post(url, headers=MODEM_HEADERS, data=payload, timeout=10)
    tds = []
    if response.status_code == 200:
        response = requests.get('http://172.16.0.1/admin/status.asp', headers=MODEM_HEADERS, timeout=10)
        soup = BeautifulSoup(response.text, 'html.parser')
        rows = soup.find_all('tr')[2:]
        if rows:
            # The public IP is read from the 5th cell of the last table row
            tds = rows[-1].find_all('td')
    if len(tds) > 4 and tds[4].text:
        send_telegram_message(esc(tds[4].text), chat_id)
    else:
        send_telegram_message("Trouble finding the ip now, check later", chat_id)


def arp_table(chat_id):
    response = requests.get('http://172.16.0.1/arptable.asp', headers=MODEM_HEADERS, timeout=10)
    if response.status_code == 200:
        soup = BeautifulSoup(response.text, 'html.parser')
        mac_table = ''.join(td.text + '\n' for td in soup.find_all('td'))
        # Telegram caps messages at 4096 characters and escaping can double the
        # length, so send the table in chunks of at most 2000 raw characters
        chunk = 2000
        for start in range(0, len(mac_table), chunk):
            send_telegram_message(esc(mac_table[start:start + chunk]), chat_id)


# Celery task that handles an incoming Telegram update in a worker process
@celery.task(bind=True, default_retry_delay=10, max_retries=3)
def handle_message(self, data):
    if 'message' not in data:
        app.logger.info("No message found in data.")
        return 'No message'
    chat_id = data['message']['chat']['id']
    text = data['message'].get('text')

    # Check if the sender is authorized
    if chat_id not in AUTHORIZED_SENDERS:
        return 'Unauthorized sender'

    # Dispatch the bot command
    if text == '/start':
        stest(chat_id)
    elif text == '/ip':
        detect_pub_ip(chat_id)
    elif text == '/reboot':
        reboot_modem()
    elif text == '/arp':
        arp_table(chat_id)
    else:
        # '.' is reserved in MarkdownV2, so it has to be escaped
        send_telegram_message("Sorry, I don't understand that command\\.", chat_id)


# Route that receives Telegram's webhook calls
@app.route('/telegram-incoming-webhook', methods=['POST'])
def handle_incoming():
    try:
        data = request.get_json(silent=True)
        if data is None:
            return jsonify({'error': 'No JSON data received'}), 400
        # Queue the work and reply at once, so this Gunicorn worker is free again
        task = handle_message.apply_async(args=(data,))
        app.logger.info(f"task id is {task.id}")
        # Waiting for the result here would block this worker until the task
        # finishes, which is exactly what Celery is used to avoid:
        # while not task.ready():
        #     continue
        # return jsonify({'result': task.get()})
        return 'OK'
    except Exception as e:
        app.logger.error(f'Error processing incoming message: {e}')
        return 'Error processing incoming message', 500


@app.before_request
def limit_remote_addr():
    # Reject every path except the webhook
    if request.path != '/telegram-incoming-webhook':
        abort(403)


if __name__ == '__main__':
    # Start 4 Celery worker processes and 4 Gunicorn workers in the background.
    # "stestWebhook" must match this file's module name (stestWebhook.py).
    app.logger.info("forking celery...")
    subprocess.Popen(['celery', '-A', 'stestWebhook.celery', 'worker', '-l', 'info', '-c', '4'])

    app.logger.info('Gunicorning...')
    subprocess.Popen(['gunicorn', 'stestWebhook:app', '-b', '0.0.0.0:5000', '--workers', '4'])
```
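One caveat with the code above: re.escape() escapes regex metacharacters, not Telegram's MarkdownV2 set, so characters such as '_', '!', '=' and '>' pass through unescaped and Telegram can reject the message. Below is a sketch of a helper that escapes exactly the characters the Bot API documents as reserved in MarkdownV2, plus the backslash itself. escape_markdown_v2 and split_for_telegram are hypothetical names, and the splitter assumes you want to break long output (like the ARP table) on line boundaries while keeping each message within Telegram's 4096-character limit.

```python
import re
from typing import List

# Characters Telegram lists as reserved in MarkdownV2, plus the backslash
# itself (otherwise a literal "\" would escape the character after it).
_MDV2_RESERVED = re.compile(r"([_*\[\]()~`>#+\-=|{}.!\\])")

# sendMessage accepts at most 4096 characters of text.
TELEGRAM_MAX_LEN = 4096


def escape_markdown_v2(text: str) -> str:
    """Prefix every MarkdownV2 reserved character with a backslash."""
    return _MDV2_RESERVED.sub(r"\\\1", text)


def split_for_telegram(text: str, limit: int = TELEGRAM_MAX_LEN) -> List[str]:
    """Split raw text on line boundaries, then escape each chunk.

    Escaping adds at most one backslash per character, so capping raw chunks
    at limit // 2 guarantees every escaped chunk fits in one message.
    """
    budget = limit // 2
    chunks: List[str] = []
    current = ""
    for line in text.splitlines(keepends=True):
        # A single over-long line is cut into budget-sized pieces.
        while len(line) > budget:
            if current:
                chunks.append(current)
                current = ""
            chunks.append(line[:budget])
            line = line[budget:]
        # Start a new chunk if this line would push the current one over budget.
        if len(current) + len(line) > budget:
            chunks.append(current)
            current = ""
        current += line
    if current:
        chunks.append(current)
    return [escape_markdown_v2(chunk) for chunk in chunks]


# Made-up ARP-style lines, only to exercise the splitter.
table = "\n".join(f"192.168.0.{i} aa:bb:cc:dd:ee:{i:02x}" for i in range(300))
parts = split_for_telegram(table)
print(len(parts), max(len(p) for p in parts))
```

Splitting before escaping means no chunk boundary can fall between a backslash and the character it escapes; each element of parts could then be passed to send_telegram_message as-is.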

Demo

Bonus

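Linux service (a systemd unit). Type=forking fits here because the script starts Celery and Gunicorn in the background and then exits: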

```ini
[Unit]
Description=Telegram incoming Webhook
After=network.target

[Service]
Type=forking
User=pi
Group=pi
WorkingDirectory=/home/pi/stest/
ExecStart=/usr/bin/python3 /home/pi/stest/stestWebhook.py

[Install]
WantedBy=multi-user.target
```

This post is licensed under CC BY 4.0 by the author.