So far I've got this:
async def download_raw_file_from_bitbucket(url: str) -> Optional[Any]:
    """Fetch a file through the Bitbucket 2.0 ``src`` endpoint.

    The given URL is split by ``_parse_bitbucket_url`` into workspace,
    repository slug, commit and path, which are then used to build the
    request URL. The request is sent with HTTP Basic auth built from
    ``BITBUCKET_EMAIL`` and ``BITBUCKET_API_TOKEN``.

    Args:
        url: Location of the file, in whatever form
            ``_parse_bitbucket_url`` accepts.

    Returns:
        The response body decoded as JSON when it is valid JSON,
        otherwise the body as plain text.

    Raises:
        Exception: If either credential is missing or empty, or if the
            server answers with a non-success status (the message then
            carries the status code and the response body).
    """
    location = _parse_bitbucket_url(url)

    # Both credentials are required; an empty string counts as missing.
    if not (BITBUCKET_EMAIL and BITBUCKET_API_TOKEN):
        raise Exception("Missing Bitbucket email or API token")

    workspace = location["workspace"]
    repo_slug = location["repo_slug"]
    commit = location["commit"]
    path = location["path"]
    endpoint = (
        "https://api.bitbucket.org/2.0/repositories/"
        + f"{workspace}/{repo_slug}/src/{commit}/{path}"
    )
    credentials = aiohttp.BasicAuth(BITBUCKET_EMAIL, BITBUCKET_API_TOKEN)

    async with aiohttp.ClientSession() as session, session.get(
        endpoint,
        auth=credentials,
        allow_redirects=True,
    ) as response:
        # The body is needed on both paths: as the error detail on failure
        # and as the payload on success.
        body = await response.text()
        if not response.ok:
            raise Exception(f"{response.status} {body}")

        # Not every file is JSON; fall back to the raw text when parsing fails.
        try:
            decoded = json.loads(body)
        except json.JSONDecodeError:
            return body
        return decoded
But no matter what I try, the response is always "Token is invalid, expired, or not supported for this endpoint." I know the token is valid: it is freshly generated and has access to everything.
The URL looks like this:
https://bitbucket.org/<username>/<repo>/<folder>/<folder>/<folder>/<folder>/file.extension