背景

我们公司的产品有一些微应用依然使用的 Python 开发,由于历史原因,公司产品提供的 Python 环境是 2.7.18,所以当时选择的框架是支持 Python2 的异步框架 Tornado。

最近我有个需求是需要启动一个微服务提供几个接口,于是我本着不引入新依赖的原则打算基于平台提供的 Tornado 封装的模块去开发一个微服务。于是借此机会研究了一下这个古老的框架,这篇文章主要是记录一下在 Python2 环境下使用 Tornado 进行异步接口开发的方案。

注:本文的代码只是调试 Demo,并非实际开发代码,不要太纠结于代码质量。

Tornado 中进行异步请求

首先说一下本文探讨的话题的两个关键点:

  • 第一点:Python 环境是基于 Python2.7 下面,这个很重要,因为 Python2 实际上是没有异步函数的,所以在框架中实现异步并不是一件简单的事情。
  • 第二点:我主要探讨的是在 Tornado 的接口的逻辑处理过程中去调用其他接口的异步行为。

使用线程池 ThreadPoolExecutor

这个方案是我咨询 ChatGPT 获得的方案,我的需求是希望能通过 requests 库来进行接口调用,然后需要实现异步,得到的方案是将请求放到 ThreadPoolExecutor 中,以下是实现的封装:

# -*- coding: utf-8 -*-
import logging
import tornado.web
import tornado.gen
import concurrent.futures
import requests


class BaseSDK(object):
    def __init__(self, host, org, user='defaultUser', max_workers=8):
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers)
        self.api_host = "http://{host}".format(host=host)
        self.headers = {
            'org': str(org),
            'user': user,
            'Content-Type': 'application/json'
        }
        self.logger = logging.getLogger(__name__)

    @tornado.gen.coroutine
    def fetch_async(self, method, api, **kwargs):
        # 合并默认和自定义头部
        headers = self.headers.copy()
        headers.update(kwargs.get('headers', {}))
        kwargs['headers'] = headers
        kwargs['timeout'] = kwargs.get('timeout', 10)

        # 拼接完整 URL
        url = self.api_host + api

        try:
            # 执行请求
            response = yield self.executor.submit(requests.request, method=method, url=url, **kwargs)
            response.raise_for_status()  # 检查 HTTP 状态码
            data = response.json()
        except requests.exceptions.RequestException as e:
            # 请求错误
            self.logger.error("Request failed: {url}, error: {error}".format(url=url, error=str(e)))
            data = {}
        except ValueError as e:
            # JSON 解析错误
            self.logger.error(
                "Invalid JSON response: {url}, error: {error}".format(url=url, error=str(e)))
            data = {}
        else:
            self.logger.info("Request succeeded: {url}".format(url=url))

        raise tornado.gen.Return(data)

    def close(self):
        """
        关闭线程池,释放资源
        """
        self.executor.shutdown(wait=True)

这个方案在我看来是比较灵活的,毕竟从代码使用上看直接对 requests 的使用进行了一下处理而已,就是创建一个线程池将请求放进去。

使用内置的 AsyncHTTPClient

另一个方案是我查看 tornado 的文档看到的方案,也是我们公司的框架中使用的方案,就是直接使用 tornado 内置的异步请求类 AsyncHTTPClient 实现异步,下面这个是实现的一个封装类,这个类跟上面那种方案实现的功能是一模一样的,可以直接替换:

# -*- coding: utf-8 -*-
import json
import logging
import tornado.gen
from tornado.httpclient import AsyncHTTPClient, HTTPRequest, HTTPError

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


class BaseSDK(object):
    def __init__(self, host, org, user='defaultUser'):
        self.api_host = "http://{}".format(host)
        self.headers = {
            'org': str(org),
            'user': user,
            'Content-Type': 'application/json'
        }

    @tornado.gen.coroutine
    def fetch_async(self, method, uri, **kwargs):
        """
        兼容 requests 参数风格的异步请求方法
        :param method: 请求方法 ('GET', 'POST', 等)
        :param uri: 接口地址
        :param kwargs: 请求参数,兼容 requests 参数格式
            - params: 查询参数 (dict),会自动附加到 URL 上
            - data: 表单数据 (dict 或字符串),用于 POST/PUT 等方法
            - json: JSON 数据 (dict),自动序列化为字符串
            - headers: 请求头 (dict)
            - timeout: 超时时间 (秒)
        :return: 请求结果(解析后的 JSON 格式字典);如果状态码不是 200,返回空字典 {}
        """
        # 处理参数
        params = kwargs.get('params', None)  # 查询参数
        data = kwargs.get('data', None)  # 表单数据
        json_payload = kwargs.get('json', None)  # JSON 数据
        headers = kwargs.get('headers', self.headers)  # 请求头
        timeout = kwargs.get('timeout', 10)  # 超时时间

        # 构建完整的 URL
        url = self.api_host + uri
        if params:
            query_string = "&".join(["{}={}".format(k, v) for k, v in params.items()])
            url = "{}?{}".format(url, query_string)

        # 构建请求体
        body = None
        if json_payload is not None:
            body = json.dumps(json_payload)  # JSON 请求体
            headers['Content-Type'] = 'application/json'
        elif data is not None:
            if isinstance(data, dict):
                body = "&".join(["{}={}".format(k, v) for k, v in data.items()])
                headers['Content-Type'] = 'application/x-www-form-urlencoded'
            else:
                body = data  # 如果 data 是字符串,直接使用

        # 构建 HTTPRequest 对象
        request = HTTPRequest(
            url=url,
            method=method.upper(),
            headers=headers,
            body=body,
            request_timeout=timeout
        )

        # 使用 AsyncHTTPClient 发起异步请求
        http_client = AsyncHTTPClient()
        try:
            # 发送请求
            response = yield http_client.fetch(request)
            # 响应成功(状态码为 200)
            try:
                result = json.loads(response.body)
            except ValueError:
                result = response.body
        except HTTPError as http_error:
            # 区分非 200 响应和网络异常
            if http_error.response:
                logger.error(
                    "HTTPError: Non-200 status code: %s, url: %s",
                    http_error.response.code, url
                )
                result = {}
            else:
                logger.error("HTTPError: Network error: %s, url: %s", http_error, url)
                result = {}
        except Exception as e:
            logger.error("Request failed: %s, url: %s", e, url)
            result = {}
        raise tornado.gen.Return(result)

    def close(self):
        pass

这个方案其实很简单,直接使用 AsyncHTTPClientHTTPRequest 就可以进行异步请求,上面很多代码是在对请求参数进行处理,目的是为了兼容 requests 的请求参数。

开发异步接口

上面的两个类实现了同样的封装,可以作为基类,下面是一个实现具体请求的子类:

# -*- coding: utf-8 -*-
import tornado.gen
from base_sdk import BaseSDK


class CMDBSDK(BaseSDK):
    def __init__(self, host, org, user='defaultUser'):
        super(CMDBSDK, self).__init__(host, org, user)


    @tornado.gen.coroutine
    def instance_api_import_instance(self, object_id, payload):
        api = '/object/{object_id}/instance/_import'.format(object_id=object_id)
        data = yield self.fetch_async('POST', api, json=payload)
        raise tornado.gen.Return(data)

这个类里面可以封装具体的请求,用来异步请求其他平台的接口。

然后下面是一个开发的接口类:

# -*- coding: utf-8 -*-
import json

import tornado.web
import tornado.gen
from sdk.cmdb import CMDBSDK


class CMDBImportHandler(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def post(self, pk):
        cmdb_sdk = CMDBSDK('100.88.88.201:8079', '777777')

        try:
            object_id = pk
            payload = json.loads(self.request.body)

            # 确保请求数据有效
            if not object_id or not payload:
                self.set_status(400)  # 错误请求
                self.write({"message": "Invalid input data"})
                return

            # 调用 CMDB SDK 的导入接口
            data = yield cmdb_sdk.instance_api_import_instance(object_id, payload)

            # 返回成功响应
            self.set_status(200)
            self.write({
                "message": "POST request successful",
                "data": data
            })
        except Exception as e:
            # 错误处理,返回错误响应
            self.set_status(500)  # 错误请求
            self.write({
                "message": "POST request failed",
                "error": str(e)
            })
        finally:
            cmdb_sdk.close()

启动服务

下面是一个启动服务的文件,简单启动微服务:

# -*- coding: utf-8 -*-
import tornado.ioloop
import tornado.web
import tornado.httpserver
from handlers.handlers import CMDBSearchHandler,CMDBImportHandler


def make_app():
    """
    创建 Tornado 应用程序实例并注册路由
    """
    return tornado.web.Application([
        (r"/cmdb/search/(?P<pk>\w+)", CMDBSearchHandler),
        (r"/cmdb/import/(?P<pk>\w+)", CMDBImportHandler),
    ])


def start_server(port, num_processes):
    """
    启动 Tornado 服务并分配多线程
    :param port: 监听的端口
    :param num_processes: 启动的线程数(一般设置为 CPU 核心数)
    """
    app = make_app()
    server = tornado.httpserver.HTTPServer(app)

    # 设置监听端口
    server.bind(port)

    # 启动多线程服务
    server.start(num_processes)  # num_processes 为线程数,0 表示使用 CPU 核心数
    print("Server started on port {} with {} processes.".format(port, num_processes))

    # 启动 Tornado I/O 循环
    tornado.ioloop.IOLoop.current().start()


if __name__ == "__main__":
    try:
        # 启动服务,指定线程数
        start_server(8888, 4)
    except KeyboardInterrupt:
        print("\nServer stopped by user.")
    except Exception as e:
        print("Error starting server: {}".format(e))

总结

Tornado 在 Python2 中实现异步操作,需要将异步的操作放到一个函数中,并且使用装饰器 @tornado.gen.coroutine 进行封装,然后在使用的时候使用 yield 获取调用结果。而在接口返回中,不能使用 return 返回结果,应该使用 raise tornado.gen.Return(data) 这种方式。

参考文档: