flask 指引

参考:Flask Web Development, Flask Web开发, Flask 框架



安装

$ pip install flask

>>> import flask

[TOP]


启动服务

$ cat run.py
from flask import Flask

app = Flask(__name__)

@app.route('/')
def index():
    return '<h1>Hello Flask</h1>'

if __name__ == '__main__':
    app.run()

$ python run.py
* Running on http://127.0.0.1:5000/

在本机浏览器上打开 http://127.0.0.1:5000/ 即可看到运行结果。要从网络上访问服务,需要设置host

$ cat run.py
from flask import Flask

app = Flask(__name__)

@app.route('/')
def index():
    return '<h1>Hello Flask</h1>'

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=True)

$ python run.py
* Running on http://0.0.0.0:5000/

在生产环境下,可采用”nginx反向代理+gunicorn”的方案部署服务器:

$ pip install gunicorn
$ sudo apt-get install nginx
$ cat run.py
from flask import Flask

app = Flask(__name__)

@app.route('/')
def index():
    return '<h1>Hello Flask</h1>'

if __name__ == '__main__':
    app.run()
$ cat /etc/nginx/conf.d/default.conf
server {
    listen       80 default_server;
    server_name  0.0.0.0;
    location / {
        proxy_pass  http://127.0.0.1:8080;
    }
}

$ sudo gunicorn -b 127.0.0.1:8080 run:app
[2017-03-18 23:17:46 +0000] [3791] [INFO] Starting gunicorn 19.6.0
[2017-03-18 23:17:46 +0000] [3791] [INFO] Listening at: http://127.0.0.1:8080 (3791)
[2017-03-18 23:17:46 +0000] [3791] [INFO] Using worker: sync
[2017-03-18 23:17:46 +0000] [3796] [INFO] Booting worker with pid: 3796

文件目录:

- run.py

[TOP]


模板

简单模板

$ pip install Jinja2
$ cat run.py
from flask import Flask, render_template

app = Flask(__name__)

@app.route('/')
def index():
    return render_template('index.html')

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=True)

$ cat templates/index.html
<html>
    <head><title>Flask</title></head>
    <body><h1>Hello World</h1></body>
</html>

$ python run.py
* Running on http://0.0.0.0:5000/

文件目录:

- run.py
|- templates/index.html

模板中的变量

$ cat run.py
from flask import Flask, render_template

app = Flask(__name__)

@app.route('/')
def index():
    return render_template('index.html')
def upage(user):
    return render_template('upage.html', name=user)

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=True)

$ cat templates/upage.html
<html>
  <head><title>Flask</title></head>
  <body><h1>Hello, { { name } }</h1></body>
</html>

$ python run.py
* Running on http://0.0.0.0:5000/

浏览器把输入的user的值传递给脚本中的namename对应模板中的 { { name } }

文件目录:

- run.py
- templates/
| - index.html
| - upage.html

模板中的控制结构

条件判断:

$ cat templates/upage.html
<html>
  <head><title>Flask</title></head>
  <body>
    { % if name % }
      <h1>Hello, { { name } }</h1>
    { % else % }
      <h1>Hello, Flask</h1>
    { % endif % }
  </body>
</html>

循环:

$ cat templates/upage.html
<html>
  <head><title>Flask</title></head>
  <body>
    { % for user in name % }
      <h1>Hello, { { user } }</h1>
    { % endfor % }
  </body>
</html>

衍生模板:

$ cat templates/index.html
<html>
  <head><title>Flask</title></head>
  <body><h1>Hello World</h1></body>
</html>
{ % extends "upage.html" % }

$ cat templates/upage.html
<html>
  <head><title>Flask</title></head>
  <body><h1>Hello Flask</h1></body>
</html>

文件目录:

- run.py
- templates/
| - index.html
| - upage.html

[TOP]


静态路由

$ cat templates/index.html
<html>
  <head>
    <title>Flask</title>
    <link rel="shortcut icon" href="{ { url_for('static', filename = 'i32.ico') } }" type="image/x-icon">
  </head>
  <body>
    <h1>Hello World</h1>
  </body>
</html>

文件目录:

- run.py
- templates/
| - index.html
- static/
| - i32.ico

[TOP]


表单

$ pip install flask_wtf
$ cat run.py
from flask import Flask, render_template, request
from flask.ext.wtf import Form
from wtforms import StringField, SubmitField, PasswordField
from wtforms.validators import Required

app = Flask(__name__)
app.config['SECRET_KEY'] = '******'

class NameForm(Form):
    name = StringField('user:', valiators = [Required()])
    passwd = PasswordField('password:',validators = [Required()])
    submit = SubmitField('submit')

@app.route('/')
def index():
    return render_template('index.html')
def upage(user):
    return render_template('upage.html', name=user)

@app.route('/submit')
def submit_form():
    name = None
    form = NameForm()
    if form.validate_on_submit():
        name = form.name.data
        form.name.data = ''
    return render_template('submit.html',form=form,name=name)

@app.route('/submit', methods=['GET','POST'])
def submit():
    return request.form['name']+'</br>'+request.form['passwd']

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=True)

$ cat templates/submit.html
<html>
  <head><title>Flask</title></head>
  <body>
    <from method="POST",action="/submit">
      { { form.hidden_tag() } }
      { { form.name.label } }{ { form.name() } }</br>
      { { form.passwd.label } }{ { form.passwd() } }</br>
      { { form.submit() } }
    </form>
  </body>
</html>

$ python run.py
* Running on http://0.0.0.0:5000/

文件目录:

- run.py
- templates/
| - index.html
| - upage.html
| - submit.html

[TOP]


文件

$ pip install flask_uploads

run.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
from flask import Flask, render_template, redirect, url_for, request
from flask_uploads import UploadSet, configure_uploads, ALL, TEXT, IMAGES, AUDIO, DOCUMENTS, DATA, SCRIPTS, ARCHIVES, EXECUTABLES, patch_request_class
from flask_wtf import FlaskForm
from flask_wtf.file import FileField, FileRequired, FileAllowed
from wtforms import SubmitField

app = Flask(__name__)

app.config['SECRET_KEY'] = '******'
app.config['UPLOADED_ALLFILE_DEST'] = './static/uploads/' # 文件上传目录

filesUpload = UploadSet('allfile', ALL) # 支持的文件类型
configure_uploads(app, filesUpload)
patch_request_class(app, 1024*1024*1024) # 支持的最大文件,默认16MB

class UploadForm(FlaskForm):
    filesToUpload = FileField(validators=[FileRequired(u'请选择文件')])
    submit = SubmitField(u'上传文件')

@app.route('/', methods=['GET', 'POST'])
def upload_file():
    files_list = os.listdir(app.config['UPLOADED_ALLFILE_DEST'])
    form = UploadForm()
    if form.validate_on_submit():
        filename = filesUpload.save(form.filesToUpload.data)
        file_url = u'http://192.168.1.99/static/uploads/' + filename
    else:
        file_url = None
        filename = None
    return render_template('upload.html', form=form, files_list=files_list, file_url=file_url, filename=filename)

@app.route('/show/<filename>')
def show_file(filename):
    file_url = u'http://192.168.1.99/static/uploads/' + filename
    return render_template('show.html', file_url=file_url, filename=filename)

@app.route('/delete/<filename>')
def delete_file(filename):
    file_path = u'/home/pi/flask/static/uploads/' + filename
    os.remove(file_path)
    return redirect(url_for('upload_file'))

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=True)

templates/index.html

<!DOCTYPE html>
<html>
<head>
<title>{ % block title % } T&T { % endblock % }</title>
    <!-- Required meta tags -->
    <meta charset="utf-8">
    <meta name="viewport" content="width=device-width, initial-scale=1, maximum-scale=1, user-scalable=no">
    <!-- Bootstrap CSS -->
    <link rel="stylesheet" href="{ { url_for('static', filename = 'css/styles.css', _external=False) } }">
    <link rel="stylesheet" href="{ { url_for('static', filename = 'css/bootstrap.css', _external=False) } }">
    <link rel="icon" href="{ { url_for('static', filename = 'images/i32.ico') } }" type="image/x-icon">
</head>
<body>
{ % block body % }
{ % endblock % }
    <!-- jQuery first, then Tether, then Bootstrap JS. -->
    <script language="JavaScript" src="{ { url_for('static', filename = 'javascripts/jquery-1.9.1.min.js') } }"></script>
    <script language="JavaScript" src="{ { url_for('static', filename = 'javascripts/bootstrap.js') } }"></script>
</body>
</html>

templates/upload.html

{ % extends "index.html" % }
{ % block body % }
    <div class="input-group">
    <form method="POST" enctype="multipart/form-data">
        { { form.hidden_tag() } }
        { { form.filesToUpload } }
        { { form.submit } }
        { % for error in form.filesToUpload.errors % }
            <span style="color: red;">{ { error } }</span>
        { % endfor % }
    </form>
    </div>
{ % if file_url % }
    <br>[new]: { { filename } }
    <a href="{ { file_url } }"> open </a>
{ % endif % }
    <hr>
    <h3> 文件列表 </h3>
{ % for photo in files_list | sort % }
    <hr>
    <font class="output">&raquo;</font> { { photo } }
    <a href="{ { url_for('show_file', filename=photo) } }"> open </a>
    <br>
    <a href="http://192.168.1.99/static/uploads/{ { photo } }"><img src="/static/uploads/{ { photo } }" width="350" height="350" alt='{ { photo } }' /></a>
    <br>
{ % endfor % }
{ % endblock % }

templates/show.html

{ % extends "index.html" % }
{ % block body % }
{ % if file_url % }
    <br><a href="{ { file_url } }"><img src={ { file_url } } alt='image' /></a>
    <br><a href="{ { url_for('delete_file', filename=filename) } }"> del </a>
{ % endif % }
{ % endblock % }

文件目录:

- run.py
- templates/
| - index.html
| - upload.html
| - show.html
- static/
| - css/
  | - styles.css
  | - bootstrap.css
| - javascripts/
  | - jquery-1.9.1.min.js
  | - bootstrap.js
| - images/
  | - i32.ico
| - uploads/

[TOP]


数据库

新建一个数据库flask

$ mysql -uUSERNAME -pPASSWD -e"create database flask;"

新建表

$ sudo apt-get install python-dev libmysqlclient-dev
$ pip install mysql-python
$ pip install flask-sqlalchemy
$ cat db.py
from flask import Flask
import MySQLdb
from flask.ext.sqlalchemy import SQLAlchemy

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://USERNAME:PASSWD@localhost/flask'
db = SQLAlchemy(app)

class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True)
    email = db.Column(db.String(320), unique=True)
    phone = db.Column(db.String(32), nullable=False)
    def __init__(self, username, email, phone):
        self.username = username
        self.email = email
        self.phone= phone

if __name__ == '__main__':
    db.create_all()

$ python db.py
$ mysql -uUSERNAME -pPASSWD -e"use flask;show tables;"

插入数据

$ cat insert.py
from flask import Flask
import MySQLdb
from flask.ext.sqlalchemy import SQLAlchemy

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://USERNAME:PASSWD@localhost/flask'
db = SQLAlchemy(app)

class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True)
    email = db.Column(db.String(320), unique=True)
    phone = db.Column(db.String(32), nullable=False)
    def __init__(self, username, email, phone):
        self.username = username
        self.email = email
        self.phone= phone

inset = User(username = 'uuspider', email = 'uuspider@gmail.com', phone = '88888888')
db.session.add(inset)
db.session.commit()

$ python insert.py
$ mysql -uUSERNAME -pPASSWD -e"use flask;select * from user;"

查询数据

$ cat select.py
from flask import Flask
import MySQLdb
from flask.ext.sqlalchemy import SQLAlchemy
from sqlalchemy import and_,not_,or_

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://USERNAME:PASSWD@localhost/flask'
db = SQLAlchemy(app)

class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True)
    email = db.Column(db.String(320), unique=True)
    phone = db.Column(db.String(32), nullable=False)
    def __init__(self, username, email, phone):
        self.username = username
        self.email = email
        self.phone= phone
    def __repr__(self):
        return "<User '{:s}'>".format(self.username)

select_ = User.query.filter_by(username='uuspider').first()
print(select_.id) #精确查询ID
print User.query.filter(User.email.endswith('@qq.com')).all() #模糊查询
print User.query.filter(User.username != 'yoyo').first() #非查询
print User.query.filter(not_(User.username=='yoyo')).first() #非查询
print User.query.filter(or_(User.username != 'yoyo',User.email.endswith('@example.com'))).first() #或查询
print User.query.filter(and_(User.username != 'yoyo',User.email.endswith('@example.com'))).first() #与查询
print User.query.limit(10).all() #查询返回的数据的数目
data_all=User.query.all()
print (data_all)#查询全部
for i in range(len(data_all)):
    print data_all[i].username+" "+data_all[i].email+" "+data_all[i].phone #循环拿出全部数据

$ python select.py

更新数据

$ cat update.py
from flask import Flask
import MySQLdb
from flask.ext.sqlalchemy import SQLAlchemy

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://USERNAME:PASSWD@localhost/flask'
db = SQLAlchemy(app)

class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True)
    email = db.Column(db.String(320), unique=True)
    phone = db.Column(db.String(32), nullable=False)
    def __init__(self, username, email, phone):
        self.username = username
        self.email = email
        self.phone= phone

news=User.query.all()
print news
news[1].username='test'
db.session.commit()

$ python update.py

删除数据

$ cat delete.py
from flask import Flask
import MySQLdb
from flask.ext.sqlalchemy import SQLAlchemy

app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://USERNAME:PASSWD@localhost/flask'
db = SQLAlchemy(app)

class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True)
    email = db.Column(db.String(320), unique=True)
    phone = db.Column(db.String(32), nullable=False)
    def __init__(self, username, email, phone):
        self.username = username
        self.email = email
        self.phone= phone

name=User.query.filter_by(username = 'bb').first()
db.session.delete(name)
db.session.commit()

$ python delete.py

[TOP]