DubboClient:是一个基于telnetlib类库封装的,用于测试dubbo接口的客户端工具类
说明:
• telnetlib 是python内置的模块,用来创建Telnet协议的连接
• telnetlib 模块提供一个实现telnet协议的类 Telnet,Dubboclient,封装了 telnetlib 库。
安装dubboclient
pip install dubboclient
检查是否已经安装:pip list 或 pip show dubboclient
实用案例
案例一
根据手机号,查询会员信息(传递普通参数)
实现代码:
# 1. 导包 from dubboclient import DubboClient
from dubboclient import DubboClient
# 2. 创建 DubboClient类实例,指定 IP 和 port
dubboclt = DubboClient("211.103.136.244", 6502)
# 3. 使用 实例调用 invoke() 方法。 传入 :服务名、方法名、实参(方法使用)。获取响应结果
resp = dubboclt.invoke("MemberService", "findByTelephone", "13020210001")
# 4. 打印响应结果
print("响应结果 =", resp)
print("type(resp) =", type(resp))
案例二
添加会员(传递 对象参数)
参数类型为自定义类型时,传送参数时要增加键值对:class:完整包名和类名
实现代码:
# 1. 导包
from dubboclient import DubboClient
# 2. 创建 dubboclient 实例
dubboclt = DubboClient("211.103.136.244", 6502)
# 准备 add 方法,所需要的数据
info = {"id": 911, "name": "杜甫", "phoneNumber": "13048379884"}
# 如果 class 已经存在,覆盖原有class值; 如果不存在 class,新增一组 元素到 字典中。
info["class"] = "com.itheima.pojo.Member"
# 3. 调用 invoke 传 服务名、方法名、实参。得响应结果
resp = dubboclt.invoke("MemberService", "add", info)
# 4. 打印
print("响应结果 =", resp)
print("type(resp) =", type(resp))
案例三
添加预约设置
实现代码:
# 1. 导包 from dubboclient import DubboClient
# 2. 创建 dubboclient 实例
dubboclt = DubboClient("211.103.136.244", 6502)
# 准备 add 方法,所需要的数据
info = [{"orderDate": "2021-05-18 18:89:02", "number": 346}]
# 3. 调用 invoke 传 服务名、方法名、实参。得响应结果
resp = dubboclt.invoke("OrderSettingService", "add", info)
# 4. 打印
print("响应结果 =", resp)
print("type(resp) =", type(resp))
Dubbo框架封装
看了上面的代码,大家可能都发觉远程调用的这几个dubbo接口 存在的问题:
1. 代码有 大量冗余
2. 测试接口时,除了要给 测试数据之外, 还需要 指定 服务名、方法名
3. 传参时,除了要考虑测试数据外,还要分析是否要添加 class 字段 及 对应数据。
4. 返回的数据类型统一为 string(不具体)
基础服务对象
# api/base_service.py
from dubboclient import DubboClient
class BaseService:
"""基础服务"""
def __init__(self):
self.dubbo_client = DubboClient("211.103.136.244", 6502)
服务对象
class MemberService(BaseService):
"""会员服务接口"""
def __init__(self):
super().__init__()
self.service_name = "MemberService"
self.find_by_telephone_method = "findByTelephone"
def find_by_telephone(self, telephone):
return self.dubbo_client.invoke(self.service_name, self.find_by_telephone_method, telephone)
测试用例对象
class TestMemberService(unittest.TestCase):
@classmethod
def setUpClass(cls) -> None:
cls.member_service = MemberService()
def test01_find_by_telephone_success(self):
"""查询成功"""
telephone = "13520196139"
response = self.member_service.find_by_telephone(telephone)
print("response===", response)
json_data = json.loads(response)
self.assertEqual(telephone, json_data.get("phoneNumber"))
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END