python3之压测工具

因为项目新上线,需要压测一下接口

软件压力测试是一种基本的质量保证行为,它是每个重要软件测试工作的一部分。软件压力测试的基本思路很简单:不是在常规条件下运行手动或自动测试,而是在计算机数量较少或系统资源匮乏的条件下运行测试。通常要进行软件压力测试的资源包括内部内存、CPU 可用性、磁盘空间和网络带宽。 压力测试是给软件不断加压,强制其在极限的情况下运行,观察它可以运行到何种程度,从而发现性能缺陷,是通过搭建与实际环境相似的测试环境,通过测试程序在同一时间内或某一段时间内,向系统发送预期数量的交易请求、测试系统在不同压力情况下的效率状况,以及系统可以承受的压力情况。然后做针对性的测试与分析,找到影响系统性能的瓶颈,评估系统在实际使用环境下的效率情况,评价系统性能以及判断是否需要对应用系统进行优化处理或结构调整。并对系统资源进行优化。

然后因为我们项目的接口无论请求还是下发都是加密的,常规的压测工具不太合适,于是。嗯,那就用python使用多线程(ps:主要为了模拟疯狂点击导致并发会不会发生车祸)写一个吧

import requests
import threading
import json
import time
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
from Crypto.Util.Padding import unpad
import base64

# 声明加减密的秘钥值
key = 'xxxxxx'

# 请求响应时间数组
timeArr = []


def AES_ECB_Encode(aes_str):
    # 使用key,选择加密方式
    aes = AES.new(key.encode('utf-8'), AES.MODE_ECB)
    pad_pkcs7 = pad(aes_str.encode('utf-8'), AES.block_size, style='pkcs7')  # 选择pkcs7补全
    encrypt_aes = aes.encrypt(pad_pkcs7)
    # 加密结果
    encrypted_text = str(base64.encodebytes(encrypt_aes), encoding='utf-8')  # 解码
    encrypted_text_str = encrypted_text.replace("\n", "")

    return encrypted_text_str


def AES_ECB_Decode(aes_str):
    aes = AES.new(key.encode('utf-8'), AES.MODE_ECB)
    decryption_aes = base64.decodebytes(aes_str.encode('utf-8'))  # 解码
    decryption_text = aes.decrypt(decryption_aes)
    pkcs7Data = (unpad(decryption_text, AES.block_size, style='pkcs7')).decode('utf-8')
    res = json.loads(pkcs7Data)

    return res


# 獲取數據
def get_url(mainUrl, postData, requestHeaders):
    """
    :param mainUrl: 请求的域名地址
    :param postData: 请求的post 内容
    :param requestHeaders: 请求头
    """
    '''先变成json字符串再去除字符串中的空格'''
    jsonData = ''.join((json.dumps(postData)).split())

    res = AES_ECB_Encode(jsonData)

    response = requests.post(url=mainUrl, headers=requestHeaders, data=res)

    return response


# 单线程执行
def actionsUrl(requestNumCount, mainUrl, postData, requestHeaders):
    """
    :param requestNumCount: 总共执行的线程数(总的请求数=总共执行的线程数*没个线程循环请求的数量)
    :param mainUrl: 请求的域名地址
    :param postData: 请求的post 内容
    :param requestHeaders: 请求头
    """
    for i in range(requestNumCount):
        response = get_url(mainUrl, postData, requestHeaders)

        code = response.status_code

        if code == 200:
            print(response.elapsed.total_seconds())
            timeArr.append(response.elapsed.total_seconds())


# 主要函数
def run(requestNumCount, threadNumCount, mainUrl, postData, requestHeaders):
    """
    :param requestNumCount: 总共执行的线程数(总的请求数=总共执行的线程数*没个线程循环请求的数量)
    :param threadNumCount: 线程池数量
    :param mainUrl: 请求的域名地址
    :param postData: 请求的post 内容
    :param requestHeaders: 请求头
    """
    names = locals()

    for i in range(threadNumCount):
        names['a' + str(i)] = threading.Thread(target=actionsUrl, args=(
            requestNumCount,
            mainUrl,
            postData,
            requestHeaders
        )
                                               )
        names['a' + str(i)].start()

    # 增加一个阻塞,主线程需要等待所有子线程收束后才可以被关闭
    for i in range(threadNumCount):
        names['a' + str(i)].join()


# 获取请求接口所需要的url和数据内容和头部信息
def getSendMsgData(reqToken):
    mainUrl = 'xxxx'

    postData = {
        'to_uid': 222,
        'content': '你好'
    }

    header = getHeaders(reqToken)

    return mainUrl, postData, header


def login():
    mainUrl = 'xxx'

    postData = {
        'mobile': 13800000000,
        'password': 'xxxxxx'
    }

    loginResponse = get_url(mainUrl, postData, getHeaders(None))

    if loginResponse.status_code != 200:
        exit('登录接口重定向,请稍后再试')

    loginData = AES_ECB_Decode(loginResponse.content.decode('utf-8'))
    if loginData['code'] == 100:
        exit(loginData['message'])

    if loginData['token'] is None:
        exit('登录失败,请重试')

    return loginData['token']


# 获取头部
def getHeaders(requestToken):
    requestsHeaders = {
        'client': 'Android',
        'phone_brand': 'ELE-AL00',
        'network': 'wifi',
        'Authorization': 'key '
    }

    if requestToken is not None:
        requestsHeaders['Authorization'] = 'key ' + requestToken

    return requestsHeaders


if __name__ == '__main__':
		# 设定单个线程模拟的请求数量
    requestNum = 10
    
    # 设定线程池线程数量
    threadNum = 10

    token = login()

    url, data, headers = getSendMsgData(token)

    start = time.time()

    run(requestNum, threadNum, url, data, headers)

    end = time.time()

    mean = sum(timeArr) / len(timeArr)
    print('所有线程结束')
    print('=======================================')
    print('请求的接口的地址是:%s' % str(url))
    print('请求接口时使用的线程数:%s' % str(threadNum))
    print('请求接口时单线程执行的任务数量:%s' % str(requestNum))
    print('压测任务完成的时间为%d秒!' % (end - start))
    print('该接口请求的总次数:%s' % str(requestNum * threadNum))
    print('该接口响应成功总次数:%s' % len(timeArr))
    print('该接口响应时间平均值:' + str(mean))
    print('=======================================')

    exit()


输出结果如下:

所有线程结束
=======================================
请求的接口的地址是:xxxx
请求接口时使用的线程数:10
请求接口时单线程执行的任务数量:10
压测任务完成的时间为5秒!
该接口请求的总次数:100
该接口响应成功总次数:100
该接口响应时间平均值:0.48802110000000004
=======================================

害。488ms总得来说的话。嗯还是有点大啊。还需要优化

tag(s): python
show comments · back · home
Edit with Markdown
召唤看板娘