Forums

Articles
Create
cancel
Showing results for 
Search instead for 
Did you mean: 
  • Community
  • Q&A
  • Jira
  • Questions
  • I'm facing an issue where my Jira Service Account is only able to fetch 15 worklogs per request, des

I'm facing an issue where my Jira Service Account is only able to fetch 15 worklogs per request, des

Palak Bakliwal September 29, 2025

Screenshot 2025-09-29 135053.png

1 answer

0 votes
Tomislav Tobijas
Community Champion
September 29, 2025

Hi @Palak Bakliwal ,

Are you using new Atlassian service accounts, some custom integration/account, or something related to Jira Service Management?

Also, for time tracking, are you using a native time tracker or some Marketplace solutions such as Tempo?

If you can just explain the process here and how you are fetching/syncing data from Jira to your destination, that would be helpful.

Cheers,
Tobi

Palak Bakliwal September 29, 2025
# jira_sync/worklog_sync.py
from pprint import pprint
from datetime import datetime
from typing import Optional, List
from models.worklog import Worklog
from core.logger_config import get_logger
from models.base_model import serialize_mongo_document
import requests
import base64
import os
import json

logger = get_logger(__name__)

def normalize_worklog(jira_worklog, issue_id: str, project_key: str) -> dict:
    """Normalize a Jira worklog into a dict ready for DB insert."""

    # Ensure dict form
    wl = getattr(jira_worklog, "raw", jira_worklog) if hasattr(jira_worklog, "raw") else jira_worklog

    worklog_id = wl.get("id")
    if not worklog_id:
        logger.warning(f"Skipping worklog without id for issue {issue_id}")
        return None

    def parse_date(date_str, field_name):
        if not date_str:
            return None
        for fmt in ("%Y-%m-%dT%H:%M:%S.%f%z", "%Y-%m-%dT%H:%M:%S%z"):
            try:
                return datetime.strptime(date_str, fmt)
            except ValueError:
                continue
        logger.error(f"Failed to parse {field_name}='{date_str}' for worklog {worklog_id}")
        return None

    created_at = parse_date(wl.get("created"), "created")
    updated_at = parse_date(wl.get("updated"), "updated")

    emp_id = wl.get("author", {}).get("accountId")

    normalized = {
        "jira_worklog_id": worklog_id,
        "created_at": created_at,
        "updated_at": updated_at,
        "description": wl.get("comment"),
        "emp_id": emp_id,
        "issue_id": issue_id,
        "jira_project_id": project_key,
        "time_spent_in_hrs": float(wl.get("timeSpentSeconds", 0)) / 3600.0,
    }

    logger.info(f"✅ Normalized worklog {worklog_id} for issue {issue_id}")
    return normalized

def fetch_all_worklogs(jira, issue_id):
    """Handle cases where Jira limits worklog responses"""
    all_worklogs = []
    start_at = 0
    max_results = 50  # Try smaller chunks
   
    while True:
        try:
            response = jira._get_json(
                f"issue/{issue_id}/worklog",
                params={
                    "startAt": start_at,
                    "maxResults": max_results,
                    "expand": "worklogs"  # Sometimes helps
                },
            )
           
            worklogs = response.get("worklogs", [])
            if not worklogs:
                break
               
            all_worklogs.extend(worklogs)
            start_at += len(worklogs)
           
            # Check if we've gotten all worklogs
            if len(worklogs) < max_results:
                break
               
        except Exception as e:
            print(f"Error at startAt {start_at}: {e}")
            break
   
    return all_worklogs

def fetch_worklogs_bulk(cloud_id: str, last_sync: Optional[datetime] = None):
    """Fetch all worklogs in bulk using the working endpoint"""
    all_worklogs_data = []
    start_at = 0
    max_results = 100
   
    # Use simpler JQL query that definitely works
    jql = "worklogDate >= 2024-01-01"
   
    # Get credentials
    email = os.getenv("JIRA_EMAIL")
    api_token = os.getenv("JIRA_API_TOKEN")
   
    if not email or not api_token:
        logger.error("JIRA_EMAIL or JIRA_API_TOKEN not found in environment variables")
        return []
   
    base_url = f"https://api.atlassian.com/ex/jira/{cloud_id}/rest/api/3"
   
    auth = base64.b64encode(f"{email}:{api_token}".encode()).decode()
    headers = {
        "Accept": "application/json",
        "Authorization": f"Basic {auth}"
    }
   
    logger.info(f"Starting bulk worklog fetch with JQL: {jql}")
   
    while True:
        url = f"{base_url}/search/jql"
        params = {
            "jql": jql,
            "expand": "worklog",
            "fields": "worklog,summary,project,key",
            "maxResults": max_results,
            "startAt": start_at
        }
       
        try:
            logger.info(f"Making request to {url} with startAt={start_at}")
            response = requests.get(url, headers=headers, params=params, timeout=60)
           
            logger.info(f"Response status: {response.status_code}")
           
            if response.status_code != 200:
                logger.error(f"API Error: {response.status_code} - {response.text}")
                # Try to continue with next page if it's a temporary error
                if response.status_code == 400:
                    break
                else:
                    continue
           
            data = response.json()
           
            # Debug: log the response structure
            logger.info(f"Response keys: {list(data.keys())}")
            logger.info(f"Total issues in response: {data.get('total', 0)}")
            logger.info(f"StartAt: {data.get('startAt', 0)}, MaxResults: {data.get('maxResults', 0)}")
           
            issues = data.get("issues", [])
            logger.info(f"Number of issues in this batch: {len(issues)}")
           
            if not issues:
                logger.info("No more issues to process")
                break
           
            # Extract worklogs from each issue
            worklogs_count = 0
            for issue in issues:
                issue_key = issue.get('key')
                project_obj = issue.get('fields', {}).get('project', {})
                project_key = project_obj.get('key', 'UNKNOWN')
               
                # Get worklogs - the structure might be different
                worklog_field = issue.get('fields', {}).get('worklog', {})
                worklogs = worklog_field.get('worklogs', [])
               
                logger.info(f"Issue {issue_key} has {len(worklogs)} worklogs")
               
                for wl in worklogs:
                    # Add issue context to worklog
                    wl_data = {
                        'worklog': wl,
                        'issue_key': issue_key,
                        'project_key': project_key
                    }
                    all_worklogs_data.append(wl_data)
                    worklogs_count += 1
           
            logger.info(f"Batch processed: {worklogs_count} worklogs from {len(issues)} issues")
           
            start_at += len(issues)
            logger.info(f"Total worklogs so far: {len(all_worklogs_data)}, Next startAt: {start_at}")
           
            # Check if we've retrieved all issues
            total_issues = data.get("total", 0)
            if start_at >= total_issues:
                logger.info(f"Retrieved all {total_issues} issues")
                break
               
            # Safety break to prevent infinite loops
            if start_at > 10000:
                logger.warning("Safety break triggered at 10000 issues")
                break
               
        except Exception as e:
            logger.error(f"Error fetching worklogs: {str(e)}")
            import traceback
            logger.error(traceback.format_exc())
            break
   
    logger.info(f"Bulk fetch completed: {len(all_worklogs_data)} total worklogs")
    return all_worklogs_data

def sync_worklogs_from_jira(
    jira,
    issue_id: str,
    project_key: str,
    worklog_repository,
    last_sync: Optional[datetime] = None
) -> List[Worklog]:
    """
    Fetch all worklogs for a single Jira issue and upsert into MongoDB.
    Returns a list of Worklog models.
    """

    logger.info(f"Fetching worklogs for issue {issue_id}")
    try:
        worklogs = fetch_all_worklogs(jira, issue_id)

    except Exception as e:
        logger.error(f"Failed to fetch worklogs for issue {issue_id}: {e}")
        return []

    synced_worklogs: List[Worklog] = []

    for w in worklogs:
        try:
            worklog_data = normalize_worklog(w, issue_id, project_key)
            if not worklog_data:
                continue

            worklog_model = Worklog(**worklog_data)

            # Skip unchanged if last_sync is set
            if last_sync:
                created_check = worklog_model.created_at and worklog_model.created_at <= last_sync
                updated_check = worklog_model.updated_at and worklog_model.updated_at <= last_sync
                if created_check and updated_check:
                    logger.debug(f"⏭️ Skipping unchanged worklog {w.id} (created_at={worklog_model.created_at}, updated_at={worklog_model.updated_at}, last_sync={last_sync})")
                    continue

            # Upsert into DB
            worklog_repository.upsert_worklog_by_id(worklog_id=w["id"], worklog=worklog_model)
            synced_worklogs.append(worklog_model)

            logger.info(f"✅ Synced worklog {w['id']} for issue {issue_id}")

        except Exception as e:
            logger.error(f"Failed to process worklog {w.id} for issue {issue_id}: {e}")

    return synced_worklogs

def sync_worklogs_bulk(
    worklog_repository,
    cloud_id: str,
    last_sync: Optional[datetime] = None
) -> List[Worklog]:
    """
    Bulk sync worklogs using the efficient API endpoint
    """
    logger.info("Starting bulk worklog sync...")
   
    try:
        # Fetch all worklogs in bulk
        bulk_worklogs_data = fetch_worklogs_bulk(cloud_id, last_sync)
       
        logger.info(f"Raw worklogs data count: {len(bulk_worklogs_data)}")
       
        synced_worklogs: List[Worklog] = []
       
        for item in bulk_worklogs_data:
            try:
                wl = item['worklog']
                issue_id = item['issue_key']
                project_key = item['project_key']
               
                worklog_data = normalize_worklog(wl, issue_id, project_key)
                if not worklog_data:
                    logger.warning(f"Failed to normalize worklog {wl.get('id')}")
                    continue

                worklog_model = Worklog(**worklog_data)

                # Skip unchanged if last_sync is set
                if last_sync:
                    created_check = worklog_model.created_at and worklog_model.created_at <= last_sync
                    updated_check = worklog_model.updated_at and worklog_model.updated_at <= last_sync
                    if created_check and updated_check:
                        logger.debug(f"⏭️ Skipping unchanged worklog {wl.get('id')}")
                        continue

                # Upsert into DB
                result = worklog_repository.upsert_worklog_by_id(
                    worklog_id=wl.get("id"),
                    worklog=worklog_model
                )
                synced_worklogs.append(worklog_model)

                logger.info(f"✅ Synced worklog {wl.get('id')} for issue {issue_id}")

            except Exception as e:
                logger.error(f"Failed to process worklog {wl.get('id')} for issue {issue_id}: {e}")
                import traceback
                logger.error(traceback.format_exc())
       
        logger.info(f"Bulk sync completed: {len(synced_worklogs)} worklogs synced")
        return synced_worklogs
       
    except Exception as e:
        logger.error(f"Bulk worklog sync failed: {e}")
        import traceback
        logger.error(traceback.format_exc())
        return []
Palak Bakliwal September 29, 2025

i am using the above file to sync scripts

 

Tomislav Tobijas
Community Champion
October 2, 2025

Ah... This is probably way out of my domain, so I won't be much of a help here. Hopefully someone else who's more into scripting will chime in here and help out 👀

I could reach out to my colleagues who usually do scripting and see if they can give a hand, but I can't promise anything.

Suggest an answer

Log in or Sign up to answer
DEPLOYMENT TYPE
CLOUD
PRODUCT PLAN
FREE
TAGS
AUG Leaders

Atlassian Community Events