Flask入门三(Flask-session的使用、数据库链接池、wtforms、Flask定制命令、Flask-Cache)

小明 2025-05-02 06:11:45 5

文章目录

  • 一、Flask-session使用
    • 1.使用方式一
    • 2.使用方式二
    • 3.读RedisSessionInterface源码
    • 4.flask-session补充
    • 二、数据库连接池
      • 1.flask中使用mysql
      • 2.上述问题解决
      • 使用数据库连接池
        • 1.第三方数据库连接池
        • 2.操作数据库不带池版
        • 3.池版和非池版压测
        • 三、wtforms
        • 四、Flask定制命令
          • 1.使用 flask-script定制命令(老版本,新版本不用了)
          • 2.新版本定制命令
          • 3.Django中自定制命令
          • 五、Flask-Cache

            一、Flask-session使用

            Flask内���的Session会把数据加密后保存到浏览器 我们自己重写Session类保存到Reids中只需要重写open_session和save_session方法

            而在这中间有一个模块就做了这件事,那就是flask-session,以把数据存放到文件、redis、mongodb、关系型数据库等中

            安装flask-session

            	pip install flask-session
            

            1.使用方式一

            	from flask import Flask,session
            	app = Flask(__name__)
            	app.debug=True
            	app.secret_key='jlkdoasfiuz'
            	# 只要使用session就需要secret_key
            	1.安装flask-session  pip install flask-session
            	'使用方式一'
            	2.导入(这里我写入到redis缓存数据库中)
            	from flask_session import RedisSessionInterface
            	3.把app.session_interface替换成RedisSessionInterface的对象
            	# 替换了就会走RedisSessionInterface的open_session和save_session
            	from redis import Redis
            	conn=Redis(host='127.0.0.1',port=6379,db=2)
            	# 需要传入的参数 redis, key_prefix, use_signer, permanent, sid_length
            	'''
            	1.redis 是传入链接的redis库,链接对象
            	2.key_prefix    是保存在redis中名称的前缀
            	3.use_signer    是如果是False就无需配置secret_key,默认设置True
            	4.permanent 是关闭浏览器cookie是否失效
            	5.sid_length    是生成session_key的长度,会以cookie形式写入到浏览器cookie中,但是去掉redis中session这个前缀
            	去掉前缀就是session_key的长度限制
            	'''
            	app.session_interface=RedisSessionInterface(
            	    redis=conn,key_prefix='session',use_signer=False,
            	    permanent=True,sid_length=32
            	)
            	
            	
            	@app.route('/set_session')
            	def set_session():
            	    session['name'] = 'jack'
            	    return 'set_session'
            	
            	
            	@app.route('/get_session')
            	def get_session():
            	    print(session.get('name'))
            	    return 'get_session'
            	
            	
            	if __name__ == '__main__':
            	    app.run()
            

            2.使用方式二

            	from flask import Flask,session
            	app = Flask(__name__)
            	app.debug=True
            	app.secret_key='jlkdoasfiuz'
            	# 只要使用session就需要secret_key
            	1.安装flask-session  pip install flask-session
            	
            	'使用方式二'
            	2.在flask配置文件中加入配置
            	from redis import Redis  # 导入redis
            	app.config['SESSION_TYPE'] = 'redis'  # 配置链接的类型
            	app.config['SESSION_REDIS']=Redis(host='127.0.0.1',port=6379,db=2)
            	# app.config['SESSION_KEY_PREFIX'] = 'session'  # 如果不写,默认以SESSION_COOKIE_NAME作为key
            	# app.config.from_pyfile('./settings')  # 第二种导入配置文件方式
            	3.导入Session
            	from flask_session import Session
            	Session(app)  # 核心和方式一一模一样,具体看源码
            	
            	
            	@app.route('/set_session')
            	def set_session():
            	    session['name'] = 'jack'
            	    return 'set_session'
            	
            	
            	@app.route('/get_session')
            	def get_session():
            	    print(session.get('name'))
            	    return 'get_session'
            	
            	
            	if __name__ == '__main__':
            	    app.run()
            

            3.读RedisSessionInterface源码

            	1.RedisSessionInterface的open_session(在它的父类中)
                def open_session(self, app, request):
                	# -取到前端传入,在cookie中得随机字符串
                    sid = request.cookies.get(app.config["SESSION_COOKIE_NAME"])
                    if not sid:
                        sid = self._generate_sid(self.sid_length)
                        # 当sid不为空,把sid传入到session_class得到对象
                        return self.session_class(sid=sid, permanent=self.permanent)
                    if self.use_signer:  # 用来加密,所以第一种方式的ues_signer最好不要改为False
                        try:
                            sid = self._unsign(app, sid)
                        except BadSignature:
                            sid = self._generate_sid(self.sid_length)
                            return self.session_class(sid=sid, permanent=self.permanent)
                    return self.fetch_session(sid)
            	
                def fetch_session(self, sid):
                    # 取到随机字符串
                    prefixed_session_id = self.key_prefix + sid
                    # 从redis中取出key为前缀+随机字符串对应的value值
                    value = self.redis.get(prefixed_session_id)
                    if value is not None:
                        try:
                        	# 解密成字符串
                            session_data = self.serializer.loads(value)
                            # 把解密后的数据,组装到 session对象中
                            return self.session_class(session_data, sid=sid)
                        except pickle.UnpicklingError:
                            return self.session_class(sid=sid, permanent=self.permanent)
                    return self.session_class(sid=sid, permanent=self.permanent)
            	
            	2.RedisSessionInterface的save_session(在它自己内)
               	def save_session(self, app, session, response):
                   if not self.should_set_cookie(app, session):
                       return
                   domain = self.get_cookie_domain(app)
                   path = self.get_cookie_path(app)
                   
                   if not session: # 如果session有值
                       if session.modified:  # 如果值被修改过,就把cookie中的删除,并且删除redis中的
                           self.redis.delete(self.key_prefix + session.sid)
                           response.delete_cookie(
                               app.config["SESSION_COOKIE_NAME"], domain=domain, path=path
                           )
                       return
                   # 
                   expiration_datetime = self.get_expiration_time(app, session)
                   serialized_session_data = self.serializer.dumps(dict(session))
            		# 放到redis中
                   self.redis.set(
                       name=self.key_prefix + session.sid,
                       value=serialized_session_data,
                       ex=total_seconds(app.permanent_session_lifetime),  # 过期时间
                   )
                   # 把session对应的随机字符串放到cookie中
                   self.set_cookie_to_response(app, session, response, expiration_datetime)
            

            4.flask-session补充

            	- session的前缀如果不传,默认:config.setdefault('SESSION_KEY_PREFIX', 'session:')
                - session过期时间:通过配置,如果不写,会有默认
                	'PERMANENT_SESSION_LIFETIME':           timedelta(days=31),#这个配置文件控制
                -设置cookie时,如何设定关闭浏览器则cookie失效
                	permanent=False
                    app.config['SESSION_PERMANENT'] = False
            

            二、数据库连接池

            1.flask中使用mysql

            	'settings.py'
            	SECRET_KEY = 'fdsjakluiz'
            	DEBUG = True
            	MYSQL_USER = 'root'
            	MYSQL_HOST = '127.0.0.1'
            	MYSQL_PORT = 3306
            	MYSQL_PASSWORD = '1234'
            	MYSQL_DATABASE = 'cnblogs'
            	JSON_AS_ASCII = False
            	'app.py'
            	import pymysql.cursors
            	from flask import Flask, jsonify
            	app = Flask(__name__)
            	app.config.from_pyfile('./settings.py')
            	
            	# pymysql操作mysql
            	from pymysql import Connect
            	conn = Connect(
            	    user=app.config.get('MYSQL_USER'),
            	    password=app.config.get('MYSQL_PASSWORD'),
            	    host=app.config.get('MYSQL_HOST'),
            	    database=app.config.get('MYSQL_DATABASE'),
            	    port=app.config.get('MYSQL_PORT'),
            	)
            	# pymysql.cursors.DictCursor查出来的是列表套字典的形式
            	cursor = conn.cursor(pymysql.cursors.DictCursor)
            	# cursor = conn.cursor()
            	# app.config['JSON_AS_ASCII'] = False  # 前端显示json格式中文
            	@app.route('/')
            	def articles():
            	    cursor.execute('select id,title,author from article limit 10')
            	    article_list = cursor.fetchall()  # 拿出所有
            	    return jsonify(article_list)
            	
            	
            	if __name__ == '__main__':
            	    app.run()
            

            这种方式conn和cursor如果是全局,出现如下问题:

            	'上面的  conn和cursor 都是全局的'
            	假设极端情况:同时并发两个用户
            		-一个用户查询所有文章
            	    -一个用户查询所有用户
            	    
            	'在线程中全局变量是共享的'
            	就会出现,第一个线程拿着cursor执行了:
            	cursor.execute('select id,title,author from article limit 10')
            	然后第二个线程拿着 cursor 执行了:
            	cursor.execute('select id,name from user limit 10')
            		    
            	第一个线程开始执行:(用的全是同一个cursor)
            	article_list = cursor.fetchall()
            	就会出现查询article的cursor取出来的数据是  用户相关数据---》出现数据错乱
            

            2.上述问题解决

            每个人:线程用自己的从conn和cursor,在视图函数中,拿到链接和cursor

            	import pymysql.cursors
            	from flask import Flask, jsonify
            	app = Flask(__name__)
            	app.config.from_pyfile('./settings.py')
            	
            	# pymysql操作mysql
            	from pymysql import Connect
            	
            	@app.route('/')
            	def articles():
            	    conn = Connect(
            	        user=app.config.get('MYSQL_USER'),
            	        password=app.config.get('MYSQL_PASSWORD'),
            	        host=app.config.get('MYSQL_HOST'),
            	        database=app.config.get('MYSQL_DATABASE'),
            	        port=app.config.get('MYSQL_PORT'),
            	    )
            	    cursor = conn.cursor(pymysql.cursors.DictCursor)
            	    cursor.execute('select id,title,author from article limit 10')
            	    article_list = cursor.fetchall()  # 拿出所有
            	    return jsonify(article_list)
            	
            	
            	@app.route('/desc')
            	def desc():
            	    conn = Connect(
            	        user=app.config.get('MYSQL_USER'),
            	        password=app.config.get('MYSQL_PASSWORD'),
            	        host=app.config.get('MYSQL_HOST'),
            	        database=app.config.get('MYSQL_DATABASE'),
            	        port=app.config.get('MYSQL_PORT'),
            	    )
            	    cursor = conn.cursor(pymysql.cursors.DictCursor)
            	    cursor.execute('select id,real_desc from article limit 10')
            	    article_list = cursor.fetchall()  # 拿出所有
            	    return jsonify(article_list)
            	
            	
            	if __name__ == '__main__':
            	    app.run()
            

            但是这种如果并发量过高,就会出现连接数过多的问题,mysql的性能就降低了

            使用数据库连接池

            	'上述操作存在的问题'
            	1.原生pymysql操作,最好有一个rom---->sqlalchemy
            	2.并发问题:conn和cursor要做成单例,还是每个视图函数一个?
            		-如果使用单例,数据会错乱
            		-咱们需要,每个视图函数,哪一个链接,如果并发数过多,mysql链接数就会很多,所以使用连接池解决
            	
            	# django orm操作,一个请求,就会拿到一个mysql的链接,用完后就释放
            	'所以想要彻底解决,得使用数据库连接池'
            	-限定 mysql链接最多,无论多少线程操作,都是从池中取链接使用
            	'解决上面的两个问题'
            	-数据库连接池
            	-创建一个全局的池
            	-每次进入视图函数,从池中取一个连接使用,使用完放回到池中,只要控制池的大小,就能控制mysql连接数
            

            1.第三方数据库连接池

            	'pool.py' 在这个文件中配置池也也可以在视图函数中配置
            	# 1 安装 pip install dbutils
            	# 2 使用:实例化得到一个池对象---》池是单例
            	from dbutils.pooled_db import PooledDB
            	import pymysql
            	
            	POOL=PooledDB(
            	    creator=pymysql,  # 使用链接数据库的模块
            	    maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
            	    mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
            	    maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
            	    maxshared=3,
            	    # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
            	    blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
            	    maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
            	    setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
            	    ping=0,
            	    # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
            	    host='127.0.0.1',
            	    port=3306,
            	    user='root',
            	    password='1234',
            	    database='cnblogs',
            	    charset='utf8'
            	)
            	
            	'app.py'  视图函数
            	from flask import Flask,jsonify
            	app = Flask(__name__)
            	app.config.from_pyfile('./settings.py')
            	import time
            	from pool import POOL
            	from pymysql.cursors import DictCursor
            	
            	## 3 在视图函数中导入使用
            	@app.route('/article')
            	def article():
            	    conn = POOL.connection()
            	    cursor = conn.cursor(DictCursor)
            	    # 获取10条文章
            	    cursor.execute('select id,title,author from article limit 10')
            	    time.sleep(1)
            	    # 切换
            	    res = cursor.fetchall()
            	    print(res)
            	    return jsonify({'code': 100, 'msg': '成功', 'result': res})
            	
            	
            	if __name__ == '__main__':
            	    app.run()
            

            2.操作数据库不带池版

            	from flask import Flask,jsonify
            	app = Flask(__name__)
            	app.config.from_pyfile('./settings.py')
            	import time
            	## 3 在视图函数中导入使用
            	@app.route('/article')
            	def article():
            	    import pymysql
            	    from pymysql.cursors import DictCursor
            	    conn = pymysql.connect(user='root',
            	                           password="1234",
            	                           host='127.0.0.1',
            	                           database='cnblogs',
            	                           port=3306)
            	    cursor = conn.cursor(DictCursor)
            	    # 获取10条文章
            	    cursor.execute('select id,title,author from article limit 10')
            	    time.sleep(1)
            	    # 切换
            	    res = cursor.fetchall()
            	    print(res)
            	    return jsonify({'code': 100, 'msg': '成功', 'result': res})
            	
            	
            	if __name__ == '__main__':
            	    app.run(port=5001)
            

            3.池版和非池版压测

            	压测代码  jmeter工具---》java
            	import requests
            	from threading import Thread
            	
            	
            	# 没有连接池
            	def task():
            	    # res = requests.get('http://127.0.0.1:5000/article')  # 带连接池版
            	    res = requests.get('http://127.0.0.1:5001/article')  # 不带连接池版
            	    print(res.json())
            	
            	
            	if __name__ == '__main__':
            	    l = []
            	    for i in range(100):
            	        t = Thread(target=task)
            	        t.start()
            	        l.append(t)
            	
            	    for i in l:
            	        i.join()
            	
            	'''效果是:使用池的连接数明显小,不使用池连接数明显很大'''
            

            查看数据库连接数:show status like '%Threads%';

            三、wtforms

            wtforms是一个支持多个web框架的form组件,主要用于对用户请求数据进行验证、渲染错误信息、渲染页面

            app.py

            from flask import Flask,render_template,request,redirect
            from wtforms import Form
            from wtforms.fields import simple
            from wtforms import validators
            from wtforms import widgets
            app=Flask(__name__)
            app.debug=True
            class LoginForm(Form):
                # 字段(内部包含正则表达式)
                name = simple.StringField(
                    label='用户名',
                    validators=[
                        validators.DataRequired(message='用户名不能为空.'),
                        validators.Length(min=6, max=18, message='用户名长度必须大于%(min)d且小于%(max)d')
                    ],
                    widget=widgets.TextInput(), # 页面上显示的插件
                    render_kw={'class': 'form-control'}
                )
                # 字段(内部包含正则表达式)
                pwd = simple.PasswordField(
                    label='密码',
                    validators=[
                        validators.DataRequired(message='密码不能为空.'),
                        validators.Length(min=8, message='用户名长度必须大于%(min)d'),
                        validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}",
                                          message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符')
                    ],
                    widget=widgets.PasswordInput(),
                    render_kw={'class': 'form-control'}
                )
            @app.route('/login', methods=['GET', 'POST'])
            def login():
                if request.method == 'GET':
                    form = LoginForm()
                    return render_template('login.html', form=form)
                else:
                    form = LoginForm(formdata=request.form)
                    if form.validate():
                        print('用户提交数据通过格式验证,提交的值为:', form.data)
                    else:
                        print(form.errors)
                    return render_template('login.html', form=form)
            if __name__ == '__main__':
                app.run()
            

            login.html

            	
            	
            	
            	    
            	    Title
            	
            	
            	

            登录

            {{form.name.label}} {{form.name}} {{form.name.errors[0] }}

            {{form.pwd.label}} {{form.pwd}} {{form.pwd.errors[0] }}

            四、Flask定制命令

            1.使用 flask-script定制命令(老版本,新版本不用了)

            	flask 老版本中,没有命令运行项目,自定制命令
            	flask-script 解决了这个问题:flask项目可以通过命令运行,可以定制命令
            	
            	新版的flask--》官方支持定制命令  click 定制命令,这个模块就弃用了
            	flask-migrate 老版本基于flask-script,新版本基于flask-click写的
            	
            	使用步骤
            		-1 pip3 install  Flask-Script==2.0.3
            	    -2 pip3 install flask==1.1.4
            	    -3 pip3 install markupsafe=1.1.1
            		-4 使用
            	    from flask_script import Manager
            	    manager = Manager(app)
            	    if __name__ == '__main__':
            	    	manager.run()
            	    -5 自定制命令
            	    @manager.command
            	    def custom(arg):
            	        """自定义命令
            	        python manage.py custom 123
            	        """
            	        print(arg)
            	        
            	    - 6 执行自定制命令
            	    python manage.py custom 123
            

            2.新版本定制命令

            	from flask import Flask
            	import click
            	app = Flask(__name__)
            	
            	自定制命令,通过create-user传入用户名就可以创建一个用户来
            	@app.cli.command('create-user')
            	@click.argument('name')
            	def create_user(name):
            	    # from pool import POOL
            	    # conn = POOL.connection()
            	    # cursor=conn.cursor()
            	    # cursor.excute('insert into user (username,password) values (%s,%s)',args=[name,'hello123'])
            	    # conn.commit()
            	    print(name)
            	
            	
            	命令行中执行
            	-flask --app .\Flask定制命令.py:app create-user jack
            	-简写成 前提条件式app所在的py文件名叫app.py
            	-flask create-user jack
            	
            	
            	@app.route('/')
            	def index():
            	    return 'index'
            	
            	
            	-运行项目的命令:flask --app .\Flask定制命令.py:app run
            	
            	if __name__ == '__main__':
            	    app.run()
            

            3.Django中自定制命令

            	1 app下新建文件夹
            		management/commands/
            	2 在该文件夹下新建py文件,随便命名(命令名)
            	
            	3 在py文件中写代码
            		from django.core.management.base import BaseCommand
            		class Command(BaseCommand):
            		    help = '命令提示'
            		    def handle(self, *args, **kwargs):
            				命令逻辑  
            	4 使用命令
            		python manage.py  py文件(命令名)
            

            五、Flask-Cache

            具体使用可以自寻去官方查看:https://flask-caching.readthedocs.io/en/latest/

            	from flask import Flask,render_template
            	# 安装 pip install Flask-Caching
            	from flask_caching import Cache
            	
            	config = {
            	    "DEBUG": True,
            	    "CACHE_TYPE": "SimpleCache",
            	    "CACHE_DEFAULT_TIMEOUT": 300
            	}
            	app = Flask(__name__)
            	app.config.from_mapping(config)
            	cache = Cache(app)
            	
            	
            	@app.route("/")
            	@cache.cached(timeout=50)
            	def index():
            	    return render_template('index.html')
            	
            	
            	@app.route('/set_cache')
            	def set_cache():
            	    cache.set('name','xxxx')
            	    return 'set_cache'
            	
            	
            	@app.route('/get_cache')
            	def get_cache():
            	    res = cache.get('name')
            	    return res
            	
            	
            	if __name__ == '__main__':
            	    app.run()
            
The End
微信