Clean up duplicated project roles post migration in Jira cloud

Hello Migration Folks,

Over the years, many of us have run into this problem: project roles get duplicated after a Jira migration to the cloud.

This is rarely much of a problem when the migration scope is small, or when the destination cloud site is a fresh one where the chance of duplication is almost nil.

Prevention is better than cure: make sure the role names match between the source and destination sites before migrating, which should avoid the problem. If that is not possible, or you end up with duplicated roles because, like me, you did not check before running the migration, read on.
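A pre-migration check along these lines can be sketched as below. It is only an illustration, assuming you have already exported the role names from both sites into two lists; the function name and the "near match" rule (ignore case and a trailing "s") are my own assumptions, not something the migration tooling defines.

```python
def compare_role_names(source_names, destination_names):
    """Split source role names into exact matches, near matches and source-only names."""
    def normalise(name):
        # Assumed heuristic: ignore case, surrounding whitespace and a trailing plural "s".
        key = name.strip().lower()
        return key[:-1] if key.endswith("s") else key

    destination_by_key = {}
    for name in destination_names:
        destination_by_key.setdefault(normalise(name), name)

    destination_set = set(destination_names)
    exact, near, source_only = [], [], []
    for name in source_names:
        if name in destination_set:
            exact.append(name)
        elif normalise(name) in destination_by_key:
            # Same role in spirit, different spelling: rename one side before migrating.
            near.append((name, destination_by_key[normalise(name)]))
        else:
            source_only.append(name)
    return exact, near, source_only


# Example role names, made up for illustration.
exact, near, source_only = compare_role_names(
    ["Administrator", "Developers", "Service Desk Team"],
    ["Administrators", "Developers", "Viewers"],
)
print("Exact matches:", exact)
print("Near matches to rename before migrating:", near)
print("Only on the source site:", source_only)
```

The near matches are the ones worth reviewing by hand, since a pair such as "Administrator" and "Administrators" is exactly the kind of mismatch described in this post.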

I recently hit this problem with roles duplicated across 800 projects. Correcting all the associated permission schemes and the project role memberships by hand is a nightmare.

While searching for an automated option, I came across this blog by Rodolfo Bortolin. It did not work for me, so I rewrote the code to make it work and added some bells and whistles.

I am sharing the script and the steps to execute it, just in case there is another bewildered bloke like me trying desperately to fix it.

Note: Please test the script in a test environment before executing it on production. The script can be improved further with more error handling; this post is only meant to get you started.

It's the AI era, so you can quickly rework the script if you need to change some bits.

Pre-requisites

  • Confirm you have Jira admin permissions and an API token for the admin account.
  • Identify the affected projects and permission schemes before running the script. Go to the project roles section in the Jira admin settings and note the duplicated roles and the schemes that use them.
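If the list of roles is long, the duplicates can also be spotted programmatically. The sketch below assumes you already have the role list as dictionaries with "id" and "name" keys (the shape Script 1 reads from /rest/api/3/role) and that duplicates carry the " (migrated)" suffix seen in my case; the function name and the sample IDs are made up for illustration.

```python
MIGRATED_SUFFIX = " (migrated)"  # assumption: the suffix seen in this post's example


def find_migrated_roles(roles, suffix=MIGRATED_SUFFIX):
    """List roles whose name ends with the suffix, with likely original roles."""
    names = {role["name"] for role in roles}
    findings = []
    for role in roles:
        name = role["name"]
        if not name.endswith(suffix):
            continue
        base = name[: -len(suffix)]
        # Candidate originals: the bare name, or the bare name with a plural "s".
        candidates = [c for c in (base, base + "s") if c in names]
        findings.append({"duplicate": name, "id": role["id"], "candidates": candidates})
    return findings


# Sample data with made-up IDs.
sample_roles = [
    {"id": 10002, "name": "Administrators"},
    {"id": 10410, "name": "Administrator (migrated)"},
    {"id": 10001, "name": "Developers"},
]
for finding in find_migrated_roles(sample_roles):
    print(finding)
```

Treat the candidates as suggestions only; confirm each source and target pair in the admin UI before feeding the names to the scripts.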

Best Practices

  • Run the script in dry-run mode first so it only logs the intended actions without making changes.
  • Run the script against a single test project or scheme to verify that the role matching logic correctly detects the migrated roles.
  • Once you have tested the script repeatedly in a test environment and in dry-run mode, limit the scope and run it to clean up the roles.

How to execute?
Let us take the case I used the script for. Here the duplicated role is "Administrator (migrated)" and the original role is "Administrators".

The cleanup runs in two parts. The first script cleans the permission schemes; after that, use the second script to clean up the role membership on the Jira spaces.

Executing the permission scheme cleanup script. It prompts for the following:

  • Source role name - the duplicated role that needs to be replaced.
  • Target role name - the correct role that should replace the duplicated role.
  • Permission keys to update - the permission keys in your schemes where the duplicate should be fixed. (I just chose ALL.)
  • Permission scheme IDs to process - the IDs of the schemes in which the script should correct the roles.
  • Dry run? - whether the current run is a dry run.

What does the script do?

For each scoped scheme, it finds every permission granted to the source role, grants the same permission to the target role (unless the target already has it), and then removes the source role's grant.
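The same logic can be tried offline on a list of permission grants before touching a real scheme. This is a minimal sketch, not the script itself: the function name is hypothetical, and the sample grants assume the role ID sits under the holder's "value" key (Script 1 also checks other keys).

```python
def plan_role_swap(permissions, source_role_id, target_role_id, permission_keys=None):
    """Return the ("add", key) / ("delete", grant id) steps for one scheme."""
    def role_id_of(grant):
        holder = grant.get("holder", {})
        if holder.get("type") != "projectRole":
            return None
        value = holder.get("value", holder.get("parameter"))
        return None if value is None else str(value)

    # Permission keys the target role already holds in this scheme.
    target_keys = {
        grant["permission"] for grant in permissions
        if role_id_of(grant) == str(target_role_id)
    }
    steps = []
    for grant in permissions:
        if role_id_of(grant) != str(source_role_id):
            continue
        key = grant["permission"]
        if permission_keys and key not in permission_keys:
            continue
        if key not in target_keys:
            steps.append(("add", key))  # grant to the target role first
            target_keys.add(key)
        steps.append(("delete", grant["id"]))  # then drop the source role's grant
    return steps


# Sample grants with made-up IDs: 10410 is the duplicate role, 10002 the original.
sample_permissions = [
    {"id": 1, "permission": "ADMINISTER_PROJECTS", "holder": {"type": "projectRole", "value": "10410"}},
    {"id": 2, "permission": "BROWSE_PROJECTS", "holder": {"type": "projectRole", "value": "10410"}},
    {"id": 3, "permission": "BROWSE_PROJECTS", "holder": {"type": "projectRole", "value": "10002"}},
    {"id": 4, "permission": "BROWSE_PROJECTS", "holder": {"type": "group", "value": "abc"}},
]
steps = plan_role_swap(sample_permissions, "10410", "10002")
print(steps)
```

Adding before deleting means a failed add leaves the original grant in place, so nobody loses access halfway through.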

PS C:\Users\...\python_scripts> py .\cleanpermissionschemes.py

Jira Cloud Permission Scheme Cleanup Utility

Enter source role name (example: Administrator (migrated)): Administrator (migrated)
Enter target role name (example: Administrator): Administrators
Enter permission keys to update, comma-separated, or ALL: ALL
Enter permission scheme IDs to process, comma-separated, or ALL: 11240
Dry run? (yes/no): yes

After cleaning up the schemes, run the project role membership cleanup. It prompts for the following:

  • Source role name - the duplicated role that needs to be replaced.
  • Target role name - the correct role that should replace the duplicated role.
  • Project keys - the projects on which the script should run.
  • Dry run? - whether the current run is a dry run.

What does the script do?

For each scoped project, it finds all the users and groups assigned to the source role, adds them to the target role, and removes them from the source role.
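The per-actor step can be sketched on its own as follows. It mirrors what Script 2 does for the two actor types it supports; the helper name is hypothetical, and URL-encoding the query string with urlencode is my addition rather than something the script does.

```python
from urllib.parse import urlencode


def describe_actor_move(actor):
    """Return (add_payload, delete_query) for one role actor, or None if unsupported."""
    actor_type = actor.get("type")
    if actor_type == "atlassian-group-role-actor":
        group_id = actor["actorGroup"]["groupId"]
        return {"groupId": [group_id]}, urlencode({"groupId": group_id})
    if actor_type == "atlassian-user-role-actor":
        account_id = actor["actorUser"]["accountId"]
        return {"user": [account_id]}, urlencode({"user": account_id})
    return None


# Sample actors with made-up identifiers.
user_actor = {"type": "atlassian-user-role-actor", "actorUser": {"accountId": "557058:abc"}}
group_actor = {"type": "atlassian-group-role-actor", "actorGroup": {"groupId": "g-123"}}
print(describe_actor_move(user_actor))
print(describe_actor_move(group_actor))
```

The payload goes in the POST body for the target role, and the query string goes on the DELETE call for the source role.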

PS C:\Users\...\python_scripts> py .\projectrolemembershipclean.py

Jira Cloud Project Role Cleanup Utility

Enter source role name (example: Administrator (migrated)): Administrator (migrated)
Enter target role name (example: Administrator): Administrators
Enter project keys to move users/groups in, comma-separated, or ALL: TEST
Dry run? (yes/no): yes

Caution: To repeat, test the script before you execute it. I recommend trying it out during the dry run phase, before you start using it on the production migration.

The following parameters need to be replaced in the scripts before executing:

  • Jira admin API token (CLOUD_TOKEN)
  • Site URL (CLOUD_BASE_URL)
  • Email address of the admin account (CLOUD_EMAIL)
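If you would rather not paste the token into the script files, one option is to read the three values from environment variables. This is a sketch of that idea only; the variable names JIRA_BASE_URL, JIRA_EMAIL and JIRA_API_TOKEN are hypothetical and not used by the scripts below.

```python
import os

# Hypothetical variable names; pick whatever suits your environment.
REQUIRED = ("JIRA_BASE_URL", "JIRA_EMAIL", "JIRA_API_TOKEN")


def load_settings(env=None):
    """Read the site URL, admin email and API token, failing early if one is missing."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {
        "base_url": env["JIRA_BASE_URL"].rstrip("/"),
        "email": env["JIRA_EMAIL"],
        "token": env["JIRA_API_TOKEN"],
    }


# Demonstration with a plain dict instead of the real environment.
settings = load_settings({
    "JIRA_BASE_URL": "https://example.atlassian.net/",
    "JIRA_EMAIL": "admin@example.com",
    "JIRA_API_TOKEN": "not-a-real-token",
})
print(settings["base_url"], settings["email"])
```

In the scripts, the returned values would then take the place of the hard-coded CLOUD_BASE_URL, CLOUD_EMAIL and CLOUD_TOKEN constants.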

Script 1: Clean permission scheme

import json
import logging
import time

import requests
from requests.auth import HTTPBasicAuth

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')

# Replace these three placeholders with your site URL, admin email and API token.
CLOUD_BASE_URL = "https://xxxxxxxxxxxxxx.atlassian.net"
CLOUD_EMAIL = "xxxxxxxxxxxxxx"
CLOUD_TOKEN = "xxxxxxxxxxxxxxxxxxxxxx"

AUTH = HTTPBasicAuth(CLOUD_EMAIL, CLOUD_TOKEN)
HEADERS = {
    "Accept": "application/json",
    "Content-Type": "application/json"
}

def request_with_retries(url, method="GET", headers=None, auth=None, data=None, max_retries=5):
    """Send a request, backing off exponentially on HTTP 429. Returns None if retries run out."""
    for attempt in range(max_retries):
        response = requests.request(method, url, headers=headers, auth=auth, data=data)
        if response.status_code == 429:
            wait_time = 2 ** attempt
            logging.warning(f"Rate limited for {url}. Waiting {wait_time}s before retrying.")
            time.sleep(wait_time)
            continue
        return response
    logging.error(f"Max retries reached for URL: {url}")
    return None

def prompt_csv_or_all(prompt_text):
    """Return a set of comma-separated values, or None when the user enters ALL."""
    value = input(prompt_text).strip()
    if value.upper() == "ALL":
        return None
    return {item.strip() for item in value.split(",") if item.strip()}

def prompt_yes_no(prompt_text):
    value = input(prompt_text).strip().lower()
    return value in ("y", "yes", "true", "1")

def get_all_global_roles():
    """Return a {role name: role ID} map of every project role on the site."""
    response = request_with_retries(
        f"{CLOUD_BASE_URL}/rest/api/3/role",
        headers=HEADERS,
        auth=AUTH
    )
    # Compare with None explicitly: a requests.Response is falsy for 4xx/5xx,
    # which would otherwise hide the real status code in the message below.
    if response is None or response.status_code != 200:
        status = response.status_code if response is not None else "no response"
        raise Exception(f"Failed to fetch global roles. Status: {status}")
    data = response.json()
    if not isinstance(data, list):
        raise Exception(f"Unexpected response from /rest/api/3/role. Expected list, got {type(data).__name__}")
    role_map = {}
    for role_obj in data:
        role_name = role_obj.get("name")
        role_id = role_obj.get("id")
        if role_name and role_id is not None:
            role_map[role_name] = str(role_id)
    return role_map

def get_permission_schemes():
    response = request_with_retries(
        f"{CLOUD_BASE_URL}/rest/api/3/permissionscheme",
        headers=HEADERS,
        auth=AUTH
    )
    if response is None or response.status_code != 200:
        status = response.status_code if response is not None else "no response"
        raise Exception(f"Failed to fetch permission schemes. Status: {status}")
    return response.json().get("permissionSchemes", [])

def get_permission_scheme_details(scheme_id):
    response = request_with_retries(
        f"{CLOUD_BASE_URL}/rest/api/3/permissionscheme/{scheme_id}?expand=permissions",
        headers=HEADERS,
        auth=AUTH
    )
    if response is None or response.status_code != 200:
        status = response.status_code if response is not None else "no response"
        raise Exception(
            f"Failed to fetch permission scheme details for scheme {scheme_id}. "
            f"Status: {status}"
        )
    return response.json()

def get_holder_role_id(holder):
    if not holder or holder.get("type") != "projectRole":
        return None
    project_role = holder.get("projectRole", {})
    if project_role.get("id") is not None:
        return str(project_role.get("id"))
    if holder.get("parameter") is not None:
        return str(holder.get("parameter"))
    if holder.get("value") is not None:
        return str(holder.get("value"))
    return None

def permission_exists_for_role(permissions, permission_key, target_role_id):
    for perm in permissions:
        holder = perm.get("holder", {})
        holder_role_id = get_holder_role_id(holder)
        if perm.get("permission") == permission_key and holder_role_id == str(target_role_id):
            return True
    return False

def add_permission_to_scheme(scheme_id, permission_key, target_role_id, dry_run):
    payload = {
        "holder": {
            "type": "projectRole",
            "parameter": str(target_role_id)
        },
        "permission": permission_key
    }
    if dry_run:
        logging.info(
            f"[DRY RUN] Would add permission '{permission_key}' to scheme {scheme_id} for target role ID {target_role_id}"
        )
        return True
    response = request_with_retries(
        f"{CLOUD_BASE_URL}/rest/api/3/permissionscheme/{scheme_id}/permission",
        method="POST",
        headers=HEADERS,
        auth=AUTH,
        data=json.dumps(payload)
    )
    if response is None:
        logging.error(f"Add permission failed for scheme {scheme_id}: no response")
        return False
    if response.status_code not in (200, 201):
        logging.error(
            f"Add permission failed for scheme {scheme_id}, permission '{permission_key}', role {target_role_id}. "
            f"Status: {response.status_code}, Body: {response.text}"
        )
        return False
    return True

def delete_permission_from_scheme(scheme_id, permission_id, dry_run):
    if dry_run:
        logging.info(f"[DRY RUN] Would delete permission ID '{permission_id}' from scheme {scheme_id}")
        return True
    response = request_with_retries(
        f"{CLOUD_BASE_URL}/rest/api/3/permissionscheme/{scheme_id}/permission/{permission_id}",
        method="DELETE",
        headers=HEADERS,
        auth=AUTH
    )
    if response is None:
        logging.error(f"Delete permission failed for scheme {scheme_id}, permission ID {permission_id}: no response")
        return False
    if response.status_code != 204:
        logging.error(
            f"Delete permission failed for scheme {scheme_id}, permission ID {permission_id}. "
            f"Status: {response.status_code}, Body: {response.text}"
        )
        return False
    return True

def update_permission_schemes(source_role_name, target_role_name, permissions_to_fix, scheme_ids, dry_run):
    """Replace the source role with the target role in every scoped permission scheme."""
    role_map = get_all_global_roles()
    source_role_id = role_map.get(source_role_name)
    target_role_id = role_map.get(target_role_name)
    if not source_role_id:
        raise Exception(f"Source role '{source_role_name}' not found in global roles")
    if not target_role_id:
        raise Exception(f"Target role '{target_role_name}' not found in global roles")
    schemes = get_permission_schemes()
    for scheme in schemes:
        scheme_id = str(scheme["id"])
        scheme_name = scheme["name"]
        # scheme_ids is None when the user entered ALL.
        if scheme_ids and scheme_id not in scheme_ids:
            continue
        logging.info(f"Processing permission scheme: {scheme_name} ({scheme_id})")
        details = get_permission_scheme_details(scheme_id)
        permissions = details.get("permissions", [])
        # Collect the grants held by the source role that are in scope.
        matching_perms = []
        for perm in permissions:
            holder = perm.get("holder", {})
            permission_key = perm.get("permission")
            permission_id = perm.get("id")
            holder_role_id = get_holder_role_id(holder)
            if holder.get("type") != "projectRole":
                continue
            if holder_role_id != str(source_role_id):
                continue
            if permissions_to_fix and permission_key not in permissions_to_fix:
                continue
            matching_perms.append({
                "permission_key": permission_key,
                "permission_id": permission_id
            })
        if not matching_perms:
            logging.info(f"No matching permissions found in scheme '{scheme_name}'")
            continue
        for item in matching_perms:
            permission_key = item["permission_key"]
            permission_id = item["permission_id"]
            # Re-read the scheme so grants added earlier in this loop are taken into account.
            details = get_permission_scheme_details(scheme_id)
            current_permissions = details.get("permissions", [])
            logging.info(
                f"Found permission '{permission_key}' in scheme '{scheme_name}' using role '{source_role_name}'"
            )
            # Grant to the target role first; skip the delete if the add fails.
            if not permission_exists_for_role(current_permissions, permission_key, target_role_id):
                added = add_permission_to_scheme(scheme_id, permission_key, target_role_id, dry_run)
                if not added:
                    continue
            else:
                logging.info(
                    f"Target role '{target_role_name}' already has permission '{permission_key}' in scheme '{scheme_name}'"
                )
            deleted = delete_permission_from_scheme(scheme_id, permission_id, dry_run)
            if deleted:
                logging.info(
                    f"Replaced role '{source_role_name}' with '{target_role_name}' for permission '{permission_key}' in scheme '{scheme_name}'"
                )
            else:
                logging.error(
                    f"Failed to delete old permission '{permission_key}' from scheme '{scheme_name}'"
                )

def main():
    print("\nJira Cloud Permission Scheme Cleanup Utility\n")
    source_role_name = input("Enter source role name (example: Administrator (migrated)): ").strip()
    target_role_name = input("Enter target role name (example: Administrator): ").strip()
    permissions_to_fix = prompt_csv_or_all(
        "Enter permission keys to update, comma-separated, or ALL: "
    )
    scheme_ids = prompt_csv_or_all(
        "Enter permission scheme IDs to process, comma-separated, or ALL: "
    )
    dry_run = prompt_yes_no("Dry run? (yes/no): ")
    logging.info(f"Dry run mode: {dry_run}")
    logging.info(f"Source role: {source_role_name}")
    logging.info(f"Target role: {target_role_name}")
    logging.info(f"Selected permissions: {permissions_to_fix if permissions_to_fix else 'ALL'}")
    logging.info(f"Selected schemes: {scheme_ids if scheme_ids else 'ALL'}")
    update_permission_schemes(
        source_role_name=source_role_name,
        target_role_name=target_role_name,
        permissions_to_fix=permissions_to_fix,
        scheme_ids=scheme_ids,
        dry_run=dry_run
    )
    logging.info("Permission scheme cleanup complete")

if __name__ == "__main__":
    main()

Script 2: Clean duplicated roles from the projects

import json
import logging
import time

import requests
from requests.auth import HTTPBasicAuth

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')

# Replace these three placeholders with your site URL, admin email and API token.
CLOUD_BASE_URL = "https://XXXXXXXXXXXXXX.atlassian.net"
CLOUD_EMAIL = "XXXXXXXXXXXXXX"
CLOUD_TOKEN = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

AUTH = HTTPBasicAuth(CLOUD_EMAIL, CLOUD_TOKEN)
HEADERS = {
    "Accept": "application/json",
    "Content-Type": "application/json"
}

def request_with_retries(url, method="GET", headers=None, auth=None, data=None, max_retries=5):
    """Send a request, backing off exponentially on HTTP 429. Returns None if retries run out."""
    for attempt in range(max_retries):
        response = requests.request(method, url, headers=headers, auth=auth, data=data)
        if response.status_code == 429:
            wait_time = 2 ** attempt
            logging.warning(f"Rate limited for {url}. Waiting {wait_time}s before retrying.")
            time.sleep(wait_time)
            continue
        return response
    logging.error(f"Max retries reached for URL: {url}")
    return None

def prompt_csv_or_all(prompt_text):
    """Return a set of comma-separated values, or None when the user enters ALL."""
    value = input(prompt_text).strip()
    if value.upper() == "ALL":
        return None
    return {item.strip() for item in value.split(",") if item.strip()}

def prompt_yes_no(prompt_text):
    value = input(prompt_text).strip().lower()
    return value in ("y", "yes", "true", "1")

def get_projects():
    """Fetch every project on the site, 50 at a time."""
    projects = []
    start_at = 0
    max_results = 50
    while True:
        response = request_with_retries(
            f"{CLOUD_BASE_URL}/rest/api/3/project/search?startAt={start_at}&maxResults={max_results}",
            headers=HEADERS,
            auth=AUTH
        )
        # Compare with None explicitly: a requests.Response is falsy for 4xx/5xx,
        # which would otherwise hide the real status code in the message below.
        if response is None or response.status_code != 200:
            status = response.status_code if response is not None else "no response"
            raise Exception(f"Failed to fetch projects. Status: {status}")
        data = response.json()
        values = data.get("values", [])
        projects.extend(values)
        if start_at + max_results >= data.get("total", 0):
            break
        start_at += max_results
    return projects

def move_project_role_actors(source_role_name, target_role_name, selected_projects, dry_run):
    """Move every user and group from the source role to the target role, project by project."""
    projects = get_projects()
    for project in projects:
        project_id = str(project["id"])
        project_key = project["key"]
        project_name = project["name"]
        # selected_projects is None when the user entered ALL.
        if selected_projects and project_key not in selected_projects:
            continue
        logging.info(f"Processing project role actors in project: {project_key} - {project_name}")
        roles_response = request_with_retries(
            f"{CLOUD_BASE_URL}/rest/api/3/project/{project_id}/role",
            headers=HEADERS,
            auth=AUTH
        )
        # Compare with None explicitly: a requests.Response is falsy for 4xx/5xx.
        if roles_response is None or roles_response.status_code != 200:
            logging.warning(f"Failed to fetch roles for project {project_key}")
            continue
        # The response maps each role name to its URL; the role ID is the last path segment.
        role_urls = roles_response.json()
        source_role_id = None
        target_role_id = None
        for role_name, role_url in role_urls.items():
            role_id = role_url.rstrip("/").split("/")[-1]
            if role_name == source_role_name:
                source_role_id = role_id
            if role_name == target_role_name:
                target_role_id = role_id
        if not source_role_id:
            logging.info(f"Source role '{source_role_name}' not found in project {project_key}")
            continue
        if not target_role_id:
            logging.warning(f"Target role '{target_role_name}' not found in project {project_key}")
            continue
        source_role_detail = request_with_retries(
            f"{CLOUD_BASE_URL}/rest/api/3/project/{project_id}/role/{source_role_id}",
            headers=HEADERS,
            auth=AUTH
        )
        if source_role_detail is None or source_role_detail.status_code != 200:
            logging.warning(f"Failed to fetch source role detail in project {project_key}")
            continue
        actors = source_role_detail.json().get("actors", [])
        if not actors:
            logging.info(f"No actors found in source role '{source_role_name}' for project {project_key}")
            continue
        for actor in actors:
            if actor["type"] == "atlassian-group-role-actor":
                add_payload = {"groupId": [actor["actorGroup"]["groupId"]]}
                delete_query = f"groupId={actor['actorGroup']['groupId']}"
                actor_desc = f"groupId={actor['actorGroup']['groupId']}"
            elif actor["type"] == "atlassian-user-role-actor":
                add_payload = {"user": [actor["actorUser"]["accountId"]]}
                delete_query = f"user={actor['actorUser']['accountId']}"
                actor_desc = f"user={actor['actorUser']['accountId']}"
            else:
                logging.info(f"Skipping unsupported actor type '{actor['type']}' in project {project_key}")
                continue
            if dry_run:
                logging.info(f"[DRY RUN] Would add {actor_desc} to role '{target_role_name}' in project {project_key}")
                logging.info(f"[DRY RUN] Would remove {actor_desc} from role '{source_role_name}' in project {project_key}")
                continue
            # Add to the target role first; only remove from the source role if the add succeeded.
            add_response = request_with_retries(
                f"{CLOUD_BASE_URL}/rest/api/3/project/{project_id}/role/{target_role_id}",
                method="POST",
                headers=HEADERS,
                auth=AUTH,
                data=json.dumps(add_payload)
            )
            if add_response is None or add_response.status_code not in (200, 201):
                status = add_response.status_code if add_response is not None else "no response"
                logging.error(f"Failed to add {actor_desc} to target role in project {project_key}. Status: {status}")
                continue
            delete_response = request_with_retries(
                f"{CLOUD_BASE_URL}/rest/api/3/project/{project_id}/role/{source_role_id}?{delete_query}",
                method="DELETE",
                headers=HEADERS,
                auth=AUTH
            )
            if delete_response is None or delete_response.status_code != 204:
                status = delete_response.status_code if delete_response is not None else "no response"
                logging.error(f"Failed to remove {actor_desc} from source role in project {project_key}. Status: {status}")
            else:
                logging.info(f"Moved {actor_desc} from '{source_role_name}' to '{target_role_name}' in project {project_key}")

def main():
    print("\nJira Cloud Project Role Cleanup Utility\n")
    source_role_name = input("Enter source role name (example: Administrator (migrated)): ").strip()
    target_role_name = input("Enter target role name (example: Administrator): ").strip()
    selected_projects = prompt_csv_or_all(
        "Enter project keys to move users/groups in, comma-separated, or ALL: "
    )
    dry_run = prompt_yes_no("Dry run? (yes/no): ")
    logging.info(f"Dry run mode: {dry_run}")
    logging.info(f"Source role: {source_role_name}")
    logging.info(f"Target role: {target_role_name}")
    logging.info(f"Selected projects: {selected_projects if selected_projects else 'ALL'}")
    move_project_role_actors(
        source_role_name=source_role_name,
        target_role_name=target_role_name,
        selected_projects=selected_projects,
        dry_run=dry_run
    )
    logging.info("Project role cleanup complete")

if __name__ == "__main__":
    main()
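
As noted near the top, the scripts leave room for better error handling. One small improvement to the retry helper would be to honour a Retry-After header on a 429 response when the server sends one, and fall back to exponential backoff otherwise. The sketch below covers only the wait calculation; the function name and the 60 second cap are my own assumptions.

```python
def backoff_seconds(attempt, retry_after=None, cap=60):
    """Return how long to wait before retry number `attempt` (0-based)."""
    if retry_after is not None:
        try:
            # Retry-After given as a number of seconds.
            return min(max(float(retry_after), 0.0), cap)
        except ValueError:
            pass  # e.g. an HTTP-date value; fall back to exponential backoff
    return min(2 ** attempt, cap)


print(backoff_seconds(0))        # exponential backoff, first attempt
print(backoff_seconds(2, "5"))   # header value wins when it is numeric
print(backoff_seconds(10))       # capped
```

Inside request_with_retries, the header value could be read with response.headers.get("Retry-After") and passed in as retry_after.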