简单使用 OIDC 协议实现单点登录

Administrator
发布于 2025-03-25 / 5 阅读 / 0 评论 / 0 点赞

简单使用 OIDC 协议实现单点登录

简单使用 OIDC 协议实现单点登录

OIDC(OpenID Connect)是建立在 OAuth 2.0 之上的身份认证协议,旨在为 Web 和移动应用提供单点登录(SSO)和身份认证功能。OIDC 主要用于确认用户身份。

OIDC 的核心概念

  • ID Token:用于身份认证的 JWT 令牌,包含用户信息。
  • Access Token:用于授权访问资源。
  • Refresh Token:用于刷新 Access Token。
  • 认证端点:OIDC 提供者的身份认证接口。
  • 用户信息端点:用于获取用户信息的接口。

OIDC 认证流程

  • 前端重定向到 OIDC 认证端点
  • 用户登录并授权
  • OIDC 认证服务器返回授权码
  • 前端或后端交换授权码获取 ID Token 和 Access Token
  • 后端验证 ID Token,并获取用户信息

OIDC 三种模式

  • 授权码模式
  • 隐式模式
  • 混合模式

这里只介绍授权码模式(本人公司使用)
适用场景:前后端分离,前端使用 OIDC 协议,后端使用 JWT 鉴权。

流程:

  • 前端重定向到 OIDC 认证服务器,请求授权 (response_type=code)。
  • 用户登录并授权。
  • OIDC 认证服务器返回一个授权码(Authorization Code) 给前端。
  • 前端将授权码发送给后端(避免暴露给浏览器)。
  • 后端用授权码向 OIDC 服务器请求 ID Token 和 Access Token。
  • OIDC 服务器返回 ID Token 和 Access Token,后端验证并存储 Token。
  • 后端返回用户信息 或提供 API 访问。

基于 OIDC 协议开发出来的服务,前端可以不用自己编写登录页面,后端也不需要编写登录功能,只需要接入到公司统一登录页面即可,而且浏览器一个已经登录过的页面存在了 cookie,此时打开其他也实现 OIDC 的平台,不需要再次登录。

前端实现

import React, { useEffect, useState } from "react";
import { UserManager } from "oidc-client";

// OIDC 配置
const oidcConfig = {
    authority: "https://authing.com:7777",             // OIDC 认证服务地址
    client_id: "app-oidc",                             // OIDC 客户端 ID 
    redirect_uri: "http://localhost/auth/v1/callback", // OIDC 前端回调地址
    response_type: "code",                             // 授权码模式
    scope: "openid profile email",                     // 请求用户信息
};

const userManager = new UserManager(oidcConfig);

export default function App() {
    const [user, setUser] = useState(null);

    // 组件加载时获取用户信息
    useEffect(() => {
        fetchUserInfo();
    }, []);

    // 登录
    const login = () => {
        userManager.signinRedirect();
    };

    // 处理 OIDC 回调
    useEffect(() => {
        if (window.location.pathname === "/auth/v1/callback") {
            userManager.signinRedirectCallback().then(() => {
                window.history.replaceState({}, document.title, "/"); // 清除 URL
                fetchUserInfo(); // 登录成功后获取用户信息
            });
        }
    }, []);

    // 退出登录
    const logout = () => {
        fetch("http://localhost/auth/v1/logout", {
            method: "GET",
        })
            .then(() => {
                setUser(null);
            })
            .catch((error) => {
                console.error("退出登录失败:", error);
            });
    };

    // 获取用户信息
    const fetchUserInfo = () => {
        fetch("http://localhost/auth/v1/user_info", {
            method: "GET",
        })
            .then((response) => response.json())
            .then((data) => {
                setUser(data);
            })
            .catch((error) => {
                console.error("获取用户信息失败:", error);
            });
    };

    return (
        <div>
            <h1>OIDC 登录示例</h1>
            {user ? (
                <div>
                    <p>欢迎, {user.name || user.email}</p>
                    <button onClick={logout}>退出登录</button>
                </div>
            ) : (
                <button onClick={login}>登录</button>
            )}
        </div>
    );
}

后端实现

import base64
import json
import requests
from flask import Flask, request, jsonify, redirect, make_response

app = Flask(__name__)

OIDC_HOST = "https://authing.com:7777"  # 替换为实际的 OIDC 服务器地址
FRONTEND_URL = "/"  # 前端地址


def get_oauth_token(code):
    """向 OIDC 服务器获取 access_token 和 refresh_token"""
    url = f"{OIDC_HOST}/v1/oauth/token"
    data = {"grant_type": "authorization_code", "code": code}
    headers = {"Content-Type": "application/x-www-form-urlencoded"}

    response = requests.post(url, data=data, headers=headers)
    if response.status_code != 200:
        return None, None
    return response.json().get("access_token"), response.json().get("refresh_token")


def get_user_info(uid):
    """从 OIDC 服务器获取用户信息"""
    url = f"{OIDC_HOST}/v1/users/{uid}?app_label=app-oidc"
    response = requests.get(url)
    if response.status_code != 200:
        return None
    return response.json()


def decode_jwt(token):
    """解码 JWT 并获取 payload"""
    try:
        parts = token.split(".")
        if len(parts) < 2:
            return None
        payload_bytes = base64.urlsafe_b64decode(parts[1] + "===")
        return json.loads(payload_bytes.decode("utf-8"))
    except Exception:
        return None


@app.route("/auth/v1/callback")
def callback():
    """处理 OIDC 登录回调,获取 token 并设置 cookie"""
    code = request.args.get("code")
    if not code:
        return jsonify({"error": "code is required"}), 400

    access_token, refresh_token = get_oauth_token(code)
    if not access_token or not refresh_token:
        return jsonify({"error": "failed to get tokens"}), 400

    response = make_response(redirect(FRONTEND_URL))
    response.set_cookie("token", access_token, max_age=60 * 60 * 24)
    response.set_cookie("rftoken", refresh_token, max_age=60 * 60 * 24)
    return response


@app.route("/auth/v1/user_info")
def user_info():
    """解析 token 获取用户信息"""
    token = request.cookies.get("token")
    if not token:
        return jsonify({"error": "token is required"}), 400

    payload = decode_jwt(token)
    if not payload or "uid" not in payload:
        return jsonify({"error": "invalid token"}), 400

    user_info = get_user_info(payload["uid"])
    if not user_info:
        return jsonify({"error": "failed to get user info"}), 400

    res = {"status":0,"data":user_info["data"], "msg":"success!" }

    response = make_response(json.dumps(res, ensure_ascii=False))
    response.headers["Content-Type"] = "application/json; charset=utf-8"

    return response

@app.route("/auth/v1/logout")
def logout():
    """登出"""
    response = make_response(redirect(FRONTEND_URL))
    response.delete_cookie("token")
    response.delete_cookie("rftoken")
    response.delete_cookie("sso_token", domain="dreame.tech")
    response.delete_cookie("sso_rftoken", domain="dreame.tech")
    return response

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8777, debug=True)