Pagination

Every list method on the Plugipay Python SDK returns a PageResult[T] — a small dataclass with the rows, a cursor, and a has_more flag. Pagination is cursor-based, not offset-based; you walk forward through pages by feeding the previous response's cursor back in.

page = plug.customers.list(limit=20)
for c in page.data:
    print(c["id"], c.get("email"))
if page.has_more:
    next_page = plug.customers.list(limit=20, cursor=page.cursor)

That's the whole API. The rest of this page covers when to use which iteration pattern, how to write a clean generator, and why cursors beat offsets at scale.

The PageResult shape

from plugipay import PageResult, Customer

page: PageResult[Customer] = plug.customers.list(limit=20)

page.data       # list[Customer] — the rows for this page (up to `limit`)
page.cursor     # str | None — opaque token to fetch the next page
page.has_more   # bool — True if at least one more page exists

PageResult is a generic dataclass, so type checkers (mypy, pyright) see page.data as list[Customer], page.data[0]["email"] as Any (each row is dict-shaped), and so on.

Every list method returns PageResult[T] where T is the appropriate resource: customers.list() → PageResult[Customer], invoices.list() → PageResult[Invoice], and so on. See the Reference for the full list.
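
For unit tests it can help to have a stand-in with the same three fields. The sketch below is an assumption about the shape, not the SDK's actual definition: FakePage and fake_list are hypothetical names, and the cursor here is a stringified index, whereas real cursors are opaque.

```python
from dataclasses import dataclass
from typing import Generic, Optional, TypeVar

T = TypeVar("T")


@dataclass
class FakePage(Generic[T]):
    """Hypothetical stand-in mirroring the three documented fields."""
    data: list[T]
    cursor: Optional[str]
    has_more: bool


def fake_list(rows, *, limit=20, cursor=None):
    """Serve `rows` page by page; the cursor is just the next start index."""
    start = int(cursor) if cursor is not None else 0
    end = start + limit
    return FakePage(data=rows[start:end], cursor=str(end), has_more=end < len(rows))


rows = [{"id": f"cus_{i}"} for i in range(45)]
first = fake_list(rows, limit=20)
second = fake_list(rows, limit=20, cursor=first.cursor)
third = fake_list(rows, limit=20, cursor=second.cursor)
```

In this toy model the last page still carries a cursor while has_more is False, which is why loops should test has_more rather than the cursor.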

Common patterns

One page only

When you just want "the most recent N":

page = plug.invoices.list(limit=10, status="open")
for invoice in page.data:
    print(invoice["id"], invoice["total"])

limit is capped at 100 server-side (default 20). Don't ask for limit=10_000 thinking you'll get everything: you'll get at most 100 rows, and if you ignore has_more you'll silently miss the rest. Use cursor iteration instead.

Manual cursor loop

The most explicit pattern. Useful when you want to log progress, checkpoint after each batch, or stop mid-way:

cursor = None
total = 0

while True:
    page = plug.customers.list(limit=100, cursor=cursor)
    for customer in page.data:
        process(customer)
        total += 1

    if not page.has_more:
        break
    cursor = page.cursor

print(f"Processed {total} customers")

A few notes:

  • Don't forget cursor = page.cursor between iterations — without it you'll fetch page 1 forever.
  • Check has_more, not page.cursor. On the last page, cursor may still be set (it's the cursor that would fetch the next page if one existed) but has_more is False.
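  • Resumable: persist cursor after each page and, if you crash mid-loop, resume by passing it back in. Rows from the page that was in flight may be processed again, so keep process() idempotent.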
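
The checkpointing idea in the last note could be sketched as follows. This is a minimal illustration, not part of the SDK: walk, fake_list, and the JSON checkpoint file are all hypothetical, and fake_list stands in for plug.customers.list so the example runs without a network.

```python
import json
import os
import tempfile
from types import SimpleNamespace

ROWS = [{"id": f"cus_{i}"} for i in range(250)]


def fake_list(*, limit=100, cursor=None):
    """Hypothetical stand-in for plug.customers.list (cursor = next index)."""
    start = int(cursor) if cursor else 0
    end = start + limit
    return SimpleNamespace(data=ROWS[start:end], cursor=str(end), has_more=end < len(ROWS))


def walk(list_fn, checkpoint_path, handle, max_pages=None):
    """Walk pages, saving the next cursor after each fully processed page."""
    cursor = None
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as f:
            cursor = json.load(f)["cursor"]  # resume where the last run stopped
    pages = 0
    while True:
        page = list_fn(limit=100, cursor=cursor)
        for row in page.data:
            handle(row)
        if not page.has_more:
            if os.path.exists(checkpoint_path):
                os.remove(checkpoint_path)  # finished: clear the resume point
            return True
        cursor = page.cursor
        with open(checkpoint_path, "w") as f:
            json.dump({"cursor": cursor}, f)
        pages += 1
        if max_pages is not None and pages >= max_pages:
            return False  # stopped early; the checkpoint holds the resume point


seen = []
path = os.path.join(tempfile.mkdtemp(), "cursor.json")
finished = walk(fake_list, path, seen.append, max_pages=1)  # simulate an interruption
resumed = walk(fake_list, path, seen.append)                # picks up at the second page
```

The checkpoint is written only after a whole page has been handled, so a crash in the middle of a page replays that page on resume.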

Generator helper

If you'd rather iterate items without managing pages:

from typing import Iterator
from plugipay import Customer

def iter_customers(plug, **filters) -> Iterator[Customer]:
    cursor = None
    while True:
        page = plug.customers.list(cursor=cursor, **filters)
        yield from page.data
        if not page.has_more:
            return
        cursor = page.cursor


# Usage
for customer in iter_customers(plug, limit=100, email="ada@example.com"):
    print(customer["id"])

This is the recipe most teams settle on. It composes with itertools (take the first 50 across pages, batch into chunks, etc.):

from itertools import islice

# First 250, regardless of page boundaries
for customer in islice(iter_customers(plug, limit=100), 250):
    process(customer)
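
Batching into chunks works the same way. The helper below is a small sketch (chunked is a hypothetical name, not an SDK function); it accepts any iterable, so a plain generator stands in for iter_customers here.

```python
from itertools import islice


def chunked(items, size):
    """Yield lists of up to `size` items from any iterable."""
    it = iter(items)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


# Stand-in for iter_customers(plug, limit=100): any iterator of rows works.
rows = ({"id": f"cus_{i}"} for i in range(230))
sizes = [len(batch) for batch in chunked(rows, 50)]
# sizes == [50, 50, 50, 50, 30]
```

Chunk size is independent of the page size passed as limit. On Python 3.12 and later, itertools.batched does the same job and yields tuples.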

If you want one helper that works across resources, write it generically:

def iter_all(list_fn, *, limit=100, **filters):
    cursor = None
    while True:
        page = list_fn(limit=limit, cursor=cursor, **filters)
        yield from page.data
        if not page.has_more:
            return
        cursor = page.cursor

# Use it on any namespace
for invoice in iter_all(plug.invoices.list, status="open"):
    print(invoice["id"])

for refund in iter_all(plug.refunds.list, source_id="cs_…"):
    print(refund["id"])

The SDK doesn't ship a built-in list_all helper today — the generator above is roughly what we'd put in the SDK if it existed, and it's small enough to keep in your codebase.

Filtering & ordering

List methods take resource-specific filters as keyword arguments. The common ones:

# Customers — by exact email
plug.customers.list(email="ada@example.com")

# Invoices — by status and customer
plug.invoices.list(status="open", customer_id="cus_…")

# Checkout sessions — by status and customer
plug.checkout_sessions.list(status="succeeded", customer_id="cus_…")

# Subscriptions — by status, customer, or plan
plug.subscriptions.list(status="active", plan_id="pln_…")

# Refunds — by status and source
plug.refunds.list(status="succeeded", source_id="cs_…")

# Events — by type and time range
plug.events.list(
    type="plugipay.invoice.paid.v1",
    occurred_after="2026-05-01T00:00:00Z",
    occurred_before="2026-05-12T00:00:00Z",
    order="asc",
)

# Ledger — by transaction or source
plug.ledger.list(tx_id="tx_…", order="desc")

The per-method filter surface is documented on the Reference page and in the API docs for the specific resource (e.g. API → Customers).

Why cursors, not offsets?

Offset pagination (?page=5) is unsafe on data that changes: a row inserted or deleted while you are between pages shifts every subsequent page, so you see duplicates or skip rows.

Cursors point at a stable position (typically the last seen row's id + timestamp). Inserts at the head don't shift the cursor: an in-flight walk stays consistent, and the new rows show up the next time you walk from the start.
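
A toy model makes the difference concrete. Integers stand in for row ids, newest first, and cursor_page is a simplified keyset lookup assumed for illustration; real Plugipay cursors are opaque and may encode more than an id.

```python
def offset_page(rows, page, size):
    """Offset pagination: slice by position in the current ordering."""
    start = page * size
    return rows[start:start + size]


def cursor_page(rows, after_id, size):
    """Keyset-style pagination: rows strictly older than the last seen id."""
    older = [r for r in rows if after_id is None or r < after_id]
    return older[:size]


rows = [9, 8, 7, 6, 5, 4]                 # newest first

off_1 = offset_page(rows, 0, 3)           # [9, 8, 7]
cur_1 = cursor_page(rows, None, 3)        # [9, 8, 7]

rows.insert(0, 10)                        # a new row arrives at the head

off_2 = offset_page(rows, 1, 3)           # [7, 6, 5]: row 7 is served twice
cur_2 = cursor_page(rows, cur_1[-1], 3)   # [6, 5, 4]: no overlap
```

The offset walk repeats row 7 because every position shifted by one; the cursor walk continues from the last row it actually saw.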

Trade-offs you should know about:

  • No "page 5 of 12" UI. You don't know how many total pages exist until you walk them. If you need that, count separately with a server-side aggregate — or accept that "10,000+" is fine in most UIs.
  • No jumping. You can only walk forward. Jumping to "the page from 30 days ago" requires using a time filter, not a cursor offset.
  • Cursors are opaque. Don't try to decode them. Their format is implementation-defined and may change.
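
For the "no jumping" case, a time window can be built with the standard library. This sketch reuses the occurred_after and occurred_before filter names from the events example above; window_filters is a hypothetical helper, and whether other resources accept the same filters is an assumption to check against the Reference.

```python
from datetime import datetime, timedelta, timezone


def window_filters(end, days):
    """Build occurred_after/occurred_before kwargs for a window ending at `end`."""
    def iso(dt):
        # Normalize to UTC and format like the timestamps used on this page.
        return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

    return {
        "occurred_after": iso(end - timedelta(days=days)),
        "occurred_before": iso(end),
    }


filters = window_filters(datetime(2026, 5, 12, tzinfo=timezone.utc), days=11)
# filters == {"occurred_after": "2026-05-01T00:00:00Z",
#             "occurred_before": "2026-05-12T00:00:00Z"}
```

The result can be splatted into a list call, for example plug.events.list(order="asc", **filters), and then paginated with a cursor as usual inside that window.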

Concurrent pagination

PlugipayClient shares one httpx.Client — the connection pool is fine to use from multiple threads. But don't share a single in-flight cursor walk across threads, or you'll race on the cursor variable.

The clean pattern is "fan out by filter, paginate sequentially within each":

from concurrent.futures import ThreadPoolExecutor

def count_for_status(status):
    return sum(1 for _ in iter_all(plug.invoices.list, status=status))

with ThreadPoolExecutor(max_workers=4) as pool:
    counts = dict(zip(
        ["open", "paid", "void"],
        pool.map(count_for_status, ["open", "paid", "void"]),
    ))

Each worker walks its own filtered list; they share the connection pool; they don't share cursor state.
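
To try the pattern without a live account, here is a self-contained variant. fake_list and INVOICES are hypothetical stand-ins for plug.invoices.list and its data; iter_all is the same generator shown earlier.

```python
from concurrent.futures import ThreadPoolExecutor
from types import SimpleNamespace

INVOICES = [
    {"id": f"inv_{i}", "status": status}
    for i, status in enumerate(["open"] * 130 + ["paid"] * 45 + ["void"] * 5)
]


def fake_list(*, limit=100, cursor=None, status=None):
    """Hypothetical stand-in for plug.invoices.list (cursor = next index)."""
    rows = [r for r in INVOICES if r["status"] == status]
    start = int(cursor) if cursor else 0
    end = start + limit
    return SimpleNamespace(data=rows[start:end], cursor=str(end), has_more=end < len(rows))


def iter_all(list_fn, *, limit=100, **filters):
    cursor = None  # local to this generator, so each worker has its own
    while True:
        page = list_fn(limit=limit, cursor=cursor, **filters)
        yield from page.data
        if not page.has_more:
            return
        cursor = page.cursor


def count_for_status(status):
    return sum(1 for _ in iter_all(fake_list, status=status))


statuses = ["open", "paid", "void"]
with ThreadPoolExecutor(max_workers=3) as pool:
    counts = dict(zip(statuses, pool.map(count_for_status, statuses)))
# counts == {"open": 130, "paid": 45, "void": 5}
```

Because cursor lives inside each generator call, no locking is needed: the threads never touch the same cursor variable.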

Performance tips

  • limit=100 is the maximum and usually what you want for bulk reads. It's the cheapest per row, since each request carries a fixed overhead.
  • A limit between 10 and 20 is fine for UI views where you only render a screenful.
  • Filter server-side — passing customer_id= or status= to the API beats fetching everything and filtering in Python.
  • Use events.list for retroactive event sourcing: it's append-only and naturally indexed by time (the occurred_after/occurred_before filters), so it's the fastest way to replay a window.
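
The fixed-overhead point is easy to quantify. The arithmetic below is a rough sketch (requests_needed is a hypothetical helper) that counts round trips only and ignores payload size.

```python
import math


def requests_needed(total_rows, limit):
    """Round trips to walk `total_rows` rows; an empty list still costs one call."""
    return max(1, math.ceil(total_rows / limit))


bulk = requests_needed(25_000, 100)    # 250 requests
default = requests_needed(25_000, 20)  # 1250 requests
```

At the default limit the same walk takes five times as many requests as at limit=100.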
