I'm trying to create a chatbot where multiple users can query their own spaces. For that I have used OAuth to get an access token for each user, through which I can query using CQL.
But I'm getting this error while searching.
This is my code:
import logging
import os

import requests
from fastapi import FastAPI, HTTPException
app = FastAPI()
# OAuth credentials
# Each value can be overridden through the environment so that real secrets do
# not have to live in source control. The literals are kept as fallbacks so the
# module behaves as before when the variables are unset.
# NOTE(review): a client secret that has been committed or posted publicly
# should be rotated and the fallback literal removed.
CLIENT_ID = os.environ.get("ATLASSIAN_CLIENT_ID", "XWoOJPPwkFsdvXLp5ZA")
CLIENT_SECRET = os.environ.get(
    "ATLASSIAN_CLIENT_SECRET",
    "ATOACRoruoGA2E_S9FSKmLbuHrLp2EcT640EE26CA",
)
# get_access_token() sends this value as "redirect_uri", but no definition was
# visible in the file. The default assumes the local development server started
# at the bottom of this file (port 8000, route /callback) -- TODO confirm it
# matches the callback URL registered for the OAuth app.
REDIRECT_URI = os.environ.get(
    "ATLASSIAN_REDIRECT_URI",
    "http://localhost:8000/callback",
)
# Key of the single Confluence space this service queries.
STATIC_SPACE_KEY = "REDESIGN"
def get_access_token(code: str) -> str:
    """Exchange the authorization code for an access token.

    Args:
        code: Authorization code received on the OAuth redirect.

    Returns:
        The value of the "access_token" field of the token response.

    Raises:
        HTTPException: 502 when the token endpoint cannot be reached or its
            successful response carries no usable "access_token"; otherwise
            the upstream status code with the upstream error body as detail.
    """
    # The original snippet used `token_url` without defining it. This is the
    # token endpoint given in Atlassian's OAuth 2.0 (3LO) documentation --
    # confirm against the current docs.
    token_url = "https://auth.atlassian.com/oauth/token"
    payload = {
        "grant_type": "authorization_code",
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "code": code,
        "redirect_uri": REDIRECT_URI,
    }
    try:
        # A timeout keeps a stalled upstream from blocking the worker forever.
        response = requests.post(token_url, json=payload, timeout=10)
    except requests.RequestException as exc:
        raise HTTPException(
            status_code=502,
            detail="Could not reach the OAuth token endpoint.",
        ) from exc

    if response.status_code != 200:
        # Error bodies are not guaranteed to be JSON; fall back to raw text.
        try:
            detail = response.json()
        except ValueError:
            detail = response.text
        raise HTTPException(status_code=response.status_code, detail=detail)

    try:
        return response.json()["access_token"]
    except (ValueError, KeyError, TypeError) as exc:
        raise HTTPException(
            status_code=502,
            detail="Token response did not contain an access token.",
        ) from exc
def get_cloud_id(access_token: str) -> str:
    """Retrieve the cloud ID for the user.

    Args:
        access_token: OAuth bearer token for the user.

    Returns:
        The "id" of the first entry in the accessible-resources response.

    Raises:
        HTTPException: 502 when the endpoint cannot be reached; 404 when the
            response is not 200, is empty, or has no usable "id".
    """
    # The original snippet used `url` without defining it. This is the
    # accessible-resources endpoint from Atlassian's OAuth 2.0 (3LO)
    # documentation -- confirm against the current docs.
    url = "https://api.atlassian.com/oauth/token/accessible-resources"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Accept": "application/json",
    }
    try:
        response = requests.get(url, headers=headers, timeout=10)
    except requests.RequestException as exc:
        raise HTTPException(
            status_code=502,
            detail="Could not reach the accessible-resources endpoint.",
        ) from exc

    if response.status_code != 200:
        raise HTTPException(status_code=404, detail="Unable to retrieve cloud ID.")

    try:
        # NOTE(review): only the first resource is used. If a user has granted
        # access to several sites this may not be the one that holds the
        # target space -- verify.
        return response.json()[0]["id"]
    except (ValueError, LookupError, TypeError) as exc:
        # Covers a non-JSON body, an empty list, and entries without "id".
        raise HTTPException(
            status_code=404, detail="Unable to retrieve cloud ID."
        ) from exc
def query_pages(access_token: str, cloud_id: str) -> list:
    """Query pages under the static space 'REDESIGN' via a CQL search.

    A single search request is issued, so only the results contained in that
    one response are returned.

    Args:
        access_token: OAuth bearer token for the user.
        cloud_id: Cloud ID of the site to search, as returned by
            get_cloud_id().

    Returns:
        A list of dicts with keys "title", "id" and "content"; "content" is
        the page's storage-format body or "No Content" when absent.

    Raises:
        HTTPException: 502 when the search endpoint cannot be reached;
            otherwise the upstream status code with the upstream error body
            as detail.
    """
    # The original snippet used `url` without defining it and never used
    # `cloud_id`. OAuth 2.0 (3LO) requests go through api.atlassian.com and
    # are addressed by cloud ID -- confirm the exact path against Atlassian's
    # documentation.
    url = (
        f"https://api.atlassian.com/ex/confluence/{cloud_id}"
        "/wiki/rest/api/content/search"
    )
    cql = f"type=page AND space='{STATIC_SPACE_KEY}'"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Accept": "application/json",
    }
    params = {"cql": cql, "expand": "body.storage"}
    try:
        response = requests.get(url, headers=headers, params=params, timeout=30)
    except requests.RequestException as exc:
        raise HTTPException(
            status_code=502,
            detail="Could not reach the Confluence search endpoint.",
        ) from exc

    if response.status_code != 200:
        # Error bodies are not guaranteed to be JSON; fall back to raw text.
        try:
            detail = response.json()
        except ValueError:
            detail = response.text
        raise HTTPException(status_code=response.status_code, detail=detail)

    return [
        {
            "title": result["title"],
            "id": result["id"],
            "content": result.get("body", {}).get("storage", {}).get("value", "No Content"),
        }
        for result in response.json().get("results", [])
    ]
@app.get("/callback")
def callback(code: str):
    """Handle the OAuth redirect, fetch an access token, and query pages.

    Args:
        code: Authorization code supplied as a query parameter on the
            redirect.

    Returns:
        A dict with the space key under "space" and the list produced by
        query_pages() under "pages".

    Raises:
        HTTPException: Propagated from get_access_token(), get_cloud_id() or
            query_pages().
    """
    logger = logging.getLogger(__name__)

    # Step 1: Exchange the authorization code for an access token.
    # The token is a bearer credential, so it is deliberately never printed
    # or logged.
    access_token = get_access_token(code)

    # Step 2: Retrieve the cloud ID
    cloud_id = get_cloud_id(access_token)
    logger.debug("cloud id = %s", cloud_id)

    # Step 3: Query pages under the static space 'REDESIGN'
    pages = query_pages(access_token, cloud_id)

    # Step 4: Return the queried pages
    return {"space": STATIC_SPACE_KEY, "pages": pages}
if __name__ == "__main__":
    # Start the server only when this file is executed as a script.
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": 8000}
    uvicorn.run(app, **server_options)
You must be a registered user to add a comment. If you've already registered, sign in. Otherwise, register and sign in.