From a0b59c29b3d2235343caba5ef6827bdc5aea0e28 Mon Sep 17 00:00:00 2001 From: Asish Kumar Date: Mon, 25 May 2026 08:47:03 +0530 Subject: [PATCH] Stop HTTP cursor pagination when cursor repeats --- .../http/source/HttpSourceReader.java | 12 ++++- .../HttpSourceReaderInternalPollNextTest.java | 45 +++++++++++++++++++ 2 files changed, 55 insertions(+), 2 deletions(-) diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java index c7ab3c9c00bc..216d96bc9f1c 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java @@ -404,9 +404,17 @@ private void collect(Collector output, String data) throws IOExcep if (cursorList != null && !cursorList.isEmpty()) { newCursor = cursorList.get(0); } + boolean cursorNotAdvanced = + !Strings.isNullOrEmpty(newCursor) + && Objects.equals(pageInfo.getCursor(), newCursor); pageInfo.setCursor(newCursor); - // if not present cursor, then no more data - noMoreElementFlag = Strings.isNullOrEmpty(newCursor); + // if not present cursor or cursor not advanced, then no more data + noMoreElementFlag = Strings.isNullOrEmpty(newCursor) || cursorNotAdvanced; + if (cursorNotAdvanced) { + log.warn( + "HTTP cursor pagination stopped because cursor [{}] did not advance", + newCursor); + } } else { // if not set page pagination is default // Determine whether the task is completed by specifying the presence of the 'total diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/http/HttpSourceReaderInternalPollNextTest.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/http/HttpSourceReaderInternalPollNextTest.java index 057eeb851b44..b9a18dced359 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/http/HttpSourceReaderInternalPollNextTest.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/http/HttpSourceReaderInternalPollNextTest.java @@ -16,6 +16,7 @@ */ package org.apache.seatunnel.connectors.seatunnel.http; +import org.apache.seatunnel.api.source.Boundedness; import org.apache.seatunnel.api.source.Collector; import org.apache.seatunnel.api.table.type.BasicType; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; @@ -37,15 +38,19 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class HttpSourceReaderInternalPollNextTest { @@ -135,6 +140,46 @@ public void testPageNumberPlaceHolderRequestBodyUpdate() throws Exception { httpSourceReader.close(); } + @Test + @Timeout(5) + public void testCursorPaginationStopsWhenCursorDoesNotAdvance() throws Exception { + AtomicInteger requestCount = new AtomicInteger(); + when(context.getBoundedness()).thenReturn(Boundedness.BOUNDED); + when(httpClientProvider.execute( + anyString(), anyString(), any(), any(), any(), anyBoolean())) + .thenAnswer( + invocation -> { + if (requestCount.incrementAndGet() == 1) { + when(httpResponse.getContent()) + .thenReturn( + "{\"data\":[{\"key1\":\"v1\",\"key2\":\"v2\"}]," + + "\"next\":\"cursor-1\"}"); + } else { + when(httpResponse.getContent()) + .thenReturn("{\"data\":[],\"next\":\"cursor-1\"}"); + } + when(httpResponse.getCode()).thenReturn(200); + return httpResponse; + }); + + PageInfo pageInfo = new PageInfo(); + pageInfo.setPageType(HttpPaginationType.CURSOR.getCode()); + pageInfo.setPageCursorResponseField("$.next"); + + httpSourceReader = + new HttpSourceReader( + httpParameter, context, deserializationSchema, null, "$.data", pageInfo); + httpSourceReader.open(); + httpSourceReader.setHttpClient(httpClientProvider); + httpSourceReader.internalPollNext(collector); + + Assertions.assertEquals(2, requestCount.get()); + verify(context).signalNoMoreElement(); + verify(httpClientProvider, times(2)) + .execute(anyString(), anyString(), any(), any(), any(), anyBoolean()); + httpSourceReader.close(); + } + @AfterEach public void tearDown() throws Exception { mock.close();