Hello Migration Folks,
Over the years, many of us have run into the problem of project roles getting duplicated after a Jira migration to Cloud.
This is generally not much of a problem if the migration scope is small, or if the destination cloud site is a fresh one, where duplication is almost nonexistent.
As prevention is better than cure, make sure the role names match between the source and destination sites before migrating, which should avoid the problem. If that is not possible, or you ended up with duplicated roles because, like me, you did not check before running the migration, read on.
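A minimal sketch of that pre-migration check, assuming you have already collected the project role names from both sites into two lists. How you fetch them is up to you; the names below are made-up samples, `roles_at_risk` is a hypothetical helper that is not part of the scripts in this post, and the exact, case-sensitive comparison is my assumption. It lists the source role names that have no match on the destination, which are the ones worth reconciling before you migrate.

```python
def roles_at_risk(source_roles, destination_roles):
    """Return source role names with no exact-name match on the destination."""
    destination = set(destination_roles)
    return sorted(name for name in source_roles if name not in destination)


# Made-up sample data, for illustration only.
source_roles = ["Administrators", "Developers", "Service Desk Team"]
destination_roles = ["Administrators", "Developer"]

print(roles_at_risk(source_roles, destination_roles))
# prints ['Developers', 'Service Desk Team']
```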
I recently came across such a problem where the roles were duplicated across 800 projects. Correcting all the associated permission schemes and the project role memberships is a nightmare if you want to do it manually.
While searching for an automated option, I came across this blog by Rodolfo Bortolin. However, it did not work for me, so I rewrote the code to make it work and added some bells and whistles.
I am sharing the scripts and the steps to execute them just in case there is another bewildered bloke like me trying desperately to fix this.
Note: Please test the script on a test environment before executing. The script can be further improved to include error handling. This post is just to get you jumpstarted with the script.
It's the AI era, so you can quickly rework the script if you need to change some bits.
Pre-requisites
Best Practices
How to execute?
Let us take the case I used the scripts for. Here the duplicate role is "Administrator (migrated)" and the original role is "Administrators".
The cleanup runs in two parts. The first script cleans the permission schemes; after that, the second script cleans up the role memberships on the Jira spaces.
Executing the permission clean up script -
What does the script do?
It finds every permission granted to the source role, grants it to the target role (unless the target role already has it), and then removes the grant from the source role. It does this across all the permissions and schemes in scope.
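To make that logic easier to follow, here is a small offline sketch that works out which grants would be added and which deleted for a single scheme. It is only an illustration: `plan_role_swap` is a hypothetical helper, the role IDs (10100 for the source, 10002 for the target) and grant IDs are made up, and it reads the role ID only from the holder's `parameter` field, whereas the full script also checks other fields.

```python
def plan_role_swap(permissions, source_role_id, target_role_id):
    """Return (permission keys to add for the target, grant IDs to delete)."""
    # Permission keys the target role already holds in this scheme.
    target_keys = {
        p["permission"]
        for p in permissions
        if p["holder"].get("type") == "projectRole"
        and str(p["holder"].get("parameter")) == str(target_role_id)
    }
    to_add, to_delete = [], []
    for p in permissions:
        holder = p["holder"]
        if holder.get("type") != "projectRole":
            continue  # groups, users, etc. are left alone
        if str(holder.get("parameter")) != str(source_role_id):
            continue  # granted to some other role
        to_delete.append(p["id"])
        key = p["permission"]
        if key not in target_keys and key not in to_add:
            to_add.append(key)
    return to_add, to_delete


# Made-up sample shaped like a permission scheme's "permissions" list.
sample = [
    {"id": 1, "permission": "BROWSE_PROJECTS", "holder": {"type": "projectRole", "parameter": "10100"}},
    {"id": 2, "permission": "BROWSE_PROJECTS", "holder": {"type": "projectRole", "parameter": "10002"}},
    {"id": 3, "permission": "EDIT_ISSUES", "holder": {"type": "projectRole", "parameter": "10100"}},
    {"id": 4, "permission": "EDIT_ISSUES", "holder": {"type": "group", "parameter": "jira-users"}},
]

to_add, to_delete = plan_role_swap(sample, source_role_id="10100", target_role_id="10002")
print(to_add, to_delete)
# prints ['EDIT_ISSUES'] [1, 3]
```

Here BROWSE_PROJECTS is not re-added because the target role already has it, but the source role's grant is still removed.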
PS C:\Users\...\python_scripts> py .\cleanpermissionschemes.py
Jira Cloud Permission Scheme Cleanup Utility
Enter source role name (example: Administrator (migrated)): Administrator (migrated)
Enter target role name (example: Administrator): Administrators
Enter permission keys to update, comma-separated, or ALL: ALL
Enter permission scheme IDs to process, comma-separated, or ALL: 11240
Dry run? (yes/no): yes
After cleaning up the schemes, run the project role membership cleanup.
What does the script do?
It finds all the users and groups in the source role of each selected project and moves them to the target role.
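As a rough illustration of that move, the sketch below turns a role's actor list into the add payloads and delete queries that would be sent, without calling Jira. `plan_actor_moves` is a hypothetical helper and the IDs are made up; the actor type names and field names are the ones the script itself relies on.

```python
def plan_actor_moves(actors):
    """Return (moves, skipped_types) for the actors of one source role."""
    moves, skipped = [], []
    for actor in actors:
        if actor["type"] == "atlassian-group-role-actor":
            group_id = actor["actorGroup"]["groupId"]
            moves.append({
                "add_payload": {"groupId": [group_id]},
                "delete_query": f"groupId={group_id}",
            })
        elif actor["type"] == "atlassian-user-role-actor":
            account_id = actor["actorUser"]["accountId"]
            moves.append({
                "add_payload": {"user": [account_id]},
                "delete_query": f"user={account_id}",
            })
        else:
            # Anything else is reported and left untouched.
            skipped.append(actor["type"])
    return moves, skipped


# Made-up sample shaped like the "actors" list of a project role.
sample_actors = [
    {"type": "atlassian-group-role-actor", "actorGroup": {"groupId": "group-123"}},
    {"type": "atlassian-user-role-actor", "actorUser": {"accountId": "user-456"}},
    {"type": "some-other-actor"},
]

moves, skipped = plan_actor_moves(sample_actors)
for move in moves:
    print(move["add_payload"], move["delete_query"])
print("skipped:", skipped)
```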
PS C:\Users\...\python_scripts> py .\projectrolemembershipclean.py
Jira Cloud Project Role Cleanup Utility
Enter source role name (example: Administrator (migrated)): Administrator (migrated)
Enter target role name (example: Administrator): Administrators
Enter project keys to move users/groups in, comma-separated, or ALL: TEST
Dry run? (yes/no): yes
Caution: To repeat, test the script before you execute it. I recommend running it during the test (dry-run) migration phase before you use it on a production migration.
The following parameters need to be replaced in the scripts before executing: CLOUD_BASE_URL, CLOUD_EMAIL and CLOUD_TOKEN.
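Both scripts keep CLOUD_BASE_URL, CLOUD_EMAIL and CLOUD_TOKEN as constants at the top of the file. If you would rather not hardcode the API token, one option is to read the values from environment variables. This is only a sketch of that idea: `load_settings` is a hypothetical helper, and naming the environment variables after the constants is my assumption, not something the scripts require.

```python
import os


def load_settings(env=None):
    """Read the three connection settings from environment variables."""
    env = os.environ if env is None else env
    names = ("CLOUD_BASE_URL", "CLOUD_EMAIL", "CLOUD_TOKEN")
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise RuntimeError("Missing settings: " + ", ".join(missing))
    settings = {name: env[name].strip() for name in names}
    # Drop a trailing slash so f"{CLOUD_BASE_URL}/rest/..." stays well formed.
    settings["CLOUD_BASE_URL"] = settings["CLOUD_BASE_URL"].rstrip("/")
    return settings
```

You could then assign `settings["CLOUD_BASE_URL"]` and the other two values to the constants in each script instead of editing the placeholders.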
Script 1: Clean permission scheme
import json
import logging
import time

import requests
from requests.auth import HTTPBasicAuth

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')

# Replace these three placeholders with your site URL, account email and API token.
CLOUD_BASE_URL = "https://xxxxxxxxxxxxxx.atlassian.net"
CLOUD_EMAIL = "xxxxxxxxxxxxxx"
CLOUD_TOKEN = "xxxxxxxxxxxxxxxxxxxxxx"

AUTH = HTTPBasicAuth(CLOUD_EMAIL, CLOUD_TOKEN)
HEADERS = {
    "Accept": "application/json",
    "Content-Type": "application/json"
}

def request_with_retries(url, method="GET", headers=None, auth=None, data=None, max_retries=5):
    """Send a request, backing off exponentially on HTTP 429. Returns None if retries run out."""
    for attempt in range(max_retries):
        response = requests.request(method, url, headers=headers, auth=auth, data=data)
        if response.status_code == 429:
            wait_time = 2 ** attempt
            logging.warning(f"Rate limited for {url}. Waiting {wait_time}s before retrying.")
            time.sleep(wait_time)
            continue
        return response
    logging.error(f"Max retries reached for URL: {url}")
    return None


def prompt_csv_or_all(prompt_text):
    """Return a set of comma-separated values, or None when the user enters ALL."""
    value = input(prompt_text).strip()
    if value.upper() == "ALL":
        return None
    # Note: an empty answer gives an empty set, which the callers also treat as ALL.
    return {item.strip() for item in value.split(",") if item.strip()}


def prompt_yes_no(prompt_text):
    """Return True for y/yes/true/1 (case-insensitive), False for anything else."""
    value = input(prompt_text).strip().lower()
    return value in ("y", "yes", "true", "1")

def get_all_global_roles():
    """Return a {role name: role ID} map of all project roles on the site."""
    response = request_with_retries(
        f"{CLOUD_BASE_URL}/rest/api/3/role",
        headers=HEADERS,
        auth=AUTH
    )
    # Compare with None explicitly: a requests Response is falsy for 4xx/5xx,
    # which would otherwise be reported as 'no response'.
    if response is None or response.status_code != 200:
        raise Exception(f"Failed to fetch global roles. Status: {response.status_code if response is not None else 'no response'}")
    data = response.json()
    role_map = {}
    if not isinstance(data, list):
        raise Exception(f"Unexpected response from /rest/api/3/role. Expected list, got {type(data).__name__}")
    for role_obj in data:
        role_name = role_obj.get("name")
        role_id = role_obj.get("id")
        if role_name and role_id is not None:
            role_map[role_name] = str(role_id)
    return role_map

def get_permission_schemes():
    """Return the list of permission schemes on the site."""
    response = request_with_retries(
        f"{CLOUD_BASE_URL}/rest/api/3/permissionscheme",
        headers=HEADERS,
        auth=AUTH
    )
    # Explicit None check so that HTTP errors report their real status code.
    if response is None or response.status_code != 200:
        raise Exception(f"Failed to fetch permission schemes. Status: {response.status_code if response is not None else 'no response'}")
    return response.json().get("permissionSchemes", [])


def get_permission_scheme_details(scheme_id):
    """Return one permission scheme with its permission grants expanded."""
    response = request_with_retries(
        f"{CLOUD_BASE_URL}/rest/api/3/permissionscheme/{scheme_id}?expand=permissions",
        headers=HEADERS,
        auth=AUTH
    )
    if response is None or response.status_code != 200:
        raise Exception(
            f"Failed to fetch permission scheme details for scheme {scheme_id}. "
            f"Status: {response.status_code if response is not None else 'no response'}"
        )
    return response.json()

def get_holder_role_id(holder):
    """Return the role ID of a projectRole holder as a string, or None for any other holder."""
    if not holder or holder.get("type") != "projectRole":
        return None
    # The role ID can show up in different fields, so try each in turn.
    project_role = holder.get("projectRole", {})
    if project_role.get("id") is not None:
        return str(project_role.get("id"))
    if holder.get("parameter") is not None:
        return str(holder.get("parameter"))
    if holder.get("value") is not None:
        return str(holder.get("value"))
    return None


def permission_exists_for_role(permissions, permission_key, target_role_id):
    """Return True if the role already holds the given permission in this scheme."""
    for perm in permissions:
        holder = perm.get("holder", {})
        holder_role_id = get_holder_role_id(holder)
        if perm.get("permission") == permission_key and holder_role_id == str(target_role_id):
            return True
    return False

def add_permission_to_scheme(scheme_id, permission_key, target_role_id, dry_run):
    """Grant a permission to the target role in a scheme. Returns True on success."""
    payload = {
        "holder": {
            "type": "projectRole",
            "parameter": str(target_role_id)
        },
        "permission": permission_key
    }
    if dry_run:
        logging.info(
            f"[DRY RUN] Would add permission '{permission_key}' to scheme {scheme_id} for target role ID {target_role_id}"
        )
        return True
    response = request_with_retries(
        f"{CLOUD_BASE_URL}/rest/api/3/permissionscheme/{scheme_id}/permission",
        method="POST",
        headers=HEADERS,
        auth=AUTH,
        data=json.dumps(payload)
    )
    if response is None:
        logging.error(f"Add permission failed for scheme {scheme_id}: no response")
        return False
    if response.status_code not in (200, 201):
        logging.error(
            f"Add permission failed for scheme {scheme_id}, permission '{permission_key}', role {target_role_id}. "
            f"Status: {response.status_code}, Body: {response.text}"
        )
        return False
    return True

def delete_permission_from_scheme(scheme_id, permission_id, dry_run):
    """Delete one permission grant from a scheme. Returns True on success."""
    if dry_run:
        logging.info(f"[DRY RUN] Would delete permission ID '{permission_id}' from scheme {scheme_id}")
        return True
    response = request_with_retries(
        f"{CLOUD_BASE_URL}/rest/api/3/permissionscheme/{scheme_id}/permission/{permission_id}",
        method="DELETE",
        headers=HEADERS,
        auth=AUTH
    )
    if response is None:
        logging.error(f"Delete permission failed for scheme {scheme_id}, permission ID {permission_id}: no response")
        return False
    if response.status_code != 204:
        logging.error(
            f"Delete permission failed for scheme {scheme_id}, permission ID {permission_id}. "
            f"Status: {response.status_code}, Body: {response.text}"
        )
        return False
    return True

def update_permission_schemes(source_role_name, target_role_name, permissions_to_fix, scheme_ids, dry_run):
    """Replace the source role with the target role in the selected schemes and permissions."""
    role_map = get_all_global_roles()
    source_role_id = role_map.get(source_role_name)
    target_role_id = role_map.get(target_role_name)
    if not source_role_id:
        raise Exception(f"Source role '{source_role_name}' not found in global roles")
    if not target_role_id:
        raise Exception(f"Target role '{target_role_name}' not found in global roles")
    schemes = get_permission_schemes()
    for scheme in schemes:
        scheme_id = str(scheme["id"])
        scheme_name = scheme["name"]
        # scheme_ids is None (ALL) or a set of scheme IDs to restrict to.
        if scheme_ids and scheme_id not in scheme_ids:
            continue
        logging.info(f"Processing permission scheme: {scheme_name} ({scheme_id})")
        details = get_permission_scheme_details(scheme_id)
        permissions = details.get("permissions", [])
        # Collect the grants held by the source role that are in scope.
        matching_perms = []
        for perm in permissions:
            holder = perm.get("holder", {})
            permission_key = perm.get("permission")
            permission_id = perm.get("id")
            holder_role_id = get_holder_role_id(holder)
            if holder.get("type") != "projectRole":
                continue
            if holder_role_id != str(source_role_id):
                continue
            if permissions_to_fix and permission_key not in permissions_to_fix:
                continue
            matching_perms.append({
                "permission_key": permission_key,
                "permission_id": permission_id
            })
        if not matching_perms:
            logging.info(f"No matching permissions found in scheme '{scheme_name}'")
            continue
        for item in matching_perms:
            permission_key = item["permission_key"]
            permission_id = item["permission_id"]
            # Re-read the scheme so the check below sees grants added earlier in this loop.
            details = get_permission_scheme_details(scheme_id)
            current_permissions = details.get("permissions", [])
            logging.info(
                f"Found permission '{permission_key}' in scheme '{scheme_name}' using role '{source_role_name}'"
            )
            # Add the target grant first; the source grant is only deleted if this succeeds.
            if not permission_exists_for_role(current_permissions, permission_key, target_role_id):
                added = add_permission_to_scheme(scheme_id, permission_key, target_role_id, dry_run)
                if not added:
                    continue
            else:
                logging.info(
                    f"Target role '{target_role_name}' already has permission '{permission_key}' in scheme '{scheme_name}'"
                )
            deleted = delete_permission_from_scheme(scheme_id, permission_id, dry_run)
            if deleted:
                logging.info(
                    f"Replaced role '{source_role_name}' with '{target_role_name}' for permission '{permission_key}' in scheme '{scheme_name}'"
                )
            else:
                logging.error(
                    f"Failed to delete old permission '{permission_key}' from scheme '{scheme_name}'"
                )

def main():
    print("\nJira Cloud Permission Scheme Cleanup Utility\n")
    source_role_name = input("Enter source role name (example: Administrator (migrated)): ").strip()
    target_role_name = input("Enter target role name (example: Administrator): ").strip()
    permissions_to_fix = prompt_csv_or_all(
        "Enter permission keys to update, comma-separated, or ALL: "
    )
    scheme_ids = prompt_csv_or_all(
        "Enter permission scheme IDs to process, comma-separated, or ALL: "
    )
    dry_run = prompt_yes_no("Dry run? (yes/no): ")
    logging.info(f"Dry run mode: {dry_run}")
    logging.info(f"Source role: {source_role_name}")
    logging.info(f"Target role: {target_role_name}")
    logging.info(f"Selected permissions: {permissions_to_fix if permissions_to_fix else 'ALL'}")
    logging.info(f"Selected schemes: {scheme_ids if scheme_ids else 'ALL'}")
    update_permission_schemes(
        source_role_name=source_role_name,
        target_role_name=target_role_name,
        permissions_to_fix=permissions_to_fix,
        scheme_ids=scheme_ids,
        dry_run=dry_run
    )
    logging.info("Permission scheme cleanup complete")


if __name__ == "__main__":
    main()
Script 2: Clean project role membership
import json
import logging
import time

import requests
from requests.auth import HTTPBasicAuth

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')

# Replace these three placeholders with your site URL, account email and API token.
CLOUD_BASE_URL = "https://XXXXXXXXXXXXXX.atlassian.net"
CLOUD_EMAIL = "XXXXXXXXXXXXXX"
CLOUD_TOKEN = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

AUTH = HTTPBasicAuth(CLOUD_EMAIL, CLOUD_TOKEN)
HEADERS = {
    "Accept": "application/json",
    "Content-Type": "application/json"
}

def request_with_retries(url, method="GET", headers=None, auth=None, data=None, max_retries=5):
    """Send a request, backing off exponentially on HTTP 429. Returns None if retries run out."""
    for attempt in range(max_retries):
        response = requests.request(method, url, headers=headers, auth=auth, data=data)
        if response.status_code == 429:
            wait_time = 2 ** attempt
            logging.warning(f"Rate limited for {url}. Waiting {wait_time}s before retrying.")
            time.sleep(wait_time)
            continue
        return response
    logging.error(f"Max retries reached for URL: {url}")
    return None


def prompt_csv_or_all(prompt_text):
    """Return a set of comma-separated values, or None when the user enters ALL."""
    value = input(prompt_text).strip()
    if value.upper() == "ALL":
        return None
    # Note: an empty answer gives an empty set, which the callers also treat as ALL.
    return {item.strip() for item in value.split(",") if item.strip()}


def prompt_yes_no(prompt_text):
    """Return True for y/yes/true/1 (case-insensitive), False for anything else."""
    value = input(prompt_text).strip().lower()
    return value in ("y", "yes", "true", "1")

def get_projects():
    """Return all projects visible to the account, fetched page by page."""
    projects = []
    start_at = 0
    max_results = 50
    while True:
        response = request_with_retries(
            f"{CLOUD_BASE_URL}/rest/api/3/project/search?startAt={start_at}&maxResults={max_results}",
            headers=HEADERS,
            auth=AUTH
        )
        # Explicit None check so that HTTP errors report their real status code.
        if response is None or response.status_code != 200:
            raise Exception(f"Failed to fetch projects. Status: {response.status_code if response is not None else 'no response'}")
        data = response.json()
        values = data.get("values", [])
        projects.extend(values)
        # Stop once the current page reaches the reported total.
        if start_at + max_results >= data.get("total", 0):
            break
        start_at += max_results
    return projects

def move_project_role_actors(source_role_name, target_role_name, selected_projects, dry_run):
    """Move every user and group from the source role to the target role in each selected project."""
    projects = get_projects()
    for project in projects:
        project_id = str(project["id"])
        project_key = project["key"]
        project_name = project["name"]
        # selected_projects is None (ALL) or a set of project keys to restrict to.
        if selected_projects and project_key not in selected_projects:
            continue
        logging.info(f"Processing project role actors in project: {project_key} - {project_name}")
        roles_response = request_with_retries(
            f"{CLOUD_BASE_URL}/rest/api/3/project/{project_id}/role",
            headers=HEADERS,
            auth=AUTH
        )
        if roles_response is None or roles_response.status_code != 200:
            logging.warning(f"Failed to fetch roles for project {project_key}")
            continue
        # The response maps each role name to a URL whose last path segment is the role ID.
        role_urls = roles_response.json()
        source_role_id = None
        target_role_id = None
        for role_name, role_url in role_urls.items():
            role_id = role_url.rstrip("/").split("/")[-1]
            if role_name == source_role_name:
                source_role_id = role_id
            if role_name == target_role_name:
                target_role_id = role_id
        if not source_role_id:
            logging.info(f"Source role '{source_role_name}' not found in project {project_key}")
            continue
        if not target_role_id:
            logging.warning(f"Target role '{target_role_name}' not found in project {project_key}")
            continue
        source_role_detail = request_with_retries(
            f"{CLOUD_BASE_URL}/rest/api/3/project/{project_id}/role/{source_role_id}",
            headers=HEADERS,
            auth=AUTH
        )
        if source_role_detail is None or source_role_detail.status_code != 200:
            logging.warning(f"Failed to fetch source role detail in project {project_key}")
            continue
        actors = source_role_detail.json().get("actors", [])
        if not actors:
            logging.info(f"No actors found in source role '{source_role_name}' for project {project_key}")
            continue
        for actor in actors:
            # Build the add payload and the delete query for groups and users.
            if actor["type"] == "atlassian-group-role-actor":
                add_payload = {"groupId": [actor["actorGroup"]["groupId"]]}
                delete_query = f"groupId={actor['actorGroup']['groupId']}"
                actor_desc = f"groupId={actor['actorGroup']['groupId']}"
            elif actor["type"] == "atlassian-user-role-actor":
                add_payload = {"user": [actor["actorUser"]["accountId"]]}
                delete_query = f"user={actor['actorUser']['accountId']}"
                actor_desc = f"user={actor['actorUser']['accountId']}"
            else:
                logging.info(f"Skipping unsupported actor type '{actor['type']}' in project {project_key}")
                continue
            if dry_run:
                logging.info(f"[DRY RUN] Would add {actor_desc} to role '{target_role_name}' in project {project_key}")
                logging.info(f"[DRY RUN] Would remove {actor_desc} from role '{source_role_name}' in project {project_key}")
                continue
            # Add to the target role first; remove from the source role only if that succeeds.
            add_response = request_with_retries(
                f"{CLOUD_BASE_URL}/rest/api/3/project/{project_id}/role/{target_role_id}",
                method="POST",
                headers=HEADERS,
                auth=AUTH,
                data=json.dumps(add_payload)
            )
            if add_response is None or add_response.status_code not in (200, 201):
                logging.error(f"Failed to add {actor_desc} to target role in project {project_key}")
                continue
            delete_response = request_with_retries(
                f"{CLOUD_BASE_URL}/rest/api/3/project/{project_id}/role/{source_role_id}?{delete_query}",
                method="DELETE",
                headers=HEADERS,
                auth=AUTH
            )
            if delete_response is None or delete_response.status_code != 204:
                logging.error(f"Failed to remove {actor_desc} from source role in project {project_key}")
            else:
                logging.info(f"Moved {actor_desc} from '{source_role_name}' to '{target_role_name}' in project {project_key}")

def main():
    print("\nJira Cloud Project Role Cleanup Utility\n")
    source_role_name = input("Enter source role name (example: Administrator (migrated)): ").strip()
    target_role_name = input("Enter target role name (example: Administrator): ").strip()
    selected_projects = prompt_csv_or_all(
        "Enter project keys to move users/groups in, comma-separated, or ALL: "
    )
    dry_run = prompt_yes_no("Dry run? (yes/no): ")
    logging.info(f"Dry run mode: {dry_run}")
    logging.info(f"Source role: {source_role_name}")
    logging.info(f"Target role: {target_role_name}")
    logging.info(f"Selected projects: {selected_projects if selected_projects else 'ALL'}")
    move_project_role_actors(
        source_role_name=source_role_name,
        target_role_name=target_role_name,
        selected_projects=selected_projects,
        dry_run=dry_run
    )
    logging.info("Project role cleanup complete")


if __name__ == "__main__":
    main()
Vinubabu _Vinu_