tencentblueking / best-practices Goto Github PK
View Code? Open in Web Editor NEW腾讯蓝鲸团队 多年的编程最佳实践总结,包括 Python \ Golang 等多个语言及其相关领域
License: Other
腾讯蓝鲸团队 多年的编程最佳实践总结,包括 Python \ Golang 等多个语言及其相关领域
License: Other
// BAD
c := make(chan int, 100)
// GOOD
c := make(chan int)
// BAD
if _, err := openFile("/path") {
// do something
}
// GOOD
var err error
if _, err = openFile("/path") {
// do something
}
// GOOD
var (
ok bool
)
if _, ok = <- ch; !ok {
// do something when channel is closed.
}
函数作为对象定义的时候就被执行,默认参数是函数的属性,它的值可能会随着函数被调用而改变。
# BAD
def foo(li=[]):
li.append(1)
print(li)
# GOOD
def foo(li=None):
if li is None:
li = []
li.append(1)
print(li)
调用两次foo函数后,不同的输出结果:
# BAD
[1]
[1,1]
# GOOD
[1]
[1]
select_related只针对有外键字段的表操作,并且相关对象只能包含在选择中(一对一的关系)。
举个例子:假定有User实体类,有information外键。information实体类中含字段address,那么当把user_list获取出来再分别通过information外键的address读出来。其中,每次获取一个对象的address,就要向数据库发送一次请求。而select_related()的作用就是可以把外键的对应的对象都在第一次user_list查询的时候,一起读出来,就没有后面的向数据库发送请求,直接调用即可。
# BAD
user_list = User.objects.all()
# GOOD
user_list = User.objects.select_related().all()
# GOOD
user_list = User.objects.all().select_related('information')
对变量和函数的参数返回值类型做注解,有助于让调用方减少类型方面的错误。
# BAD
def greeting(name):
return 'Hello ' + name
# GOOD
def greeting(name: str) -> str:
return 'Hello ' + name
asyncio.sleep()
代替 time.sleep()
time.sleep()
是阻塞的,协程执行到此会导致整体事件循环卡住
asyncio.sleep()
非阻塞,事件循环将运行其他逻辑
import time
import asyncio
# BAD
async def execute_task(task_id: int):
print(f"task[{task_id}] hello")
time.sleep(1)
print(f"task[{task_id}] world")
# GOOD
async def execute_task(task_id: int):
print(f"task[{task_id}] hello")
await asyncio.sleep(1)
print(f"task[{task_id}] world")
上述例子将通过以下代码执行:
import asyncio
async def main():
await asyncio.gather(task(1), task(2))
await main()
BAD
将输出以下内容,task[1]
执行完 hello
后被 time.sleep()
阻塞
task[1] hello
task[1] world
task[2] hello
task[2] world
GOOD
将输出以下内容,task[1]
执行完 hello
后,await asyncio.sleep(1)
将挂起 task[1]
,开始执行 task[2]
task[1] hello
task[2] hello
task[1] world
task[2] world
在判断结果前加否,代码可读性变差,让人的理解成本增加,后续维护也不方便
# BAD
if not validated_data["data_type"] == "cleaned":
kafka_config = data_id_info["mq_config"]
else:
kafka_config = data_id_info["result_table_list"][0]["shipper_list"][0]
# GOOD
if validated_data["data_type"] == "cleaned":
kafka_config = data_id_info["result_table_list"][0]["shipper_list"][0]
else:
kafka_config = data_id_info["mq_config"]
# BAD
## 每次都执行commit,整体耗时较长(大约25s左右)
for num in range(10000):
Record.objects.create(num=num)
# GOOD
## 统一提交数据库,耗时很短(1s以内)
inserted_list = []
for num in range(10000):
inserted_list.append(Demo(num=num))
Record.objects.bulk_create(inserted_list)
注意: bulk_create 方法只执行一次数据库交互,这样相当于创建时间一样,并且自定字段不会在返回数据中
有时我们常会把 yield
类同于 return
,为了减少一些循环次数,我们常会把 return
直接替换成 yield
,然而这时候就很容易出现 bug,尤其是当一个函数中有多个条件分支时。
# BAD
def test(a: int):
if a > 1:
yield "a"
yield "b"
list(test(2)) # 预期是 ["a"], 实际是 ["a", "b"],因为 yield 仅是让度 CPU 而非结束当前函数
# GOOD
def test(a: int):
if a > 1:
yield "a"
return # 控制好函数的生命周期,以达到预期效果
yield "b"
timedelta.total_seconds()
代替 timedelta.seconds
获取相差总秒数from datetime import datetime
dt1 = datetime.now()
dt2 = datetime.now()
# BAD
print((dt2 - dt1).seconds)
# GOOD
print((dt2 - dt1).total_seconds())
在源码中,seconds 的计算方式为:days, seconds = divmod(seconds, 24*3600)
表达式右侧 seconds 是总秒数,被一天的总秒数取模得到 seconds
@property
def seconds(self):
"""seconds"""
return self._seconds
# in the `__new__`, you can find the `seconds` is modulo by the total number of seconds in a day
def __new__(cls, days=0, seconds=0, microseconds=0,
milliseconds=0, minutes=0, hours=0, weeks=0):
seconds += minutes*60 + hours*3600
# ...
if isinstance(microseconds, float):
microseconds = round(microseconds + usdouble)
seconds, microseconds = divmod(microseconds, 1000000)
# ! 👇
days, seconds = divmod(seconds, 24*3600)
d += days
s += seconds
else:
microseconds = int(microseconds)
seconds, microseconds = divmod(microseconds, 1000000)
# ! 👇
days, seconds = divmod(seconds, 24*3600)
d += days
s += seconds
microseconds = round(microseconds + usdouble)
# ...
total_seconds
可以得到一个准确的差值:
def total_seconds(self):
"""Total seconds in the duration."""
return ((self.days * 86400 + self.seconds) * 10**6 +
self.microseconds) / 10**6
很多场景下,我们会得到比较大的原始数据(比如数万个嵌套的 dict
),为了更便利地操作这些数据,往往会选择通过 class
进行实例化,但基于 Python 孱弱的 CPU 计算性能,这一操作可能会耗时过于久。
所以需要在两方面做平衡:
NamedTuple
类似的结构,获得一定的结构便利性,但相较于原始数据,会牺牲一定性能dataclass
或者 class
等方式,保证最大的结构便利性,但会非常影响性能这个观点需要更多的例子测试来佐证,有好的想法和数据的欢迎讨论 😃
update_fields
如果要对数据库字段进行更新,使用 update_fields
避免并行 save()
产生数据冲突
# BAD
foo_instance.bar_field = other_value
foo_instance.save()
# GOOD
foo_instance.bar_field = other_value
foo_instance.save(update_fields=["bar_field"])
同时需要注意的是,如果 Model
中包含 auto_now
字段时,需要在 update_fields
的列表中添加该字段,保证同时更新。
# BAD
# 有注入风险, username 不会被转义,可以直接注入
Entry.objects.extra(where=[f"headline='{username}'"])
# GOOD
# 安全,Django 会将 username 内容转义
Entry.objects.extra(where=['headline=%s'], params=[username])
类似 PEP 规则,最佳实践可以有一个标号 (例如 PBP?)
例子:
不要在代码中出现 Magic Number,常量应该使用 Enum 模块来替代。
# BAD
if cluster_type == 1:
pass
# GOOD
from enum import Enum
Class BCSType(Enum):
K8S = 1
Mesos = 2
if cluster_type == BCSType.K8S.value:
pass
好处是,每一个建议都可以有一个简单的索引方式,在 code review 中可以通过 PBP-X
的方式向其他人建议,更容易传播和记忆。
DRF 在 3.10 版本以前,ModelSerializer
有较大的性能问题,用作渲染大量的数据返回可能会耗时非常久,可以考虑使用 Serializer
或者原生数据结构返回。
# 当有大量 user 对象需要渲染时
# BAD
class UserModelSerializer(serializers.ModelSerializer):
class Meta:
model = User
fields = "__all__"
# GOOD
# 性能有所提升,同时又不会破坏 Serializer 结构
class UserSerializer(serializers.Serializer):
# 将需要的字段平铺出来
username = serializers.CharField()
...
# GOOD
# 最快!直接返回原始结构体,但是可能需要处理多种对象
def serialize_user(user: User) -> Dict[str, Any]:
...
return {
"username": "foo",
...
}
在 DRF 3.10 版本以后,ModelSerializer
性能有一定程度的提升,但依旧会比后两种处理慢,可以根据场景和具体测试数据选择适合的写法。
参考:
# BAD
list_two = []
for v in list_one:
if v[0]:
new_list.append(v[1])
# GOOD one
list_two = [v[1] for v in list_one if v[0]]
# GOOD two
list_two = list(filter(lambda x: x[0], list_one))
如果想要转换一个时间,可以将任意对象扔给arrow,然后转成对应的函数格式
datetime.datetime.now()
),区分时区int(time.time())
),不区分时区2022-11-08 22:57:22
)(str(datetime.datetime.now())
),区分时区这些对象统一都可以扔给arrow.get(任意对象),得到一个arrow对象arr
转成datetime对象: arr.
# 带时区
In [18]: arr.datetime
Out[18]: datetime.datetime(2022, 11, 8, 22, 57, 22, 171057, tzinfo=tzlocal())
# 不带时区
In [19]: arr.naive
Out[19]: datetime.datetime(2022, 11, 8, 22, 57, 22, 171057)
转成timestamp整数:
In [20]: arr.timestamp
Out[20]: 1667919442
转成字符串:
In [21]: arr.strftime("%Y-%m-%d %H:%M:%S")
Out[21]: '2022-11-08 22:57:22'
# BAD
In [22]: int(time.mktime(time.strptime('2022-11-08 22:57:22+0800', '%Y-%m-%d %H:%M:%S%z')))
Out[22]: 1667919442
# GOOD
In [23]: arrow.get('2022-11-08 22:57:22+0800').timestamp
Out[23]: 1667919442
无须自己指定时间格式,直接转换即可
当 MySQL 版本较低时,DATETIME 类型默认是不支持 milliseconds 的,当批量创建对象时,会导致大量记录的 auto_now_add
字段都在同一秒,此时根据该字段是无法获得稳定的排序结果的。
# BAD
class Foo(models.Model):
...
foo = models.DateTimeField(auto_now_add=True)
...
class Meta:
ordering = ["foo"]
# GOOD
class Foo(models.Model):
...
foo = models.DateTimeField(auto_now_add=True)
...
class Meta:
# 使用自增 ID 或者其他能准确表明顺序的字段
ordering = ["id"]
参考:
Django中的QuerySet都是带有缓存的。一旦要计算 QuerySet 的值,就会执行数据查询,随后,Django 就会将查询结果保存在 QuerySet 的缓存中,并返回这些显式请求的缓存。
因此我们可以通过Django的缓存减少对数据库查询的访问次数
# BAD
grade_list = [student.grade for student in Student.objects.all()]
name_list = [student.name for student in Student.objects.all()]
# GOOD
student_queryset = Student.objects.all()
grade_list = [student.grade for student in student_queryset]
name_list = [student.name for student in student_queryset]
Django(>=4.0)的ORM已经支持了bulk_update
tasks = [
Task.objects.create(name='task1', status='start', cost=1),
Task.objects.create(name='task2', status='start', cost=1),
...
]
# BAD
for task in tasks:
task.name = f'{task.pk}-{task.name}'
task.save()
# GOOD
for task in tasks:
task.name = f'{task.pk}-{task.name}'
Task.objects.bulk_update(tasks, ['name'])
如果使用update_or_create时,一定要将更新字段放置于defaults里,因为会根据非defaults里的字段作为查询条件
# BAD
ModelA.objects.update_or_create(
field_1="field_1",
field_2="field_2",
field_3="field_3",
)
# GOOD
ModelA.objects.update_or_create(
field_1="field_1",
defaults={
"field_2": "field_2",
"field_3": "field_3",
}
如果对 Redis 的同一个 key
有多次操作并且希望保证操作的原子性,除了加锁的复杂操作外,可以通过将多条 Redis 操作指令封装为 lua 脚本,再通过执行 lua 脚本的方式实现。
业务场景:并发场景下,检查 "best-practices:identifier" 的值是否为 abc
,是的话删除。
# BAD
if redis_client.get("best-practices:identifier") == "abc":
return redis_client.del("best-practices:identifier")
上述实现的问题:并发场景下,best-practices:identifier
对应的值可能被修改,如果修改是在 get
del
操作间隙发生,那么会导致值不为 abc
的 best-practices:identifier
被误删。
通过 lua 脚本,可以将 get
del
封装成原子性操作,避免上述问题的发生。
# GOOD
# lua 脚本:满足期望值将 key 删除,否则返回 0
del_script = """
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
"""
del_script_func = redis_client.register_script(del_script)
return del_script_func(keys=["best-practices:identifier"], args=["abc"])
get_or_create
方法时需要保证查询字段唯一性约束在并发请求的情况下,get_or_create
并不能保证记录的唯一性,会存在重复创建的情况。因此使用此方法前需要确定用于存在性查询的字段是否设置了DB级别的唯一性约束
# BAD
# models.py
class Topic(models.Model):
"""
模型定义
"""
username = models.CharField(max_length=32)
title = models.CharField(max_length=128)
# views.py
def view_func(request):
# 并发请求场景下可能会出现重复记录
Topic.objects.get_or_create(username="foo", title="bar")
# GOOD
# models.py
class Topic(models.Model):
"""
模型定义
"""
username = models.CharField(max_length=32)
title = models.CharField(max_length=128)
class Meta:
# 增加 username 和 title 字段联合唯一性约束
unique_together = ("username", "title")
# views.py
def view_func(request):
# 存在DB级别的唯一性约束,能够保证不会创建重复记录
Topic.objects.get_or_create(username="foo", title="bar")
null=True
而不是 default
# BAD
new_field = models.CharField(default="foo")
# GOOD
new_field = models.CharField(null=True)
前者将会在 migrate
操作时对已存在的数据批量刷新,对现有数据库带来不必要的影响。
参考:https://pankrat.github.io/2015/django-migrations-without-downtimes/
示例1:fruits_list = Fruits.objects.all()
if fruits_list:
print(fruits.name)
示例2:fruits_list = Fruits.objects.values('id', 'name')
if fruits_list:
print(fruits.name)
总结:虽然都是拿到了name,但是第一种示例会从数据库中提取所有字段,耗时耗力,那么第二种示例只需要拿我们想要的数据,节省了查询时间和内存。
在对redis的高频操作中,由于RTT的存在。单点对redis的qps很大程度上受到RTT的限制。当ping响应在1ms时,单点qps最大也不会超过1k/s。
# BAD
for item in item_list:
client.lpush(key, item)
# GOOD (节省了RTT)
pipeline = client.pipeline()
for item in item_list:
pipeline.lpush(key, item)
pipeline.execute()
# BETTER(redis2.4及更高版本)
client.lpush(key, *item_list)
assert
时,请添加断言信息,避免在排查问题时缺少关键信息>>> assert 1 == 0
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
AssertionError
在大段的日志中,单独存在的 AssertionError
不利于日志检索和问题定位,所以添加上可读的断言信息是更推荐的做法。
# BAD
assert "hello" == "world"
# GOOD
assert "hello" == "world", "Hello is not equal to world"
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.