JWT原理及实现
wt(JSON Web Tokens),是一种开发的行业标准 RFC 7519 ,用于安全的表示双方之间的声明。目前,jwt广泛应用在系统的用户认证方面,特别是现在前后端分离项目。
1. jwt认证流程
在项目开发中,一般会按照上图所示的过程进行认证,即:用户登录成功之后,服务端给用户浏览器返回一个token,以后用户浏览器要携带token再去向服务端发送请求,服务端校验token的合法性,合法则给用户看数据,否则,返回一些错误信息。
传统token方式和jwt在认证方面有什么差异?
-
传统token方式
用户登录成功后,服务端生成一个随机token给用户,并且在服务端(数据库或缓存)中保存一份token,以后用户再来访问时需携带token,服务端接收到token之后,去数据库或缓存中进行校验token的是否超时、是否合法。
-
jwt方式
用户登录成功后,服务端通过jwt生成一个随机token给用户(服务端无需保留token),以后用户再来访问时需携带token,服务端接收到token之后,通过jwt对token进行校验是否超时、是否合法。
2. jwt创建token
2.1 原理
jwt的生成token格式如下,即:由 .
连接的三段字符串组成。
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
生成规则如下:
-
第一段HEADER部分,固定包含算法和token类型,对此json进行base64url加密,这就是token的第一段。
{ "alg": "HS256", "typ": "JWT" }
-
第二段PAYLOAD部分,包含一些数据,对此json进行base64url加密,这就是token的第二段。
{ "sub": "1234567890", "name": "John Doe", "iat": 1516239022 ... }
-
第三段SIGNATURE部分,把前两段的base密文通过
.
拼接起来,然后对其进行HS256
加密,再然后对hs256
密文进行base64url加密,最终得到token的第三段。
base64url( HMACSHA256( base64UrlEncode(header) + "." + base64UrlEncode(payload), your-256-bit-secret (秘钥加盐) ) )
最后将三段字符串通过 .
拼接起来就生成了jwt的token。
注意:base64url加密是先做base64加密,然后再将 -
替代 +
及 _
替代 /
2.2 代码实现
基于Python的pyjwt模块创建jwt的token。
-
安装
pip install pyjwt
-
实现
import jwt import datetime from jwt import exceptions SALT = \'iv%x6xo7l7_u9bf_u!9#g#m*)*=ej@bek5)(@u3kh*72+unjv=\' def create_token(): # 构造header headers = { \'typ\': \'jwt\', \'alg\': \'HS256\' } # 构造payload payload = { \'user_id\': 1, # 自定义用户ID \'username\': \'wupeiqi\', # 自定义用户名 \'exp\': datetime.datetime.utcnow() + datetime.timedelta(minutes=5) # 超时时间 } result = jwt.encode(payload=payload, key=SALT, algorithm="HS256", headers=headers).decode(\'utf-8\') return result if __name__ == \'__main__\': token = create_token() print(token)
3. jwt校验token
一般在认证成功后,把jwt生成的token返回给用户,以后用户再次访问时候需要携带token,此时jwt需要对token进行超时
及合法性
校验。
获取token之后,会按照以下步骤进行校验:
-
将token分割成
header_segment
、payload_segment
、crypto_segment
三部分
jwt_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c" signing_input, crypto_segment = jwt_token.rsplit(b\'.\', 1) header_segment, payload_segment = signing_input.split(b\'.\', 1)
-
对第一部分
header_segment
进行base64url解密,得到header
-
对第二部分
payload_segment
进行base64url解密,得到payload
-
对第三部分
crypto_segment
进行base64url解密,得到signature
-
对第三部分
signature
部分数据进行合法性校验- 拼接前两段密文,即:
signing_input
- 从第一段明文中获取加密算法,默认:
HS256
- 使用 算法+盐 对
signing_input
进行加密,将得到的结果和signature
密文进行比较。
- 拼接前两段密文,即:
import jwt import datetime from jwt import exceptions def get_payload(token): """ 根据token获取payload :param token: :return: """ try: # 从token中获取payload【不校验合法性】 # unverified_payload = jwt.decode(token, None, False) # print(unverified_payload) # 从token中获取payload【校验合法性】 verified_payload = jwt.decode(token, SALT, True) return verified_payload except exceptions.ExpiredSignatureError: print(\'token已失效\') except jwt.DecodeError: print(\'token认证失败\') except jwt.InvalidTokenError: print(\'非法的token\')
if __name__ == \'__main__\': token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE1NzM1NTU1NzksInVzZXJuYW1lIjoid3VwZWlxaSIsInVzZXJfaWQiOjF9.xj-7qSts6Yg5Ui55-aUOHJS4KSaeLq5weXMui2IIEJU" payload = get_payload(token)
4. JWT实战
中间件jwt.go
// JWTAuth 中间件,检查token func JWTAuth() gin.HandlerFunc { return func(c *gin.Context) { token := c.Request.Header.Get("Authorization") if token == "" { c.JSON(250, gin.H{ "status": -1, "msg": "请求未携带token,无权限访问", }) c.Abort() return } j := NewJWT() // parseToken 解析token包含的信息 claims, err := j.ParseToken(token) if err != nil { if err == TokenExpired { c.JSON(251, gin.H{ "status": -1, "msg": "授权已过期", }) c.Abort() return } c.JSON(252, gin.H{ "status": -1, "msg": err.Error(), }) c.Abort() return } // 继续交由下一个路由处理,并将解析出的信息传递下去 c.Set("claims", claims) } } // JWT 签名结构 type JWT struct { SigningKey []byte } // 一些常量 var ( TokenExpired error = errors.New("Token is expired") TokenNotValidYet error = errors.New("Token not active yet") TokenMalformed error = errors.New("That\'s not even a token") TokenInvalid error = errors.New("Couldn\'t handle this token:") SignKey string = "Zhangyafei" TokenExpireAt int64 = 60 * 60 * 24 // 默认过期时间1天 Issuer string = "zhangyafei" ) // 新建一个jwt实例 func NewJWT() *JWT { return &JWT{ []byte(GetSignKey()), } } // 获取signKey func GetSignKey() string { return SignKey } // 这是SignKey func SetSignKey(key string) string { SignKey = key return SignKey } // CreateToken 生成一个token func (j *JWT) CreateToken(claims request.CustomClaims) (string, error) { token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims) return token.SignedString(j.SigningKey) } // 解析Tokne func (j *JWT) ParseToken(tokenString string) (*request.CustomClaims, error) { token, err := jwt.ParseWithClaims(tokenString, &request.CustomClaims{}, func(token *jwt.Token) (interface{}, error) { return j.SigningKey, nil }) if err != nil { if ve, ok := err.(*jwt.ValidationError); ok { if ve.Errors&jwt.ValidationErrorMalformed != 0 { return nil, TokenMalformed } else if ve.Errors&jwt.ValidationErrorExpired != 0 { // Token is expired return nil, TokenExpired } else if ve.Errors&jwt.ValidationErrorNotValidYet != 0 { return nil, TokenNotValidYet } else { return nil, TokenInvalid } } } if claims, ok := token.Claims.(*request.CustomClaims); ok && token.Valid { return claims, nil } return nil, TokenInvalid } // 更新token func (j *JWT) RefreshToken(tokenString string) (string, error) { jwt.TimeFunc = func() time.Time { return time.Unix(0, 0) } token, err := jwt.ParseWithClaims(tokenString, &request.CustomClaims{}, func(token *jwt.Token) (interface{}, error) { return j.SigningKey, nil }) if err != nil { return "", err } if claims, ok := token.Claims.(*request.CustomClaims); ok && token.Valid { jwt.TimeFunc = time.Now claims.StandardClaims.ExpiresAt = time.Now().Add(1 * time.Hour).Unix() // 默认token有效期为1个小时 return j.CreateToken(*claims) } return "", TokenInvalid } // 生成令牌 func GenerateToken(c *gin.Context, user *models.SysUser) (token string, msg string, ok bool) { var grade_list []string var class_id_list []int if user.RoleId == 1 { dao.DB.Model(&models.SysGrade{}).Pluck("name", &grade_list) dao.DB.Model(&models.SysClass{}).Pluck("id", &class_id_list) }else { grade_list = strings.Split(user.Grades, ",") fmt.Println(grade_list) dao.DB.Model(&models.SysClass{}).Joins("left join sys_grades on sys_grades.id = sys_classes.grade_id").Where("major_id = ? and sys_grades.name in (?)", user.MajorID, grade_list).Pluck("sys_classes.id", &class_id_list) } fmt.Println(grade_list) j := &JWT{[]byte(SignKey)} claims := request.CustomClaims{ user.ID, user.Username, user.AvatarUrl, user.RoleId, user.MajorID, grade_list, class_id_list, jwt.StandardClaims{ NotBefore: int64(time.Now().Unix() - 1000), // 签名生效时间 ExpiresAt: int64(time.Now().Unix() + TokenExpireAt), // 过期时间 一小时 Issuer: Issuer, //签名的发行者 }, } token, err := j.CreateToken(claims) if err != nil { log.Println(err) return token, "创建token失败", false } else { return token, "登录成功!", true } }
建立路由
func SetupRouters() (Router *gin.Engine) { Router = gin.Default() Router.Static("/static", "static") Router.LoadHTMLGlob("templates/*") Router.GET("/", func(c *gin.Context) { c.HTML(http.StatusOK, "index.html", nil) }) Router.Use(middleware.Cors()) url := ginSwagger.URL("http://localhost:8000/swagger/doc.json") // The url pointing to API definition Router.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerFiles.Handler, url)) ApiV1Group := Router.Group("api/v1") ApiV1Group .POST("/login", v1.Login) UserGroup := ApiV1Group.Group("users").Use(middleware.JWTAuth()) { UserGroup.POST("/uploadAvatarImg", v1.UploadAvatarImg) UserGroup.PUT("/resetPassword", v1.ResetPwd) UserGroup.GET("/getUserList", v1.GetUserList) UserGroup.PUT("/changeState", v1.ChangeState) UserGroup.PUT("/updateRole", v1.UpdateUserRole) UserGroup.POST("/addOneUser", v1.AddOneUser) UserGroup.PUT("/updateUser", v1.UpdateUser) UserGroup.DELETE("/deleteUser", v1.DeleteUser) } return }
main.go
func main() { // 1. 初始化配置文件 err := common.GetConfigIni("config/config.ini") if err != nil { panic(err) } // 2. 初始化数据库 err = dao.InitDB() if err != nil { panic(err) } defer dao.Close() // 3. 数据表迁移 initialize.DBTableMigrate() // 4. 建立路由 router := initialize.SetupRouters() _ = router.Run(":8000") }
Django和Flask使用jwt案例
github地址:https://github.com/zhangyafeii/jwt