Flask入门三(Flask-session的使用、数据库链接池、wtforms、Flask定制命令、Flask-Cache)
文章目录
- 一、Flask-session使用
- 1.使用方式一
- 2.使用方式二
- 3.读RedisSessionInterface源码
- 4.flask-session补充
- 二、数据库连接池
- 1.flask中使用mysql
- 2.上述问题解决
- 使用数据库连接池
- 1.第三方数据库连接池
- 2.操作数据库不带池版
- 3.池版和非池版压测
- 三、wtforms
- 四、Flask定制命令
- 1.使用 flask-script定制命令(老版本,新版本不用了)
- 2.新版本定制命令
- 3.Django中自定制命令
- 五、Flask-Cache
一、Flask-session使用
Flask内���的Session会把数据加密后保存到浏览器 我们自己重写Session类保存到Reids中只需要重写open_session和save_session方法
而在这中间有一个模块就做了这件事,那就是flask-session,以把数据存放到文件、redis、mongodb、关系型数据库等中
安装flask-session
pip install flask-session
1.使用方式一
from flask import Flask,session app = Flask(__name__) app.debug=True app.secret_key='jlkdoasfiuz' # 只要使用session就需要secret_key 1.安装flask-session pip install flask-session '使用方式一' 2.导入(这里我写入到redis缓存数据库中) from flask_session import RedisSessionInterface 3.把app.session_interface替换成RedisSessionInterface的对象 # 替换了就会走RedisSessionInterface的open_session和save_session from redis import Redis conn=Redis(host='127.0.0.1',port=6379,db=2) # 需要传入的参数 redis, key_prefix, use_signer, permanent, sid_length ''' 1.redis 是传入链接的redis库,链接对象 2.key_prefix 是保存在redis中名称的前缀 3.use_signer 是如果是False就无需配置secret_key,默认设置True 4.permanent 是关闭浏览器cookie是否失效 5.sid_length 是生成session_key的长度,会以cookie形式写入到浏览器cookie中,但是去掉redis中session这个前缀 去掉前缀就是session_key的长度限制 ''' app.session_interface=RedisSessionInterface( redis=conn,key_prefix='session',use_signer=False, permanent=True,sid_length=32 ) @app.route('/set_session') def set_session(): session['name'] = 'jack' return 'set_session' @app.route('/get_session') def get_session(): print(session.get('name')) return 'get_session' if __name__ == '__main__': app.run()
2.使用方式二
from flask import Flask,session app = Flask(__name__) app.debug=True app.secret_key='jlkdoasfiuz' # 只要使用session就需要secret_key 1.安装flask-session pip install flask-session '使用方式二' 2.在flask配置文件中加入配置 from redis import Redis # 导入redis app.config['SESSION_TYPE'] = 'redis' # 配置链接的类型 app.config['SESSION_REDIS']=Redis(host='127.0.0.1',port=6379,db=2) # app.config['SESSION_KEY_PREFIX'] = 'session' # 如果不写,默认以SESSION_COOKIE_NAME作为key # app.config.from_pyfile('./settings') # 第二种导入配置文件方式 3.导入Session from flask_session import Session Session(app) # 核心和方式一一模一样,具体看源码 @app.route('/set_session') def set_session(): session['name'] = 'jack' return 'set_session' @app.route('/get_session') def get_session(): print(session.get('name')) return 'get_session' if __name__ == '__main__': app.run()
3.读RedisSessionInterface源码
1.RedisSessionInterface的open_session(在它的父类中) def open_session(self, app, request): # -取到前端传入,在cookie中得随机字符串 sid = request.cookies.get(app.config["SESSION_COOKIE_NAME"]) if not sid: sid = self._generate_sid(self.sid_length) # 当sid不为空,把sid传入到session_class得到对象 return self.session_class(sid=sid, permanent=self.permanent) if self.use_signer: # 用来加密,所以第一种方式的ues_signer最好不要改为False try: sid = self._unsign(app, sid) except BadSignature: sid = self._generate_sid(self.sid_length) return self.session_class(sid=sid, permanent=self.permanent) return self.fetch_session(sid) def fetch_session(self, sid): # 取到随机字符串 prefixed_session_id = self.key_prefix + sid # 从redis中取出key为前缀+随机字符串对应的value值 value = self.redis.get(prefixed_session_id) if value is not None: try: # 解密成字符串 session_data = self.serializer.loads(value) # 把解密后的数据,组装到 session对象中 return self.session_class(session_data, sid=sid) except pickle.UnpicklingError: return self.session_class(sid=sid, permanent=self.permanent) return self.session_class(sid=sid, permanent=self.permanent) 2.RedisSessionInterface的save_session(在它自己内) def save_session(self, app, session, response): if not self.should_set_cookie(app, session): return domain = self.get_cookie_domain(app) path = self.get_cookie_path(app) if not session: # 如果session有值 if session.modified: # 如果值被修改过,就把cookie中的删除,并且删除redis中的 self.redis.delete(self.key_prefix + session.sid) response.delete_cookie( app.config["SESSION_COOKIE_NAME"], domain=domain, path=path ) return # expiration_datetime = self.get_expiration_time(app, session) serialized_session_data = self.serializer.dumps(dict(session)) # 放到redis中 self.redis.set( name=self.key_prefix + session.sid, value=serialized_session_data, ex=total_seconds(app.permanent_session_lifetime), # 过期时间 ) # 把session对应的随机字符串放到cookie中 self.set_cookie_to_response(app, session, response, expiration_datetime)
4.flask-session补充
- session的前缀如果不传,默认:config.setdefault('SESSION_KEY_PREFIX', 'session:') - session过期时间:通过配置,如果不写,会有默认 'PERMANENT_SESSION_LIFETIME': timedelta(days=31),#这个配置文件控制 -设置cookie时,如何设定关闭浏览器则cookie失效 permanent=False app.config['SESSION_PERMANENT'] = False
二、数据库连接池
1.flask中使用mysql
'settings.py' SECRET_KEY = 'fdsjakluiz' DEBUG = True MYSQL_USER = 'root' MYSQL_HOST = '127.0.0.1' MYSQL_PORT = 3306 MYSQL_PASSWORD = '1234' MYSQL_DATABASE = 'cnblogs' JSON_AS_ASCII = False 'app.py' import pymysql.cursors from flask import Flask, jsonify app = Flask(__name__) app.config.from_pyfile('./settings.py') # pymysql操作mysql from pymysql import Connect conn = Connect( user=app.config.get('MYSQL_USER'), password=app.config.get('MYSQL_PASSWORD'), host=app.config.get('MYSQL_HOST'), database=app.config.get('MYSQL_DATABASE'), port=app.config.get('MYSQL_PORT'), ) # pymysql.cursors.DictCursor查出来的是列表套字典的形式 cursor = conn.cursor(pymysql.cursors.DictCursor) # cursor = conn.cursor() # app.config['JSON_AS_ASCII'] = False # 前端显示json格式中文 @app.route('/') def articles(): cursor.execute('select id,title,author from article limit 10') article_list = cursor.fetchall() # 拿出所有 return jsonify(article_list) if __name__ == '__main__': app.run()
这种方式conn和cursor如果是全局,出现如下问题:
'上面的 conn和cursor 都是全局的' 假设极端情况:同时并发两个用户 -一个用户查询所有文章 -一个用户查询所有用户 '在线程中全局变量是共享的' 就会出现,第一个线程拿着cursor执行了: cursor.execute('select id,title,author from article limit 10') 然后第二个线程拿着 cursor 执行了: cursor.execute('select id,name from user limit 10') 第一个线程开始执行:(用的全是同一个cursor) article_list = cursor.fetchall() 就会出现查询article的cursor取出来的数据是 用户相关数据---》出现数据错乱
2.上述问题解决
每个人:线程用自己的从conn和cursor,在视图函数中,拿到链接和cursor
import pymysql.cursors from flask import Flask, jsonify app = Flask(__name__) app.config.from_pyfile('./settings.py') # pymysql操作mysql from pymysql import Connect @app.route('/') def articles(): conn = Connect( user=app.config.get('MYSQL_USER'), password=app.config.get('MYSQL_PASSWORD'), host=app.config.get('MYSQL_HOST'), database=app.config.get('MYSQL_DATABASE'), port=app.config.get('MYSQL_PORT'), ) cursor = conn.cursor(pymysql.cursors.DictCursor) cursor.execute('select id,title,author from article limit 10') article_list = cursor.fetchall() # 拿出所有 return jsonify(article_list) @app.route('/desc') def desc(): conn = Connect( user=app.config.get('MYSQL_USER'), password=app.config.get('MYSQL_PASSWORD'), host=app.config.get('MYSQL_HOST'), database=app.config.get('MYSQL_DATABASE'), port=app.config.get('MYSQL_PORT'), ) cursor = conn.cursor(pymysql.cursors.DictCursor) cursor.execute('select id,real_desc from article limit 10') article_list = cursor.fetchall() # 拿出所有 return jsonify(article_list) if __name__ == '__main__': app.run()
但是这种如果并发量过高,就会出现连接数过多的问题,mysql的性能就降低了
使用数据库连接池
'上述操作存在的问题' 1.原生pymysql操作,最好有一个rom---->sqlalchemy 2.并发问题:conn和cursor要做成单例,还是每个视图函数一个? -如果使用单例,数据会错乱 -咱们需要,每个视图函数,哪一个链接,如果并发数过多,mysql链接数就会很多,所以使用连接池解决 # django orm操作,一个请求,就会拿到一个mysql的链接,用完后就释放 '所以想要彻底解决,得使用数据库连接池' -限定 mysql链接最多,无论多少线程操作,都是从池中取链接使用 '解决上面的两个问题' -数据库连接池 -创建一个全局的池 -每次进入视图函数,从池中取一个连接使用,使用完放回到池中,只要控制池的大小,就能控制mysql连接数
1.第三方数据库连接池
'pool.py' 在这个文件中配置池也也可以在视图函数中配置 # 1 安装 pip install dbutils # 2 使用:实例化得到一个池对象---》池是单例 from dbutils.pooled_db import PooledDB import pymysql POOL=PooledDB( creator=pymysql, # 使用链接数据库的模块 maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数 mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建 maxcached=5, # 链接池中最多闲置的链接,0和None不限制 maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。 blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错 maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host='127.0.0.1', port=3306, user='root', password='1234', database='cnblogs', charset='utf8' ) 'app.py' 视图函数 from flask import Flask,jsonify app = Flask(__name__) app.config.from_pyfile('./settings.py') import time from pool import POOL from pymysql.cursors import DictCursor ## 3 在视图函数中导入使用 @app.route('/article') def article(): conn = POOL.connection() cursor = conn.cursor(DictCursor) # 获取10条文章 cursor.execute('select id,title,author from article limit 10') time.sleep(1) # 切换 res = cursor.fetchall() print(res) return jsonify({'code': 100, 'msg': '成功', 'result': res}) if __name__ == '__main__': app.run()
2.操作数据库不带池版
from flask import Flask,jsonify app = Flask(__name__) app.config.from_pyfile('./settings.py') import time ## 3 在视图函数中导入使用 @app.route('/article') def article(): import pymysql from pymysql.cursors import DictCursor conn = pymysql.connect(user='root', password="1234", host='127.0.0.1', database='cnblogs', port=3306) cursor = conn.cursor(DictCursor) # 获取10条文章 cursor.execute('select id,title,author from article limit 10') time.sleep(1) # 切换 res = cursor.fetchall() print(res) return jsonify({'code': 100, 'msg': '成功', 'result': res}) if __name__ == '__main__': app.run(port=5001)
3.池版和非池版压测
压测代码 jmeter工具---》java import requests from threading import Thread # 没有连接池 def task(): # res = requests.get('http://127.0.0.1:5000/article') # 带连接池版 res = requests.get('http://127.0.0.1:5001/article') # 不带连接池版 print(res.json()) if __name__ == '__main__': l = [] for i in range(100): t = Thread(target=task) t.start() l.append(t) for i in l: i.join() '''效果是:使用池的连接数明显小,不使用池连接数明显很大'''
查看数据库连接数:show status like '%Threads%';
三、wtforms
wtforms是一个支持多个web框架的form组件,主要用于对用户请求数据进行验证、渲染错误信息、渲染页面
app.py
from flask import Flask,render_template,request,redirect from wtforms import Form from wtforms.fields import simple from wtforms import validators from wtforms import widgets app=Flask(__name__) app.debug=True class LoginForm(Form): # 字段(内部包含正则表达式) name = simple.StringField( label='用户名', validators=[ validators.DataRequired(message='用户名不能为空.'), validators.Length(min=6, max=18, message='用户名长度必须大于%(min)d且小于%(max)d') ], widget=widgets.TextInput(), # 页面上显示的插件 render_kw={'class': 'form-control'} ) # 字段(内部包含正则表达式) pwd = simple.PasswordField( label='密码', validators=[ validators.DataRequired(message='密码不能为空.'), validators.Length(min=8, message='用户名长度必须大于%(min)d'), validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}", message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符') ], widget=widgets.PasswordInput(), render_kw={'class': 'form-control'} ) @app.route('/login', methods=['GET', 'POST']) def login(): if request.method == 'GET': form = LoginForm() return render_template('login.html', form=form) else: form = LoginForm(formdata=request.form) if form.validate(): print('用户提交数据通过格式验证,提交的值为:', form.data) else: print(form.errors) return render_template('login.html', form=form) if __name__ == '__main__': app.run()
login.html
Title
登录
{{form.name.label}} {{form.name}} {{form.name.errors[0] }}
{{form.pwd.label}} {{form.pwd}} {{form.pwd.errors[0] }}
四、Flask定制命令
1.使用 flask-script定制命令(老版本,新版本不用了)
flask 老版本中,没有命令运行项目,自定制命令 flask-script 解决了这个问题:flask项目可以通过命令运行,可以定制命令 新版的flask--》官方支持定制命令 click 定制命令,这个模块就弃用了 flask-migrate 老版本基于flask-script,新版本基于flask-click写的 使用步骤 -1 pip3 install Flask-Script==2.0.3 -2 pip3 install flask==1.1.4 -3 pip3 install markupsafe=1.1.1 -4 使用 from flask_script import Manager manager = Manager(app) if __name__ == '__main__': manager.run() -5 自定制命令 @manager.command def custom(arg): """自定义命令 python manage.py custom 123 """ print(arg) - 6 执行自定制命令 python manage.py custom 123
2.新版本定制命令
from flask import Flask import click app = Flask(__name__) 自定制命令,通过create-user传入用户名就可以创建一个用户来 @app.cli.command('create-user') @click.argument('name') def create_user(name): # from pool import POOL # conn = POOL.connection() # cursor=conn.cursor() # cursor.excute('insert into user (username,password) values (%s,%s)',args=[name,'hello123']) # conn.commit() print(name) 命令行中执行 -flask --app .\Flask定制命令.py:app create-user jack -简写成 前提条件式app所在的py文件名叫app.py -flask create-user jack @app.route('/') def index(): return 'index' -运行项目的命令:flask --app .\Flask定制命令.py:app run if __name__ == '__main__': app.run()
3.Django中自定制命令
1 app下新建文件夹 management/commands/ 2 在该文件夹下新建py文件,随便命名(命令名) 3 在py文件中写代码 from django.core.management.base import BaseCommand class Command(BaseCommand): help = '命令提示' def handle(self, *args, **kwargs): 命令逻辑 4 使用命令 python manage.py py文件(命令名)
五、Flask-Cache
具体使用可以自寻去官方查看:https://flask-caching.readthedocs.io/en/latest/
from flask import Flask,render_template # 安装 pip install Flask-Caching from flask_caching import Cache config = { "DEBUG": True, "CACHE_TYPE": "SimpleCache", "CACHE_DEFAULT_TIMEOUT": 300 } app = Flask(__name__) app.config.from_mapping(config) cache = Cache(app) @app.route("/") @cache.cached(timeout=50) def index(): return render_template('index.html') @app.route('/set_cache') def set_cache(): cache.set('name','xxxx') return 'set_cache' @app.route('/get_cache') def get_cache(): res = cache.get('name') return res if __name__ == '__main__': app.run()
The End