I'm facing an issue where my Jira Service Account is only able to fetch 15 worklogs per request, despite setting a higher maxResults.

Palak Bakliwal September 29, 2025

(Screenshot attached: Screenshot 2025-09-29 135053.png)

1 answer

0 votes
Tomislav Tobijas
Community Champion
September 29, 2025

Hi @Palak Bakliwal ,

Are you using the new Atlassian service accounts, a custom integration/account, or something related to Jira Service Management?

Also, for time tracking, are you using the native time tracker or a Marketplace solution such as Tempo?

If you can just explain the process here and how you are fetching/syncing data from Jira to your destination, that would be helpful.

Cheers,
Tobi

Palak Bakliwal September 29, 2025
# jira_sync/worklog_sync.py
from pprint import pprint
from datetime import datetime
from typing import Optional, List
from models.worklog import Worklog
from core.logger_config import get_logger
from models.base_model import serialize_mongo_document
import requests
import base64
import os
import json

logger = get_logger(__name__)

def normalize_worklog(jira_worklog, issue_id: str, project_key: str) -> Optional[dict]:
    """Normalize a Jira worklog into a dict ready for DB insert."""

    # Accept either a raw dict or a jira-library resource exposing .raw
    wl = jira_worklog.raw if hasattr(jira_worklog, "raw") else jira_worklog

    worklog_id = wl.get("id")
    if not worklog_id:
        logger.warning(f"Skipping worklog without id for issue {issue_id}")
        return None

    def parse_date(date_str, field_name):
        if not date_str:
            return None
        for fmt in ("%Y-%m-%dT%H:%M:%S.%f%z", "%Y-%m-%dT%H:%M:%S%z"):
            try:
                return datetime.strptime(date_str, fmt)
            except ValueError:
                continue
        logger.error(f"Failed to parse {field_name}='{date_str}' for worklog {worklog_id}")
        return None

    created_at = parse_date(wl.get("created"), "created")
    updated_at = parse_date(wl.get("updated"), "updated")

    emp_id = wl.get("author", {}).get("accountId")

    normalized = {
        "jira_worklog_id": worklog_id,
        "created_at": created_at,
        "updated_at": updated_at,
        "description": wl.get("comment"),
        "emp_id": emp_id,
        "issue_id": issue_id,
        "jira_project_id": project_key,
        "time_spent_in_hrs": float(wl.get("timeSpentSeconds", 0)) / 3600.0,
    }

    logger.info(f"✅ Normalized worklog {worklog_id} for issue {issue_id}")
    return normalized
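
# In Jira Cloud REST API v3 the worklog "comment" field is an Atlassian Document
# Format (ADF) object rather than plain text, so the "description" above ends up
# holding a nested dict. A minimal flattening helper, assuming the standard ADF
# structure, could be used instead, e.g. description=adf_to_text(wl.get("comment")).
def adf_to_text(adf) -> str:
    """Best-effort flattening of an ADF document into plain text."""
    if isinstance(adf, str):
        return adf
    if not isinstance(adf, dict):
        return ""
    parts = []
    for node in adf.get("content", []):
        if node.get("type") == "text":
            parts.append(node.get("text", ""))
        else:
            parts.append(adf_to_text(node))
    return " ".join(p for p in parts if p)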

def fetch_all_worklogs(jira, issue_id):
    """Handle cases where Jira limits worklog responses"""
    all_worklogs = []
    start_at = 0
    max_results = 50  # Try smaller chunks
   
    while True:
        try:
            response = jira._get_json(
                f"issue/{issue_id}/worklog",
                params={
                    "startAt": start_at,
                    "maxResults": max_results,
                    "expand": "worklogs"  # Sometimes helps
                },
            )
           
            worklogs = response.get("worklogs", [])
            if not worklogs:
                break
               
            all_worklogs.extend(worklogs)
            start_at += len(worklogs)

            # Stop once we've paged past the server-reported total. Comparing
            # len(worklogs) < max_results ends too early when the server caps
            # the page size below the requested maxResults.
            if start_at >= response.get("total", 0):
                break
               
        except Exception as e:
            print(f"Error at startAt {start_at}: {e}")
            break
   
    return all_worklogs

def fetch_worklogs_bulk(cloud_id: str, last_sync: Optional[datetime] = None):
    """Fetch all worklogs in bulk using the working endpoint"""
    all_worklogs_data = []
    next_page_token: Optional[str] = None
    processed_issues = 0
    max_results = 100

    # Simple JQL filter; tighten this (e.g. using last_sync) once the basic fetch works
    jql = "worklogDate >= 2024-01-01"
   
    # Get credentials
    email = os.getenv("JIRA_EMAIL")
    api_token = os.getenv("JIRA_API_TOKEN")
   
    if not email or not api_token:
        logger.error("JIRA_EMAIL or JIRA_API_TOKEN not found in environment variables")
        return []
   
    base_url = f"https://api.atlassian.com/ex/jira/{cloud_id}/rest/api/3"
   
    auth = base64.b64encode(f"{email}:{api_token}".encode()).decode()
    headers = {
        "Accept": "application/json",
        "Authorization": f"Basic {auth}"
    }
   
    logger.info(f"Starting bulk worklog fetch with JQL: {jql}")
   
    while True:
        url = f"{base_url}/search/jql"
        # The enhanced search endpoint (/search/jql) pages with nextPageToken,
        # not startAt, and its response does not include a grand total.
        params = {
            "jql": jql,
            "expand": "worklog",
            "fields": "worklog,summary,project,key",
            "maxResults": max_results,
        }
        if next_page_token:
            params["nextPageToken"] = next_page_token

        try:
            logger.info(f"Making request to {url} (nextPageToken={next_page_token})")
            response = requests.get(url, headers=headers, params=params, timeout=60)
           
            logger.info(f"Response status: {response.status_code}")
           
            if response.status_code != 200:
                logger.error(f"API Error: {response.status_code} - {response.text}")
                # Bail out instead of retrying the same request in a tight loop
                break
           
            data = response.json()
           
            # Debug: log the response structure
            logger.info(f"Response keys: {list(data.keys())}")
           
            issues = data.get("issues", [])
            logger.info(f"Number of issues in this batch: {len(issues)}")
           
            if not issues:
                logger.info("No more issues to process")
                break
           
            # Extract worklogs from each issue
            worklogs_count = 0
            for issue in issues:
                issue_key = issue.get('key')
                project_obj = issue.get('fields', {}).get('project', {})
                project_key = project_obj.get('key', 'UNKNOWN')
               
                # Get worklogs - the structure might be different
                worklog_field = issue.get('fields', {}).get('worklog', {})
                worklogs = worklog_field.get('worklogs', [])
               
                logger.info(f"Issue {issue_key} has {len(worklogs)} worklogs")
               
                for wl in worklogs:
                    # Add issue context to worklog
                    wl_data = {
                        'worklog': wl,
                        'issue_key': issue_key,
                        'project_key': project_key
                    }
                    all_worklogs_data.append(wl_data)
                    worklogs_count += 1
           
            logger.info(f"Batch processed: {worklogs_count} worklogs from {len(issues)} issues")
           
            processed_issues += len(issues)
            logger.info(f"Total worklogs so far: {len(all_worklogs_data)}, issues processed: {processed_issues}")

            # nextPageToken is only present when there are more pages
            next_page_token = data.get("nextPageToken")
            if not next_page_token:
                logger.info(f"Retrieved all pages ({processed_issues} issues)")
                break

            # Safety break to prevent runaway pagination
            if processed_issues > 10000:
                logger.warning("Safety break triggered at 10000 issues")
                break
               
        except Exception as e:
            logger.error(f"Error fetching worklogs: {str(e)}")
            import traceback
            logger.error(traceback.format_exc())
            break
   
    logger.info(f"Bulk fetch completed: {len(all_worklogs_data)} total worklogs")
    return all_worklogs_data

def sync_worklogs_from_jira(
    jira,
    issue_id: str,
    project_key: str,
    worklog_repository,
    last_sync: Optional[datetime] = None
) -> List[Worklog]:
    """
    Fetch all worklogs for a single Jira issue and upsert into MongoDB.
    Returns a list of Worklog models.
    """

    logger.info(f"Fetching worklogs for issue {issue_id}")
    try:
        worklogs = fetch_all_worklogs(jira, issue_id)

    except Exception as e:
        logger.error(f"Failed to fetch worklogs for issue {issue_id}: {e}")
        return []

    synced_worklogs: List[Worklog] = []

    for w in worklogs:
        try:
            worklog_data = normalize_worklog(w, issue_id, project_key)
            if not worklog_data:
                continue

            worklog_model = Worklog(**worklog_data)

            # Skip unchanged if last_sync is set
            if last_sync:
                created_check = worklog_model.created_at and worklog_model.created_at <= last_sync
                updated_check = worklog_model.updated_at and worklog_model.updated_at <= last_sync
                if created_check and updated_check:
                    logger.debug(f"⏭️ Skipping unchanged worklog {w.id} (created_at={worklog_model.created_at}, updated_at={worklog_model.updated_at}, last_sync={last_sync})")
                    continue

            # Upsert into DB
            worklog_repository.upsert_worklog_by_id(worklog_id=w["id"], worklog=worklog_model)
            synced_worklogs.append(worklog_model)

            logger.info(f"✅ Synced worklog {w['id']} for issue {issue_id}")

        except Exception as e:
            logger.error(f"Failed to process worklog {w.id} for issue {issue_id}: {e}")

    return synced_worklogs

def sync_worklogs_bulk(
    worklog_repository,
    cloud_id: str,
    last_sync: Optional[datetime] = None
) -> List[Worklog]:
    """
    Bulk sync worklogs using the efficient API endpoint
    """
    logger.info("Starting bulk worklog sync...")
   
    try:
        # Fetch all worklogs in bulk
        bulk_worklogs_data = fetch_worklogs_bulk(cloud_id, last_sync)
       
        logger.info(f"Raw worklogs data count: {len(bulk_worklogs_data)}")
       
        synced_worklogs: List[Worklog] = []
       
        for item in bulk_worklogs_data:
            try:
                wl = item['worklog']
                issue_id = item['issue_key']
                project_key = item['project_key']
               
                worklog_data = normalize_worklog(wl, issue_id, project_key)
                if not worklog_data:
                    logger.warning(f"Failed to normalize worklog {wl.get('id')}")
                    continue

                worklog_model = Worklog(**worklog_data)

                # Skip unchanged if last_sync is set
                if last_sync:
                    created_check = worklog_model.created_at and worklog_model.created_at <= last_sync
                    updated_check = worklog_model.updated_at and worklog_model.updated_at <= last_sync
                    if created_check and updated_check:
                        logger.debug(f"⏭️ Skipping unchanged worklog {wl.get('id')}")
                        continue

                # Upsert into DB
                result = worklog_repository.upsert_worklog_by_id(
                    worklog_id=wl.get("id"),
                    worklog=worklog_model
                )
                synced_worklogs.append(worklog_model)

                logger.info(f"✅ Synced worklog {wl.get('id')} for issue {issue_id}")

            except Exception as e:
                logger.error(f"Failed to process worklog {wl.get('id')} for issue {issue_id}: {e}")
                import traceback
                logger.error(traceback.format_exc())
       
        logger.info(f"Bulk sync completed: {len(synced_worklogs)} worklogs synced")
        return synced_worklogs
       
    except Exception as e:
        logger.error(f"Bulk worklog sync failed: {e}")
        import traceback
        logger.error(traceback.format_exc())
        return []
Palak Bakliwal September 29, 2025

I am using the above file as my sync script.
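
If the goal is a full bulk sync, it may also be worth trying Jira's dedicated worklog endpoints instead of expanding worklogs through search: GET /rest/api/3/worklog/updated pages through the IDs of worklogs changed since a timestamp, and POST /rest/api/3/worklog/list resolves those IDs (up to 1000 per call) into full worklog objects. Below is a minimal sketch, assuming the same JIRA_EMAIL/JIRA_API_TOKEN Basic-auth credentials as the script above; the your-site.atlassian.net base URL is a placeholder.

import os
import requests

# Placeholder base URL; swap in your real site, or the
# https://api.atlassian.com/ex/jira/{cloud_id}/rest/api/3 form used above
BASE_URL = "https://your-site.atlassian.net/rest/api/3"
AUTH = (os.getenv("JIRA_EMAIL"), os.getenv("JIRA_API_TOKEN"))
HEADERS = {"Accept": "application/json"}

def fetch_updated_worklogs(since_ms: int = 0):
    """Yield full worklog objects for every worklog updated since `since_ms` (epoch millis)."""
    url = f"{BASE_URL}/worklog/updated"
    params = {"since": since_ms}
    while True:
        page = requests.get(url, params=params, auth=AUTH, headers=HEADERS, timeout=60)
        page.raise_for_status()
        data = page.json()

        ids = [item["worklogId"] for item in data.get("values", [])]
        if ids:
            # /worklog/list accepts up to 1000 IDs per call and returns the full worklogs
            details = requests.post(
                f"{BASE_URL}/worklog/list",
                json={"ids": ids},
                auth=AUTH,
                headers=HEADERS,
                timeout=60,
            )
            details.raise_for_status()
            yield from details.json()

        if data.get("lastPage", True):
            break
        # Follow the server-provided URL for the next page of worklog IDs
        url, params = data["nextPage"], None

if __name__ == "__main__":
    for wl in fetch_updated_worklogs():
        print(wl["id"], wl.get("issueId"), wl.get("timeSpentSeconds"))

Because the server controls the paging of /worklog/updated and /worklog/list returns complete worklog objects, this route sidesteps the per-issue worklog caps that affect the search-based approach.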

 

Tomislav Tobijas
Community Champion
October 2, 2025

Ah... This is probably way out of my domain, so I won't be much of a help here. Hopefully someone else who's more into scripting will chime in here and help out 👀

I could reach out to my colleagues who usually do scripting and see if they can give a hand, but I can't promise anything.

DEPLOYMENT TYPE: CLOUD
PRODUCT PLAN: FREE