简单使用 OIDC 协议实现单点登录
OIDC(OpenID Connect)是建立在 OAuth 2.0 之上的身份认证协议,旨在为 Web 和移动应用提供单点登录(SSO)和身份认证功能。OIDC 主要用于确认用户身份。
OIDC 的核心概念
- ID Token:用于身份认证的 JWT 令牌,包含用户信息。
 - Access Token:用于授权访问资源。
 - Refresh Token:用于刷新 Access Token。
 - 认证端点:OIDC 提供者的身份认证接口。
 - 用户信息端点:用于获取用户信息的接口。
 
OIDC 认证流程
- 前端重定向到 OIDC 认证端点
 - 用户登录并授权
 - OIDC 认证服务器返回授权码
 - 前端或后端交换授权码获取 ID Token 和 Access Token
 - 后端验证 ID Token,并获取用户信息
 
OIDC 三种模式
- 授权码模式
 - 隐式模式
 - 混合模式
 
这里只介绍授权码模式(本人公司使用)
适用场景:前后端分离,前端使用 OIDC 协议,后端使用 JWT 鉴权。
流程:
- 前端重定向到 OIDC 认证服务器,请求授权 (response_type=code)。
 - 用户登录并授权。
 - OIDC 认证服务器返回一个授权码(Authorization Code) 给前端。
 - 前端将授权码发送给后端(避免暴露给浏览器)。
 - 后端用授权码向 OIDC 服务器请求 ID Token 和 Access Token。
 - OIDC 服务器返回 ID Token 和 Access Token,后端验证并存储 Token。
 - 后端返回用户信息 或提供 API 访问。
 
基于 OIDC 协议开发出来的服务,前端可以不用自己编写登录页面,后端也不需要编写登录功能,只需要接入到公司统一登录页面即可,而且浏览器一个已经登录过的页面存在了 cookie,此时打开其他也实现 OIDC 的平台,不需要再次登录。
前端实现
import React, { useEffect, useState } from "react";
import { UserManager } from "oidc-client";
// OIDC 配置
const oidcConfig = {
    authority: "https://authing.com:7777",             // OIDC 认证服务地址
    client_id: "app-oidc",                             // OIDC 客户端 ID 
    redirect_uri: "http://localhost/auth/v1/callback", // OIDC 前端回调地址
    response_type: "code",                             // 授权码模式
    scope: "openid profile email",                     // 请求用户信息
};
const userManager = new UserManager(oidcConfig);
export default function App() {
    const [user, setUser] = useState(null);
    // 组件加载时获取用户信息
    useEffect(() => {
        fetchUserInfo();
    }, []);
    // 登录
    const login = () => {
        userManager.signinRedirect();
    };
    // 处理 OIDC 回调
    useEffect(() => {
        if (window.location.pathname === "/auth/v1/callback") {
            userManager.signinRedirectCallback().then(() => {
                window.history.replaceState({}, document.title, "/"); // 清除 URL
                fetchUserInfo(); // 登录成功后获取用户信息
            });
        }
    }, []);
    // 退出登录
    const logout = () => {
        fetch("http://localhost/auth/v1/logout", {
            method: "GET",
        })
            .then(() => {
                setUser(null);
            })
            .catch((error) => {
                console.error("退出登录失败:", error);
            });
    };
    // 获取用户信息
    const fetchUserInfo = () => {
        fetch("http://localhost/auth/v1/user_info", {
            method: "GET",
        })
            .then((response) => response.json())
            .then((data) => {
                setUser(data);
            })
            .catch((error) => {
                console.error("获取用户信息失败:", error);
            });
    };
    return (
        <div>
            <h1>OIDC 登录示例</h1>
            {user ? (
                <div>
                    <p>欢迎, {user.name || user.email}</p>
                    <button onClick={logout}>退出登录</button>
                </div>
            ) : (
                <button onClick={login}>登录</button>
            )}
        </div>
    );
}
后端实现
import base64
import json
import requests
from flask import Flask, request, jsonify, redirect, make_response
app = Flask(__name__)
OIDC_HOST = "https://authing.com:7777"  # 替换为实际的 OIDC 服务器地址
FRONTEND_URL = "/"  # 前端地址
def get_oauth_token(code):
    """向 OIDC 服务器获取 access_token 和 refresh_token"""
    url = f"{OIDC_HOST}/v1/oauth/token"
    data = {"grant_type": "authorization_code", "code": code}
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    response = requests.post(url, data=data, headers=headers)
    if response.status_code != 200:
        return None, None
    return response.json().get("access_token"), response.json().get("refresh_token")
def get_user_info(uid):
    """从 OIDC 服务器获取用户信息"""
    url = f"{OIDC_HOST}/v1/users/{uid}?app_label=app-oidc"
    response = requests.get(url)
    if response.status_code != 200:
        return None
    return response.json()
def decode_jwt(token):
    """解码 JWT 并获取 payload"""
    try:
        parts = token.split(".")
        if len(parts) < 2:
            return None
        payload_bytes = base64.urlsafe_b64decode(parts[1] + "===")
        return json.loads(payload_bytes.decode("utf-8"))
    except Exception:
        return None
@app.route("/auth/v1/callback")
def callback():
    """处理 OIDC 登录回调,获取 token 并设置 cookie"""
    code = request.args.get("code")
    if not code:
        return jsonify({"error": "code is required"}), 400
    access_token, refresh_token = get_oauth_token(code)
    if not access_token or not refresh_token:
        return jsonify({"error": "failed to get tokens"}), 400
    response = make_response(redirect(FRONTEND_URL))
    response.set_cookie("token", access_token, max_age=60 * 60 * 24)
    response.set_cookie("rftoken", refresh_token, max_age=60 * 60 * 24)
    return response
@app.route("/auth/v1/user_info")
def user_info():
    """解析 token 获取用户信息"""
    token = request.cookies.get("token")
    if not token:
        return jsonify({"error": "token is required"}), 400
    payload = decode_jwt(token)
    if not payload or "uid" not in payload:
        return jsonify({"error": "invalid token"}), 400
    user_info = get_user_info(payload["uid"])
    if not user_info:
        return jsonify({"error": "failed to get user info"}), 400
    res = {"status":0,"data":user_info["data"], "msg":"success!" }
    response = make_response(json.dumps(res, ensure_ascii=False))
    response.headers["Content-Type"] = "application/json; charset=utf-8"
    return response
@app.route("/auth/v1/logout")
def logout():
    """登出"""
    response = make_response(redirect(FRONTEND_URL))
    response.delete_cookie("token")
    response.delete_cookie("rftoken")
    response.delete_cookie("sso_token", domain="dreame.tech")
    response.delete_cookie("sso_rftoken", domain="dreame.tech")
    return response
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8777, debug=True)