Unable to fetch records from Confluent REST Proxy using the records endpoint

Rachit Sharma November 4, 2020

I am working on a Spring scheduler application. Its main job is to pull records from Confluent REST Proxy and then run some business logic on them.

Steps that we are following:

1. Create a consumer instance. We create a unique instance on every scheduler run (e.g. instanceName + service.getCurrentTime()). We use POST /consumers/(string:group_name)

2. Create a subscription using the generated instance ID. We use POST /consumers/(string:group_name)/instances/(string:instance)/subscription

3. Fetch the records (encrypted records). We use GET /consumers/(string:group_name)/instances/(string:instance)/records
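For reference, the three calls above can be sketched with the JDK's java.net.http types. This is only an illustration and not the actual recordsService code: the class name RestProxyRequests, the base URL, the group and instance names, and the choice of the "binary" embedded format are all assumptions. The requests are built but not sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper (not from the original post): builds the three REST Proxy v2 requests.
// Assumes group, instance and topic names are URL-safe and need no JSON escaping.
final class RestProxyRequests {
    static final String V2_JSON = "application/vnd.kafka.v2+json";
    // Assumption: the consumer is created with the "binary" embedded format.
    static final String V2_BINARY = "application/vnd.kafka.binary.v2+json";

    // Step 1: POST /consumers/{group}
    static HttpRequest createConsumer(String baseUrl, String group, String instance) {
        String body = "{\"name\":\"" + instance + "\",\"format\":\"binary\"}";
        return HttpRequest.newBuilder(URI.create(baseUrl + "/consumers/" + group))
                .header("Content-Type", V2_JSON)
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    // Step 2: POST /consumers/{group}/instances/{instance}/subscription
    static HttpRequest subscribe(String baseUrl, String group, String instance, List<String> topics) {
        String topicArray = topics.stream()
                .map(t -> "\"" + t + "\"")
                .collect(Collectors.joining(","));
        String body = "{\"topics\":[" + topicArray + "]}";
        return HttpRequest.newBuilder(URI.create(instanceUrl(baseUrl, group, instance) + "/subscription"))
                .header("Content-Type", V2_JSON)
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    // Step 3: GET /consumers/{group}/instances/{instance}/records?timeout={ms}
    static HttpRequest fetchRecords(String baseUrl, String group, String instance, long timeoutMs) {
        return HttpRequest.newBuilder(URI.create(instanceUrl(baseUrl, group, instance) + "/records?timeout=" + timeoutMs))
                .header("Accept", V2_BINARY)
                .GET()
                .build();
    }

    private static String instanceUrl(String baseUrl, String group, String instance) {
        return baseUrl + "/consumers/" + group + "/instances/" + instance;
    }
}
```

Each request would be sent with HttpClient.send(...). The response to step 1 contains an instance_id and a base_uri, and the later calls are meant to go to that base_uri, so the fixed baseUrl here is a simplification.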

We have observed that when a few records are pushed to the topics after the subscription is created, they are pulled on the next scheduler run. However, when we restart Kafka (this happens both locally and on the server) and then push records to the topics, we need to retry at least 2-3 times before the records are returned.
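The manual retries described above could be made explicit with a bounded polling loop around step 3. The sketch below is not part of our current code: RecordPoller, maxAttempts and delayMillis are hypothetical names, and the fetch argument stands in for whatever call returns the parsed records. It only helps if an early poll on a fresh instance comes back empty (for example while the consumer is still joining the group, which is an assumption here and not something confirmed for this setup).

```java
import java.util.List;
import java.util.function.Supplier;

// Hypothetical helper (not from the original post): retries a fetch until it returns records.
final class RecordPoller {
    // Calls fetch up to maxAttempts times, sleeping delayMillis between attempts,
    // and returns the first non-empty result, or an empty list if every attempt was empty.
    static <T> List<T> pollUntilNonEmpty(Supplier<List<T>> fetch, int maxAttempts, long delayMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            List<T> records = fetch.get();
            if (records != null && !records.isEmpty()) {
                return records;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop retrying.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        return List.of();
    }
}
```

In the scheduler this would wrap the getRecords call, for example with 5 attempts and a 1-second delay. Both values are placeholders to tune.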

I am currently experiencing this issue on the server. I have checked with my automation team and confirmed that my consumer group ID is valid.

try {
    // Create a uniquely named consumer instance for this scheduler run
    Attributes instanceAttributes = recordsService.createInstance(instanceName + service.getCurrentTime());
    // Topics to subscribe
    recordsService.createSubscription(instanceAttributes, kafkaTopicProperties.getTopics());
    // Fetch records -- this is the call that has trouble returning records
    ResponseEntity<String> response = recordsService.getRecords(instanceAttributes);
