简单使用 OIDC 协议实现单点登录
OIDC(OpenID Connect)是建立在 OAuth 2.0 之上的身份认证协议,旨在为 Web 和移动应用提供单点登录(SSO)和身份认证功能。OIDC 主要用于确认用户身份。
OIDC 的核心概念
- ID Token:用于身份认证的 JWT 令牌,包含用户信息。
- Access Token:用于授权访问资源。
- Refresh Token:用于刷新 Access Token。
- 认证端点:OIDC 提供者的身份认证接口。
- 用户信息端点:用于获取用户信息的接口。
OIDC 认证流程
- 前端重定向到 OIDC 认证端点
- 用户登录并授权
- OIDC 认证服务器返回授权码
- 前端或后端交换授权码获取 ID Token 和 Access Token
- 后端验证 ID Token,并获取用户信息
OIDC 三种模式
- 授权码模式
- 隐式模式
- 混合模式
这里只介绍授权码模式(本人公司使用)
适用场景:前后端分离,前端使用 OIDC 协议,后端使用 JWT 鉴权。
流程:
- 前端重定向到 OIDC 认证服务器,请求授权 (response_type=code)。
- 用户登录并授权。
- OIDC 认证服务器返回一个授权码(Authorization Code) 给前端。
- 前端将授权码发送给后端(避免暴露给浏览器)。
- 后端用授权码向 OIDC 服务器请求 ID Token 和 Access Token。
- OIDC 服务器返回 ID Token 和 Access Token,后端验证并存储 Token。
- 后端返回用户信息 或提供 API 访问。
基于 OIDC 协议开发出来的服务,前端可以不用自己编写登录页面,后端也不需要编写登录功能,只需要接入到公司统一登录页面即可,而且浏览器一个已经登录过的页面存在了 cookie,此时打开其他也实现 OIDC 的平台,不需要再次登录。
前端实现
import React, { useEffect, useState } from "react";
import { UserManager } from "oidc-client";
// OIDC 配置
const oidcConfig = {
authority: "https://authing.com:7777", // OIDC 认证服务地址
client_id: "app-oidc", // OIDC 客户端 ID
redirect_uri: "http://localhost/auth/v1/callback", // OIDC 前端回调地址
response_type: "code", // 授权码模式
scope: "openid profile email", // 请求用户信息
};
const userManager = new UserManager(oidcConfig);
export default function App() {
const [user, setUser] = useState(null);
// 组件加载时获取用户信息
useEffect(() => {
fetchUserInfo();
}, []);
// 登录
const login = () => {
userManager.signinRedirect();
};
// 处理 OIDC 回调
useEffect(() => {
if (window.location.pathname === "/auth/v1/callback") {
userManager.signinRedirectCallback().then(() => {
window.history.replaceState({}, document.title, "/"); // 清除 URL
fetchUserInfo(); // 登录成功后获取用户信息
});
}
}, []);
// 退出登录
const logout = () => {
fetch("http://localhost/auth/v1/logout", {
method: "GET",
})
.then(() => {
setUser(null);
})
.catch((error) => {
console.error("退出登录失败:", error);
});
};
// 获取用户信息
const fetchUserInfo = () => {
fetch("http://localhost/auth/v1/user_info", {
method: "GET",
})
.then((response) => response.json())
.then((data) => {
setUser(data);
})
.catch((error) => {
console.error("获取用户信息失败:", error);
});
};
return (
<div>
<h1>OIDC 登录示例</h1>
{user ? (
<div>
<p>欢迎, {user.name || user.email}</p>
<button onClick={logout}>退出登录</button>
</div>
) : (
<button onClick={login}>登录</button>
)}
</div>
);
}
后端实现
import base64
import json
import requests
from flask import Flask, request, jsonify, redirect, make_response
app = Flask(__name__)
OIDC_HOST = "https://authing.com:7777" # 替换为实际的 OIDC 服务器地址
FRONTEND_URL = "/" # 前端地址
def get_oauth_token(code):
"""向 OIDC 服务器获取 access_token 和 refresh_token"""
url = f"{OIDC_HOST}/v1/oauth/token"
data = {"grant_type": "authorization_code", "code": code}
headers = {"Content-Type": "application/x-www-form-urlencoded"}
response = requests.post(url, data=data, headers=headers)
if response.status_code != 200:
return None, None
return response.json().get("access_token"), response.json().get("refresh_token")
def get_user_info(uid):
"""从 OIDC 服务器获取用户信息"""
url = f"{OIDC_HOST}/v1/users/{uid}?app_label=app-oidc"
response = requests.get(url)
if response.status_code != 200:
return None
return response.json()
def decode_jwt(token):
"""解码 JWT 并获取 payload"""
try:
parts = token.split(".")
if len(parts) < 2:
return None
payload_bytes = base64.urlsafe_b64decode(parts[1] + "===")
return json.loads(payload_bytes.decode("utf-8"))
except Exception:
return None
@app.route("/auth/v1/callback")
def callback():
"""处理 OIDC 登录回调,获取 token 并设置 cookie"""
code = request.args.get("code")
if not code:
return jsonify({"error": "code is required"}), 400
access_token, refresh_token = get_oauth_token(code)
if not access_token or not refresh_token:
return jsonify({"error": "failed to get tokens"}), 400
response = make_response(redirect(FRONTEND_URL))
response.set_cookie("token", access_token, max_age=60 * 60 * 24)
response.set_cookie("rftoken", refresh_token, max_age=60 * 60 * 24)
return response
@app.route("/auth/v1/user_info")
def user_info():
"""解析 token 获取用户信息"""
token = request.cookies.get("token")
if not token:
return jsonify({"error": "token is required"}), 400
payload = decode_jwt(token)
if not payload or "uid" not in payload:
return jsonify({"error": "invalid token"}), 400
user_info = get_user_info(payload["uid"])
if not user_info:
return jsonify({"error": "failed to get user info"}), 400
res = {"status":0,"data":user_info["data"], "msg":"success!" }
response = make_response(json.dumps(res, ensure_ascii=False))
response.headers["Content-Type"] = "application/json; charset=utf-8"
return response
@app.route("/auth/v1/logout")
def logout():
"""登出"""
response = make_response(redirect(FRONTEND_URL))
response.delete_cookie("token")
response.delete_cookie("rftoken")
response.delete_cookie("sso_token", domain="dreame.tech")
response.delete_cookie("sso_rftoken", domain="dreame.tech")
return response
if __name__ == "__main__":
app.run(host="0.0.0.0", port=8777, debug=True)