
NRP Async Client Library

The NRP Async Client Library provides a Python async/await API for programmatically interacting with NRP repositories. It’s built on top of aiohttp and provides high-level abstractions for working with records, files, and requests.


This library is designed for asynchronous Python applications. If you need a synchronous API, use the sync client library instead. If you’re building command-line tools, consider using the nrp-cmd CLI application.

Key Features

  • Fully async/await: Built with modern Python async patterns
  • Type-safe: Full type annotations for better IDE support
  • Connection pooling: Efficient HTTP connection management
  • Progress tracking: Built-in progress bar support for file operations
  • Retry logic: Automatic retry on transient failures
  • Multi-repository: Work with multiple repositories simultaneously
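The library handles automatic retry internally, and this page does not describe its exact policy (which errors count as transient, how many attempts, what delays). Purely as an illustration of the technique, here is a minimal sketch of retrying an async operation with exponential backoff. TransientError, retry_async, and the default parameters are hypothetical names chosen for this example; they are not part of nrp_cmd.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Hypothetical stand-in for a temporary failure such as a timeout or HTTP 503."""


async def retry_async(
    operation: Callable[[], Awaitable[T]],
    attempts: int = 4,
    base_delay: float = 0.1,
) -> T:
    """Await operation(), retrying on TransientError with exponential backoff.

    Waits base_delay, 2 * base_delay, 4 * base_delay, ... between attempts.
    Other exceptions propagate immediately; after the last failed attempt the
    TransientError is re-raised.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        try:
            return await operation()
        except TransientError:
            if attempt == attempts - 1:
                raise
            await asyncio.sleep(base_delay * 2**attempt)
    raise AssertionError("unreachable")  # the loop always returns or raises
```

Production retry code usually also adds random jitter to each delay so that many clients recovering from the same outage do not retry in lockstep.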

Prerequisites

  • Python 3.12 or higher
  • Basic understanding of Python async/await patterns
  • Access to an NRP repository (or any InvenioRDM repository)

Installation

```bash
pip install nrp-cmd
```

Quick Start

Basic Example

```python
import asyncio

from nrp_cmd.async_client import get_async_client


async def main():
    # Get a client for a repository
    client = await get_async_client("https://your-repository.org")

    # Create a new record
    record = await client.records.create({
        "metadata": {
            "title": "My First Record",
            "creators": [{"name": "John Doe"}],
            "resourceType": {"id": "dataset"},
        }
    })
    print(f"Created record: {record.id}")

    # Upload a file
    file = await client.files.upload(
        record,
        key="data.csv",
        metadata={"description": "My data file"},
        source="path/to/data.csv",
    )
    print(f"Uploaded file: {file.key}")

    # Publish the record
    published = await client.records.publish(record)
    print(f"Published record: {published.id}")


# Run the async function
asyncio.run(main())
```

Client Architecture

The async client is organized into several components:

Repository Client

The main entry point that provides access to all functionality:

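```python
# repository is a repository URL (e.g. "https://your-repository.org")
# or an alias from your configuration (e.g. "my-repo")
client = await get_async_client(repository)
```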

The client has three main sub-clients:

  • client.records: Operations on records (create, read, update, delete, search, publish)
  • client.files: File operations (upload, download, list, delete)
  • client.requests: Request/workflow operations (submit, accept, decline)

Configuration

The client uses configuration from ~/.nrp/invenio-config.json by default, but you can provide custom configuration:

```python
from yarl import URL

from nrp_cmd.async_client import get_async_client
from nrp_cmd.config import Config, RepositoryConfig

# Create custom configuration
config = Config()
config.add_repository(RepositoryConfig(
    alias="my-repo",
    url=URL("https://your-repository.org/api"),
    token="your-access-token",
    verify_tls=True,
))

# Use custom configuration
client = await get_async_client("my-repo", config=config)
```

Connection Management

The library handles connection pooling automatically. You can control the maximum number of connections:

```python
from nrp_cmd.async_client import get_async_client, limit_connections

# Limit to 5 concurrent connections
async with limit_connections(5):
    client = await get_async_client(repository)
    # Perform operations...
```
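Conceptually, a connection limit caps how many requests are in flight at the same time. The sketch below shows that idea with a plain asyncio.Semaphore so the effect can be observed without a repository. It is not necessarily how limit_connections is implemented; run_limited, the inner fetch coroutine, and the simulated delay are hypothetical stand-ins for real HTTP requests.

```python
import asyncio


async def run_limited(n_tasks: int, max_connections: int) -> int:
    """Run n_tasks simulated requests, at most max_connections at a time.

    Returns the highest number of requests observed in flight at once.
    """
    semaphore = asyncio.Semaphore(max_connections)
    in_flight = 0
    peak = 0

    async def fetch(i: int) -> int:
        nonlocal in_flight, peak
        async with semaphore:  # waits here while max_connections are busy
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stand-in for network I/O
            in_flight -= 1
            return i

    # All tasks are started together; the semaphore throttles them.
    await asyncio.gather(*(fetch(i) for i in range(n_tasks)))
    return peak
```

With 20 simulated requests and a limit of 5, the peak never exceeds 5, while fewer requests than the limit all run at once.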

Record Status

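Records are either drafts or published. The records client exposes draft_records and published_records views that restrict operations to one of these states: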

```python
from nrp_cmd.async_client import RecordStatus

# Work only with draft records
draft_client = client.records.draft_records
drafts = await draft_client.search(q="title:test")

# Work only with published records
published_client = client.records.published_records
published = await published_client.search(q="title:test")
```

Error Handling

The library provides specific exception types:

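```python
from nrp_cmd.errors import (
    RepositoryCommunicationError,
    RepositoryClientError,
    StructureError,
)

try:
    # Reading a draft record by ID
    record = await client.records.draft_records.read("non-existent-id")
except RepositoryClientError as e:
    # Handle the more specific error first: if RepositoryClientError subclasses
    # RepositoryCommunicationError, a handler for the parent placed above it
    # would catch it instead.
    print(f"Client error: {e}")
except RepositoryCommunicationError as e:
    print(f"Communication error: {e}")
```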
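Python checks except clauses from top to bottom and runs the first one whose type matches, including matches through inheritance. The self-contained sketch below shows why the more specific handler has to come first. The two classes are hypothetical stand-ins that assume a client error is a kind of communication error; this page does not confirm that relationship for the real nrp_cmd.errors classes.

```python
class CommunicationError(Exception):
    """Hypothetical base class for failed repository calls."""


class ClientError(CommunicationError):
    """Hypothetical subclass for errors caused by the request itself (4xx-style)."""


def classify_specific_first(exc: Exception) -> str:
    try:
        raise exc
    except ClientError:
        return "client"
    except CommunicationError:
        return "communication"


def classify_parent_first(exc: Exception) -> str:
    try:
        raise exc
    except CommunicationError:  # also matches every ClientError, so...
        return "communication"
    except ClientError:  # ...this clause can never run
        return "client"
```

Ordering the handlers from most to least specific keeps each one reachable, whatever the real hierarchy turns out to be.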

Progress Tracking

Long-running operations such as file uploads and downloads accept a progress argument; pass a label to show a progress bar:

```python
# Upload with progress tracking
file = await client.files.upload(
    record,
    key="large-file.zip",
    metadata={},
    source="path/to/large-file.zip",
    progress="Uploading large file",  # Shows progress bar
)

# Download with progress tracking
await client.files.download(
    file,
    "path/to/save.zip",
    progress="Downloading file",
)
```
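Behind any progress bar, a transfer reports how many bytes have moved so far against the expected total. The library renders its own bar from the progress label; the sketch below only illustrates that bookkeeping with a callback. copy_with_progress, chunked, and on_progress are hypothetical names, and an in-memory chunk source stands in for a real upload.

```python
import asyncio
from collections.abc import AsyncIterator, Callable


async def copy_with_progress(
    chunks: AsyncIterator[bytes],
    total: int,
    on_progress: Callable[[int, int], None],
) -> bytes:
    """Consume chunks, calling on_progress(done, total) after each one."""
    buffer = bytearray()
    done = 0
    async for chunk in chunks:
        buffer += chunk  # a real transfer would send or write the chunk instead
        done += len(chunk)
        on_progress(done, total)
    return bytes(buffer)


async def chunked(data: bytes, size: int) -> AsyncIterator[bytes]:
    """Yield data in pieces of at most size bytes, as a network stream might."""
    for start in range(0, len(data), size):
        await asyncio.sleep(0)  # give other tasks a turn, as real I/O would
        yield data[start:start + size]
```

A terminal progress bar is then just an on_progress callback that redraws 100 * done // total as a percentage.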

Working with Multiple Repositories

```python
# Connect to multiple repositories
repo1 = await get_async_client("https://repo1.org")
repo2 = await get_async_client("https://repo2.org")

# Search draft records in both
results1 = await repo1.records.draft_records.search(q="climate")
results2 = await repo2.records.draft_records.search(q="climate")

# Copy a record's metadata from one repository to the other
# (create() takes the same {"metadata": ...} shape used in the Quick Start)
record1 = results1.hits.hits[0]
record2 = await repo2.records.create({"metadata": record1.metadata})
```
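The two searches above run one after the other. Because each search call is awaitable, independent calls to different repositories can also run concurrently with asyncio.gather. The sketch below demonstrates the pattern with a hypothetical search_repo coroutine that only simulates network latency; with the real client, the repo1 and repo2 search calls can be passed to asyncio.gather in the same way.

```python
import asyncio


async def search_repo(repo_url: str, query: str, latency: float) -> list[str]:
    """Hypothetical stand-in for a repository search; sleeps to mimic latency."""
    await asyncio.sleep(latency)
    return [f"{repo_url}/records/{query}-1"]


async def search_all(
    repo_urls: list[str], query: str, latency: float
) -> dict[str, list[str]]:
    """Search every repository concurrently and map each URL to its hits."""
    results = await asyncio.gather(
        *(search_repo(url, query, latency) for url in repo_urls)
    )
    return dict(zip(repo_urls, results))  # gather keeps the input order
```

The total wait is roughly one round trip instead of one per repository. If a repository may be unreachable, asyncio.gather(..., return_exceptions=True) places the exception in that slot of the result instead of failing the whole call.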

Data Streaming

The library supports streaming data for efficient memory usage:

```python
from nrp_cmd.async_client.streams import FileSource, FileSink

# Stream upload from file
source = FileSource("large-file.zip")
await client.files.upload(record, "file.zip", {}, source=source)

# Stream download to file
sink = FileSink("downloaded.zip")
await client.files.download(file, sink)
```
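The memory benefit of streaming comes from handling a file in fixed-size chunks instead of loading it whole. FileSource and FileSink do this for uploads and downloads; as an independent illustration, the sketch below reads a local file chunk by chunk in an async generator and hashes it, so memory use stays around one chunk regardless of file size. iter_file_chunks, sha256_of_file, and the 64 KiB chunk size are assumptions for this example, not the library's internals.

```python
import asyncio
import hashlib
from collections.abc import AsyncIterator
from pathlib import Path

CHUNK_SIZE = 64 * 1024  # 64 KiB; an arbitrary choice for this sketch


async def iter_file_chunks(path: Path, chunk_size: int = CHUNK_SIZE) -> AsyncIterator[bytes]:
    """Yield the file's contents in chunks, running each read in a worker
    thread so disk I/O does not block the event loop."""
    with path.open("rb") as f:
        while True:
            chunk = await asyncio.to_thread(f.read, chunk_size)
            if not chunk:
                break
            yield chunk


async def sha256_of_file(path: Path) -> str:
    """Hash a file of any size while holding only one chunk in memory."""
    digest = hashlib.sha256()
    async for chunk in iter_file_chunks(path):
        digest.update(chunk)
    return digest.hexdigest()
```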

Advanced Topics

Scanning All Records

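search() returns a single page of results. To iterate over every matching record, use scan(), an async context manager that yields the records as an async iterator: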

```python
# Scan through all draft records matching query
async with client.records.draft_records.scan(q="resourceType:dataset") as records:
    async for record in records:
        print(f"Processing: {record.id}")
        # Process each record...

# Scan through published records
async with client.records.published_records.scan(q="resourceType:dataset") as records:
    async for record in records:
        print(f"Processing published: {record.id}")
        # Process each record...
```
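This page does not describe how scan() walks through the result set. As a rough sketch of the general idea only, the code below wraps repeated page requests in an async generator and exposes it through an async context manager, matching the "async with ... as records" / "async for record in records" shape used above. fetch_page, the page size, and the stop condition (a short page ends the scan) are hypothetical; a real InvenioRDM scan may use a different mechanism.

```python
import asyncio
from collections.abc import AsyncGenerator, AsyncIterator, Awaitable, Callable
from contextlib import asynccontextmanager

# Hypothetical page fetcher: (page_number, page_size) -> records on that page
FetchPage = Callable[[int, int], Awaitable[list[dict]]]


async def _iterate(fetch_page: FetchPage, page_size: int) -> AsyncGenerator[dict, None]:
    page = 1
    while True:
        hits = await fetch_page(page, page_size)
        for hit in hits:
            yield hit
        if len(hits) < page_size:  # a short (or empty) page means no more results
            return
        page += 1


@asynccontextmanager
async def scan(fetch_page: FetchPage, page_size: int = 100) -> AsyncIterator[AsyncGenerator[dict, None]]:
    """Yield an async iterator over every record, fetched one page at a time."""
    records = _iterate(fetch_page, page_size)
    try:
        yield records
    finally:
        await records.aclose()  # stop fetching even if the caller exits early
```

The context manager matters when the caller stops early: leaving the async with block closes the generator, so no further pages are requested.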

Working with Models

If your repository has multiple record types (models):

```python
# Work with a specific model
dataset_client = client.records.with_model("datasets")
dataset = await dataset_client.create({
    "metadata": {"title": "Dataset Record"}
})

# Search draft records within a specific model
results = await dataset_client.draft_records.search(q="climate")

# Search published records within a specific model
published_results = await dataset_client.published_records.search(q="climate")
```

Idempotent Operations

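When the record's PID is deterministic (for example, because you supply a fixed id), pass idempotent=True so the create call can be safely retried: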

```python
# Create with idempotent flag if your PID is deterministic
record = await client.records.create(
    data={"metadata": {...}, "id": "my-fixed-id"},
    idempotent=True,
)
```
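This page does not spell out exactly what idempotent=True does. One common meaning, assumed here, is that if a record with the same id already exists, the existing record is returned instead of an error, so repeating the call after a timeout cannot create a duplicate. The sketch below models that behavior with an in-memory store; InMemoryRecords, AlreadyExistsError, and their behavior are hypothetical and do not mirror nrp_cmd's implementation.

```python
import asyncio


class AlreadyExistsError(Exception):
    """Raised by the hypothetical store when an id is reused without idempotent=True."""


class InMemoryRecords:
    """Hypothetical record store used only to illustrate idempotent creation."""

    def __init__(self) -> None:
        self._records: dict[str, dict] = {}

    async def create(self, data: dict, idempotent: bool = False) -> dict:
        record_id = data["id"]
        existing = self._records.get(record_id)
        if existing is not None:
            if idempotent:
                return existing  # a retry returns the same record, no duplicate
            raise AlreadyExistsError(record_id)
        record = dict(data)
        self._records[record_id] = record
        return record
```

This is also why the PID has to be deterministic: if the server generated a fresh id on every call, a retried create would have nothing to match against and would produce a second record.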


API Reference

Main Functions

  • get_async_client(repository, refresh=False, config=None) - Get a client for a repository
  • resolve_record_id(url, config=None, refresh=False) - Resolve a record URL to a client and normalized URL
  • limit_connections(max_connections) - Context manager to limit concurrent connections

Client Properties

  • client.records - AsyncRecordsClient instance
  • client.files - AsyncFilesClient instance
  • client.requests - AsyncRequestsClient instance
  • client.config - Repository configuration
  • client.info - Repository information

Types

  • Record - Record data structure
  • RecordList - List of records with pagination
  • File - File metadata and links
  • Request - Request/workflow data
  • RecordStatus - Enum: ALL, PUBLISHED, DRAFT