压力测试工具(压力测试工具-locust使用)

locust简介

Locust直译是“蝗虫”的意思,意在压测时产生的压力就像是漫天蝗虫一样,铺天盖地。

Locust是用Python实现的开源性能测试框架,不同于其他压测工具基于进程/线程产生压力,Locust是完全基于事件,支持分布式,一个Locust节点可以在一个进程中轻松支持上千并发用户。

locust特点
  • 基于协程 ,低成本实现更多并发;
  • 脚本增强(“测试即代码”),不支持脚本录制,通过编写代码,提供更大的灵活性;
  • HttpUser类使用了requests发送http请求,FastHttpUser类基于geventhttpclient实现,性能更高;
  • 分布式压测更加简单,可以在同一服务器上使用分布式,也可以在多台服务器上进行分布式压测;
  • 使用Flask 提供WebUI,同时提供无界面压测;
  • 有第三方插件、 易于扩展,只要有相关协议的包,可以随机扩压测压。
locust安装

pip install locust # 查看安装版本 locust -v

locust使用帮助:locust --help

压力测试工具(压力测试工具-locust使用)(1)

测试脚本执行事件顺序

由于许多设置和清除操作是相互依赖的,因此以下是它们的执行顺序:

  1. Locust setup (一次)
  2. TaskSet setup (一次)
  3. TaskSet on_start (每个locust一次)
  4. TaskSet tasks…
  5. TaskSet on_stop (每个locust一次)
  6. TaskSet teardown (一次)
  7. Locust teardown (一次) 通常,setup和teardown方法应该是互补的。
http协议示例
  1. 创建测试用户类,用于继承HttpUser或者FastHttpUser;
  2. 创建测试类,继承TaskSet类;
  3. __init__()方法进行数据初始化;
  4. on_start方法进行测试用例执行前准备,如执行登录;
  5. on_end方法测试结束后执行;
  6. 测试用例编写,待测用例方法前加@locust.task(weight: int=1)装饰器
  7. 使用self.client进行get/post/put/delete等请求

import hashlib import json import logging from urllib import parse from locust import TaskSet, task, between from locust.contrib.fasthttp import FastHttpUser logging.basicConfig(level=logging.INFO, # filename="info.log", filemode="w ", format="%(asctime)s %(filename)s %(funcName)s %(levelname)s Line:%(lineno)s :%(message)s") logger = logging.getLogger(__name__) HOST = "10.209.1.12" PORT = "8081" def get_hash_md5(text): # 返回MD5值 text = text.encode('utf-8') if isinstance(text, str) else text # 转换bytes try: # m = hashlib.md5(b'credit') m = hashlib.md5() m.update(text) return m.hexdigest() except Exception as e: return False def dict_to_params_str(di): s = "" for k, v in di.items(): if isinstance(v, dict): v = str(v) s = "{}={}&".format(k, v) return s[:-1] def get_data_result(data): data = data.decode('utf-8') if isinstance(data, bytes) else data data = json.loads(data) if isinstance(data, str) else data if int(data.get("errno")) == 0: return True return False class UserTask(TaskSet): def __init__(self, parent): super().__init__(parent) self._cookies = "" self._headers = headers def on_start(self): logger.info("测试开始...") headers = { "Accept": "application/json, text/plain, */*", "Accept-Encoding": "gzip, deflate", "Accept-Language": "zh-CN,zh;q=0.9", "Connection": "keep-alive", "Cookie": "sidebarStatus=1", "Host": "{}:{}".format(HOST, PORT), "Origin": "http://{}:{}".format(HOST, PORT), "Referer": "http://{}:{}/dist/".format(HOST, PORT), "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.100 Safari/537.36", } data = { "username": "admin", "password": "62638c576e044bffcbe34ff77554d43e"} resp = self.client.post("/login", data=dict_to_params_str(data), name='登录', headers=headers, catch_response=True) if resp.status_code == 200: res = get_data_result(resp.content.decode('utf-8')) if res: self._cookies = resp.headers['Set-Cookie'] logger.info(self._cookies) self._headers["Cookie"] = self._cookies logger.info("登录成功!") else: logger.warning("登录失败:{}".format(resp.content.decode('utf-8'))) return def on_stop(self): logger.info("测试结束!") @task def test_host_getlist_terminal(self): params = { "limit": "10", "page": "1", "prodcombo": "360exthost/pcinfo", "filter": '{"id": "-1"}' } with self.client.get("/host/getlist", name='终端概况', data=dict_to_params_str(params), headers=self._headers, catch_response=True) as resp: if resp.status_code == 200: if get_data_result(resp.content.decode('utf-8')): resp.success() else: resp.failure("resp data error:{}".format(resp.content.decode('utf-8'))) else: resp.failure('request status error:{}'.format(resp.status_code)) class WebsiteUser(FastHttpUser): # HttpUser host = 'http://10.209.1.12:8081' wait_time = between(0, 0.01) # 每个请求间隔/秒 tasks = [UserTask] if __name__ == '__main__': # os.system("locust -f web_api.py --host=http://10.173.220.46:8081 --csv=req --logfile=info.log") os.system("locust -f web_api.py --loglevel=INFO")

Websocket协议示例

class WebSocketClient(InstanceClass): def __init__(self): headers = ["Accept-Encoding:gzip, deflate, br", "Accept-Language:zh-CN,zh;q=0.9", "Cache-Control:no-cache", "Connection:Upgrade", "Host:{}:36060".format(HOST), "Origin:file://", "Pragma:no-cache", "Sec-WebSocket-Extensions:permessage-deflate; client_max_window_bits", "Sec-WebSocket-Key:{}".format(get_websocket_key()), "Sec-WebSocket-Version:13", "Upgrade:websocket", "User-Agent: Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.100 Safari/537.366", ] self.ws = create_connection(WS_URL, header=headers, sslopt={"cert_reqs": ssl.CERT_NONE}) def on_message(self, message): message = message.decode('utf-8') if isinstance(message, bytes) else message logger.info("接收数据: {}".format(message)) def on_error(self, error): logger.info("错误信息: {}".format(error)) def on_close(self): logger.info("### closed ###") def on_open(self): logger.info("####on openen####") class WebSocketLocust(User): abstract = True def __init__(self, parent): super(WebSocketLocust, self).__init__(parent) self.s_client = WebSocketClient() class UserBehavior(TaskSet): def __init__(self, parent): super().__init__(parent) self.wss = self.parent.s_client.ws def get_m2_info(self): try: param = self.parent.q.get() # self.parent.q.put_nowait(param) # 获取完数据重新加入队列 return param except queue.Empty: logger.error("queue is Empty!!!") exit(0) def on_start(self): logger.info(f"发送AUTH数据...") self.client_auth() logger.info(f"资产上报...") self.test_asset_push() # self.wss.ping() self.wss.pong() threading.Thread(target=self.recv).start() # self.recv() def on_stop(self): logger.info('---------- task stop ------------') self.wss.close() def recv(self): logger.info("进入接收数据方法...") while True: res = self.wss.recv() try: data = json.loads(res) logger.info("接收数据receive:{}".format(data)) except websocket.WebSocketTimeoutException as e: logger.exception("接收数据异常:{}".format(str(e))) except websocket.WebSocketConnectionClosedException as e: logger.warning("websocket close!!!") def assert_result(self, name, result, start_time, msg=""): if result: events.request_success.fire( request_type=name, name=name "_succ", response_time=int(time.time()) - start_time, response_length=0) else: events.request_failure.fire(request_type=name, name=name "_fail", response_time=int(time.time()) - start_time, exception=msg, response_length=1) def sends(self, msg, name): # logger.info("发送类型:{}, 数据类型:{} 数据: {}!".format(name, type(msg), msg)) start_time = int(time.time()) try: self.wss.send(json.dumps(msg)) self.assert_result(name, True, start_time) except Exception as e: logger.error(f"发送错误: {str(e)}") self.assert_result(name, False, start_time, msg=str(e)) def client_auth(self): # 客户端认证 req = {"msgType": 0, "epGuid": self.uuid, "authData": get_sha256(self.uuid ".eppmc")} logger.info("客户端认证,发送AUTH数据: {}".format(req)) self.sends(req, "auth_conn") @task def policy_type_task(self): # 3. 获取策略 POLICY_REQUEST: 101 _policy_type = random.choice(policy_type) name = sys._getframe().f_code.co_name req = {"epGuid": self.uuid, "_os": 1, "msgType": 104, "_guid": self.uuid, "policyHash": get_str_md5(self.uuid), "policyType": "SetSecurityReinforce"} self.sends(req, name=name)

NSQ协议示例

import queue import gnsq from locust import between, task, User, TaskSet, events, tag class InstanceClass(object): _instance = None def __new__(cls, *args, **kwargs): if not cls._instance: cls._instance = super(InstanceClass, cls).__new__(cls, *args, **kwargs) return cls._instance class WebNsqclient(InstanceClass): """nsq连接""" def __init__(self): super(WebNsqclient, self).__init__() """建立nsq连接""" self.producer_list = [] self.topic = "CloudSafeLine.EEPP" # self.cwpp_topic = "CloudSafeLine.CCPP" # _nsq_addr = f"{random.choice(nsq_host_list)}:{nsq_port}" self.producer = gnsq.Producer(nsq_addr) self.producer.start() @property def producers(self): return random.choice(self.producer_list) def on_message(self, data): """nsq消息发送,发送成功返回OK""" zip_data = data_compress(data) res = self.producer.publish(self.topic, zip_data) # 报文发送 return res def on_end(self): """nsq连接关闭""" self.producer.close() class WebNsqLocust(User): """自定义locust User类""" abstract = True def __init__(self, parent): super(WebNsqLocust, self).__init__(parent) self.client = WebNsqclient() class UserTask(TaskSet): def __init__(self, parent): super().__init__(parent) monitor_name = "monitor" def get_m2_info(self): try: param = self.parent.q.get() # 参数化 # self.parent.q.put_nowait(param) # 获取完数据重新加入队列 return param except queue.Empty: logger.error("queue is Empty!!!") exit(0) def on_start(self): logger.debug("开始测试...") # threading.Thread(target=monitor_redis).start() global START_FLAG if not START_FLAG: START_FLAG = True threading.Thread(target=monitor_redis).start() # redis监控 monitor_name = "monitor" monitor_time = 5 # 分钟 minutes easynmon_monitor_start(easynmon_ip_port, name=monitor_name, monitor_time=monitor_time) # easynmon监控 def on_end(self): logger.debug("测试结束!!!") self.client.on_end() easynmon_monitor_stop(easynmon_ip_port) def assert_result(self, name, result, start_time): if result.decode() == "OK": events.request_success.fire( request_type=name, name=name "_succ", response_time=int((time.time() - start_time) * 1000), response_length=0) else: events.request_failure.fire(request_type=name, name=name "_fail", response_time=int((time.time() - start_time) * 1000), exception=result, response_length=1) @task @tag("pluginmgr", "test") def pluginmgr_task(self): name = sys._getframe().f_code.co_name param = self.get_m2_info() data = pluginmgr_info(param) start_time = time.time() res = self.client.on_message(data) self.assert_result(name, res, start_time)

locust扩展增强

1、参数化

  • 引入队列的概念 queue ,实现方式是将参数推入队列,测试时依次取出,全部取完后 locust 会自动停止。若是使用参数循环压测,需要将取出的参数再推入队尾。

2、检查点

  • 使用self.client提供的catch_response=True`参数, 添加locust提供的ResponseContextManager类的上下文方法手动设置检查点。
  • ResponseContextManager里面的有两个方法来声明成功和失败,分别是success和failure。其中failure方法需要我们传入一个参数,内容就是失败的原因。

3、思考时间

  • wait_time = between(0, 0.01)
  • wait_time = constant(1)
  • wait_time = constant_pacing/1)

4、权重

  • 类中测试方法执行比例通过@task(int)进行控制,默认1;
  • 一个文件多个类中可以通过weight=1进行控制;

5、集合点:模拟一定数量的用户,同时并发请求。

from locust import HttpUser, TaskSet, task,between,events from gevent._semaphore import Semaphore all_locusts_spawned = Semaphore() # 创建集合点 all_locusts_spawned.acquire() # 阻塞线程 # 注册事件 @events.spawning_complete.add_listener def on_hatch_complete(**kwargs): # Select_task类的钩子方法 # 挂在到locust钩子函数(所有的Locust实例产生完成时触发 all_locusts_spawned.release() num = 0 class UserTask(TaskSet): def login(self): global num num = 1 print("%s个虚拟用户开始启动,并登录"%n) self.client.post("/login", data={"username":"wantest001gwu", "password":"test00123"}) def logout(self): print("退出登录") def on_start(self): self.login() all_locusts_spawned.wait() @task(4) def test1(self): param = {"limit":1, "offset":0} with self.client.get("/list", params=param, headers={}, catch_response = True) as response: print("用户浏览首页商品列表")

6、分布式

locust -f locust.py --master --web-host=0.0.0.0 --loglevel=ERROR

locust -f locust.py --worker --master-host=10.20.14.24 --loglevel=ERROR

7、无界面测试

压力测试工具(压力测试工具-locust使用)(2)

总结

locust不支持录制,使用python进行脚本开发,虽然需要一定编码基础,但是灵活性更大;可以通过代码组合方式实现更多场景!

通过代码增强,测试脚本中日常用到的检查点、参数化、集合、思考时间等都可以实现,而且分布式压测特别方便!

缺点是压测过程中,对服务器的检测需要借助其他工具进行!

压力测试工具(压力测试工具-locust使用)(3)

#学习##python##性能测试#

,

免责声明:本文仅代表文章作者的个人观点,与本站无关。其原创性、真实性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容文字的真实性、完整性和原创性本站不作任何保证或承诺,请读者仅作参考,并自行核实相关内容。文章投诉邮箱:anhduc.ph@yahoo.com

    分享
    投诉
    首页