Traceback (most recent call last):
File "E:\work\projects\interview\service\user\venv\lib\site-packages\tornado_mysql\connections.py", line 833, in _read_packet
packet_header = yield self._stream.read_bytes(4)
File "E:\work\projects\interview\service\user\venv\lib\site-packages\tornado\iostream.py", line 418, in read_bytes
self._try_inline_read()
File "E:\work\projects\interview\service\user\venv\lib\site-packages\tornado\iostream.py", line 857, in _try_inline_read
self._check_closed()
File "E:\work\projects\interview\service\user\venv\lib\site-packages\tornado\iostream.py", line 1058, in _check_closed
raise StreamClosedError(real_error=self.error)
tornado.iostream.StreamClosedError: Stream is closed
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "E:\work\projects\interview\service\user\handlers\user_service\views.py", line 437, in post
cur = yield self.db.execute(sql, [reward_id, self.cache_user_id])
File "E:\work\projects\interview\service\user\venv\lib\site-packages\tornado\gen.py", line 1099, in run
value = future.result()
File "E:\work\projects\interview\service\user\venv\lib\site-packages\tornado\gen.py", line 1107, in run
yielded = self.gen.throw(*exc_info)
File "E:\work\projects\interview\service\user\venv\lib\site-packages\tornado_mysql\pools.py", line 124, in execute
yield cur.execute(query, params)
File "E:\work\projects\interview\service\user\venv\lib\site-packages\tornado\gen.py", line 1099, in run
value = future.result()
File "E:\work\projects\interview\service\user\venv\lib\site-packages\tornado\gen.py", line 1107, in run
yielded = self.gen.throw(*exc_info)
File "E:\work\projects\interview\service\user\venv\lib\site-packages\tornado_mysql\cursors.py", line 132, in execute
yield self._query(query)
File "E:\work\projects\interview\service\user\venv\lib\site-packages\tornado\gen.py", line 1099, in run
value = future.result()
File "E:\work\projects\interview\service\user\venv\lib\site-packages\tornado\gen.py", line 1107, in run
yielded = self.gen.throw(*exc_info)
File "E:\work\projects\interview\service\user\venv\lib\site-packages\tornado_mysql\cursors.py", line 283, in _query
yield conn.query(q)
File "E:\work\projects\interview\service\user\venv\lib\site-packages\tornado\gen.py", line 1099, in run
value = future.result()
File "E:\work\projects\interview\service\user\venv\lib\site-packages\tornado\gen.py", line 1107, in run
yielded = self.gen.throw(*exc_info)
File "E:\work\projects\interview\service\user\venv\lib\site-packages\tornado_mysql\connections.py", line 731, in query
yield self._read_query_result(unbuffered=unbuffered)
File "E:\work\projects\interview\service\user\venv\lib\site-packages\tornado\gen.py", line 1099, in run
value = future.result()
File "E:\work\projects\interview\service\user\venv\lib\site-packages\tornado\gen.py", line 1107, in run
yielded = self.gen.throw(*exc_info)
File "E:\work\projects\interview\service\user\venv\lib\site-packages\tornado_mysql\connections.py", line 865, in _read_query_result
yield result.read()
File "E:\work\projects\interview\service\user\venv\lib\site-packages\tornado\gen.py", line 1099, in run
value = future.result()
File "E:\work\projects\interview\service\user\venv\lib\site-packages\tornado\gen.py", line 1107, in run
yielded = self.gen.throw(*exc_info)
File "E:\work\projects\interview\service\user\venv\lib\site-packages\tornado_mysql\connections.py", line 1062, in read
first_packet = yield self.connection._read_packet()
File "E:\work\projects\interview\service\user\venv\lib\site-packages\tornado\gen.py", line 1099, in run
value = future.result()
File "E:\work\projects\interview\service\user\venv\lib\site-packages\tornado\gen.py", line 315, in wrapper
yielded = next(result)
File "E:\work\projects\interview\service\user\venv\lib\site-packages\tornado_mysql\connections.py", line 845, in _read_packet
raise OperationalError(2006, "MySQL server has gone away (%s)" % (e,))
tornado_mysql.err.OperationalError: (2006, 'MySQL server has gone away (Stream is closed)')
class BaseHandlerTestCase(WebTestCase):
    """
    Base test case for this project's HTTP handlers.

    For every test it:
    1. builds the application under test (``get_app``),
    2. recreates the MySQL test database and its tables,
    3. points the settings at a dedicated redis database,
    4. drops the database and flushes redis again on teardown.
    """

    def set_db_info(self):
        """
        Set the default database parameters used by the test cases.

        Override or customize this method to use a different database.
        :return:
        """
        settings.MYSQL_DB_DBNAME = "testing"

    def set_redis_info(self):
        """
        Set the default redis parameters used by the test cases.

        Override or customize this method to use a different redis database.
        :return:
        """
        settings.REDIS_DB = 8

    def get_app(self):
        """
        Build the application under test.

        NOTE(review): Tornado's test cases create a fresh IOLoop per test.
        If MyApplication reuses a module-level tornado_mysql pool across
        instances, pooled connections left over from a previous test would
        presumably be closed ("Stream is closed") -- confirm against
        MyApplication.
        :return: a new ``MyApplication`` instance
        """
        app = MyApplication()
        return app

    def _connect(self, use_database=False):
        """
        Open a new pymysql connection from the project settings.

        :param use_database: when true, select ``settings.MYSQL_DB_DBNAME``;
            otherwise connect without a default database.
        :return: an open connection; the caller is responsible for closing it
        """
        params = dict(host=settings.MYSQL_DB_HOST,
                      port=settings.MYSQL_DB_PORT,
                      user=settings.MYSQL_DB_USER,
                      passwd=settings.MYSQL_DB_PASSWORD,
                      charset="utf8mb4")
        if use_database:
            params["database"] = settings.MYSQL_DB_DBNAME
        return pymysql.connect(**params)

    def _execute_all(self, statements, use_database=False):
        """
        Run SQL statements on a short-lived connection and always close it.

        The connection is managed explicitly instead of with
        ``with pymysql.connect(...) as cursor``: what that ``with`` yields,
        and whether it closes the connection, differs between PyMySQL
        releases, and the older behavior leaves the connection open.

        :param statements: iterable of SQL strings, executed in order
        :param use_database: forwarded to ``_connect``
        :return:
        """
        conn = self._connect(use_database=use_database)
        try:
            with conn.cursor() as cursor:
                # Do not surface warnings (private PyMySQL cursor attribute,
                # kept from the original code).
                cursor._defer_warnings = True
                for statement in statements:
                    cursor.execute(statement)
            conn.commit()
        finally:
            conn.close()

    def create_db(self):
        """
        Drop the test database if it exists, then create it from scratch.
        :return:
        """
        drop_sql = """
        DROP DATABASE IF EXISTS `{}`;
        """.format(settings.MYSQL_DB_DBNAME)
        create_sql = """
        CREATE DATABASE IF NOT EXISTS {} DEFAULT CHARSET utf8mb4 COLLATE utf8mb4_general_ci;
        """.format(settings.MYSQL_DB_DBNAME)
        self._execute_all([drop_sql, create_sql])

    def create_tables(self):
        """
        Create the tables by running ``doc/user.sql`` against the test database.

        The file is read as one string with newlines removed and then split
        on ``--``; every non-blank fragment is executed as one statement.
        :return:
        """
        file_path = os.path.join(settings.BASE_PATH, 'doc', 'user.sql')
        with open(file_path, 'r', encoding='utf-8') as f:
            script = f.read().replace("\n", "")
        statements = [part for part in script.split('--') if part.strip()]
        self._execute_all(statements, use_database=True)

    def clear_db(self):
        """
        Drop the test database.
        :return:
        """
        sql = "DROP DATABASE IF EXISTS `{}`;".format(settings.MYSQL_DB_DBNAME)
        self._execute_all([sql])

    def clear_redis(self):
        """
        Flush the redis database selected by ``settings.REDIS_DB``.
        :return:
        """
        rc = redis.Redis(
            host=settings.REDIS_HOST,
            port=settings.REDIS_PORT,
            db=settings.REDIS_DB,
            password=settings.REDIS_PASS)
        rc.flushdb()

    def setUp(self):
        """Point settings at the test stores and rebuild the schema before the app starts."""
        self.set_db_info()
        self.set_redis_info()
        self.create_db()
        self.create_tables()
        super(BaseHandlerTestCase, self).setUp()

    def tearDown(self):
        """Tear down the app, then flush redis and drop the test database."""
        # Each cleanup step runs even if an earlier one raised, so a failure
        # in one test does not leave stale state behind for the next one.
        try:
            super(BaseHandlerTestCase, self).tearDown()
        finally:
            try:
                self.clear_redis()
            finally:
                self.clear_db()

    @property
    def db(self):
        """
        Open a connection to the test database.

        Every access opens a NEW connection, so callers must keep a
        reference to the returned object and close that same object.
        """
        return self._connect(use_database=True)

    @property
    def redis(self):
        """Return the redis client obtained from the application under test."""
        return self._app.redis.get_client()
class RecieveRewardTest(BaseHandlerTestCase):
    """Tests for the ``/user_service/receive_reward`` POST endpoint."""

    def init_req_data(self):
        """
        Register a fake logged-in user in redis and build the request headers.

        Stores ``{"user_id": 12345}`` under ``a:TEST`` and remembers the user
        id on ``self.user_id``.
        :return: ``(access_token, headers)``
        """
        self.user_id = 12345
        access_token = "TEST"
        data = {
            "user_id": self.user_id
        }
        self.redis.set("a:{}".format(access_token), json_encode(data))
        headers = {
            "Content-Type": "application/json",
            "X-Access-Token": access_token
        }
        return access_token, headers

    def execute_sql(self, sql, *args):
        """
        Execute one statement on the test database and commit it.

        ``self.db`` opens a new connection on every access, so it is read
        exactly once here and that same connection is closed afterwards.
        Errors propagate to the caller: the callers unpack the result, so
        returning ``None`` after a swallowed error would only hide the real
        cause behind a ``TypeError``.

        :param sql: SQL text with ``%s`` placeholders
        :param args: values bound to the placeholders
        :return: ``(rowcount, lastrowid)``
        """
        conn = self.db
        try:
            with conn.cursor() as cursor:
                rowcount = cursor.execute(sql, args)
                lastrowid = cursor.lastrowid
            conn.commit()
            return rowcount, lastrowid
        finally:
            conn.close()

    @gen_test
    def query_sql(self):
        """Placeholder; currently does nothing."""
        pass

    def _post_receive_reward(self, req_data, headers):
        """
        POST ``req_data`` as JSON to the receive-reward endpoint.

        :return: ``(resp, res)`` -- the HTTP response and its decoded JSON body
        """
        resp = self.fetch("/user_service/receive_reward", method="POST",
                          body=json_encode(req_data), headers=headers)
        return resp, json_decode(resp.body)

    # Invalid parameters
    def test_param_error(self):
        req_data = {
            "reward_id": ""
        }
        _, headers = self.init_req_data()
        resp, res = self._post_receive_reward(req_data, headers)
        self.assertEqual(resp.code, 200)
        self.assertEqual(res['code'], 400)
        self.assertEqual(res['message'], '参数不正确')

    # Reward is currently being claimed
    def test_process_reward(self):
        req_data = {
            "reward_id": "123456"
        }
        _, headers = self.init_req_data()
        # Simulate a claim that is already in progress
        key = "RecieveReward:%s" % req_data['reward_id']
        self.redis.set(key, 1)
        # Run the request
        resp, res = self._post_receive_reward(req_data, headers)
        self.assertEqual(resp.code, 200)
        self.assertEqual(res['code'], 400)
        self.assertEqual(res['message'], '奖励在其他处已经领取,请重新查询')
        # The key set above must still be present after the request
        self.assertIsNotNone(self.redis.get(key))

    # The requested reward record does not exist in the database
    def test_not_found(self):
        req_data = {
            "reward_id": "123456"
        }
        _, headers = self.init_req_data()
        # Key for this reward; deliberately not set before the request
        key = "RecieveReward:%s" % req_data['reward_id']
        # Run the request
        resp, res = self._post_receive_reward(req_data, headers)
        self.assertEqual(resp.code, 200)
        self.assertEqual(res['code'], 400)
        self.assertEqual(res['message'], '奖励信息不存在')
        self.assertIsNone(self.redis.get(key))

    # Reward row is not in a claimable state (interview_status=5, is_interview_status=0)
    def test_is_interview_status_error(self):
        _, headers = self.init_req_data()
        sql = """
        insert into client_user_share_award_info(
        user_id, position_stage_id, name, school, job, short_job, position,
        money, interview_status, is_interview_status)
        value(%s, 1, 'test', 'test', '公司名字', '公司简称', 1, 12.5, 5, 0)
        """
        rowcount, lastrowid = self.execute_sql(sql, self.user_id)
        req_data = {
            "reward_id": lastrowid
        }
        # Key for this reward; deliberately not set before the request
        key = "RecieveReward:%s" % req_data['reward_id']
        # Run the request
        resp, res = self._post_receive_reward(req_data, headers)
        self.assertEqual(resp.code, 200)
        self.assertEqual(res['code'], 400)
        self.assertEqual(res['message'], '奖励非可领取状态,请刷新重试')
        self.assertIsNone(self.redis.get(key))

    # Reward row is not in a claimable state (interview_status=4, is_interview_status=3)
    def test_interview_status_error(self):
        _, headers = self.init_req_data()
        sql = """
        insert into client_user_share_award_info(
        user_id, position_stage_id, name, school, job, short_job, position,
        money, interview_status, is_interview_status)
        value(%s, 1, 'test', 'test', '公司名字', '公司简称', 1, 12.5, 4, 3)
        """
        rowcount, lastrowid = self.execute_sql(sql, self.user_id)
        req_data = {
            "reward_id": lastrowid
        }
        # Key for this reward; deliberately not set before the request
        key = "RecieveReward:%s" % req_data['reward_id']
        # Run the request
        resp, res = self._post_receive_reward(req_data, headers)
        self.assertEqual(resp.code, 200)
        self.assertEqual(res['code'], 400)
        self.assertEqual(res['message'], '奖励非可领取状态,请刷新重试')
        self.assertIsNone(self.redis.get(key))
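Each test case passes when it is run on its own, but running the test cases together as a group fails with the `OperationalError: (2006, 'MySQL server has gone away (Stream is closed)')` shown in the traceback above.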