Skip to Content
User GuidePython LibraryRecords API (Async)

Records API

Complete guide to working with records using the async client library.

Overview

The Records API provides methods to create, read, update, delete, search, and publish records in an NRP repository. Access it via client.records.

from nrp_cmd.async_client import get_async_client client = await get_async_client("https://your-repository.org") records_client = client.records

Creating Records

Basic Record Creation

# Create a simple record record = await client.records.create({ "metadata": { "title": "My Research Data", "creators": [ { "person_or_org": { "type": "personal", "family_name": "Doe", "given_name": "John" } } ], "resource_type": {"id": "dataset"}, "publication_date": "2024-01-15" } }) print(f"Created record ID: {record.id}") print(f"Record URL: {record.links.self_}")

Create in a Community

# Create record in a specific community record = await client.records.create( { "metadata": { "title": "Community Research", "creators": [{"person_or_org": {"name": "Jane Smith"}}], "resource_type": {"id": "publication"} } }, community="my-community-slug" )

Create with Workflow

# Create record with custom workflow record = await client.records.create( { "metadata": { "title": "Workflow Test", "creators": [{"person_or_org": {"name": "Test User"}}], "resource_type": {"id": "dataset"} } }, community="my-community", workflow="review" # e.g., requires review before publishing )

Create with Specific Model

# If repository has multiple models datasets_client = client.records.with_model("datasets") record = await datasets_client.create({ "metadata": { "title": "Dataset Record", "creators": [{"person_or_org": {"name": "Researcher"}}] } })

Create Without Files

# Create record that won't have files record = await client.records.create( { "metadata": { "title": "Metadata Only", "creators": [{"person_or_org": {"name": "Author"}}], "resource_type": {"id": "other"} } }, files_enabled=False )

Create from Existing Data

# Copy metadata from another record existing = await client.records.read("abc-123") new_record = await client.records.create({ "metadata": existing.metadata })

Reading Records

Read by ID

# Read a draft record by its ID record = await client.records.draft_records.read("abc-def-ghi") # Access record data print(f"Title: {record.metadata['title']}") print(f"Created: {record.created}") print(f"Version: {record.revision_id}")

Read by URL

from yarl import URL # Read draft by full URL record_url = URL("https://repository.org/api/records/abc-123") record = await client.records.draft_records.read(record_url) # Or read published by URL record = await client.records.published_records.read(record_url)

Read Draft vs Published

# Read draft version draft = await client.records.draft_records.read("abc-123") # Read published version published = await client.records.published_records.read("abc-123")

Read with Model Specification

# Read draft from specific model datasets = client.records.with_model("datasets") record = await datasets.draft_records.read("dataset-id") # Read published from specific model record = await datasets.published_records.read("dataset-id")

Read with Query Parameters

# Read draft with additional query parameters record = await client.records.draft_records.read( "abc-123", query={"expand": "true"} )

Updating Records

Basic Update

# Read draft, modify, and update record = await client.records.draft_records.read("abc-123") record.metadata["title"] = "Updated Title" record.metadata["description"] = "New description" updated = await client.records.draft_records.update(record) print(f"Updated to revision: {updated.revision_id}")

Update with Version Check

# Update with automatic version checking (default) record = await client.records.draft_records.read("abc-123") record.metadata["title"] = "New Title" try: updated = await client.records.draft_records.update(record, verify_version=True) except Exception as e: print(f"Update failed - record was modified: {e}")

Update Without Version Check

# Force update without checking version record = await client.records.draft_records.read("abc-123") record.metadata["title"] = "Force Updated" updated = await client.records.draft_records.update(record, verify_version=False)

Partial Update Pattern

# Read draft, modify specific fields, update record = await client.records.draft_records.read("abc-123") # Add a new keyword if "subjects" not in record.metadata: record.metadata["subjects"] = [] record.metadata["subjects"].append({"subject": "climate science"}) # Update additional metadata if "additional_descriptions" not in record.metadata: record.metadata["additional_descriptions"] = [] record.metadata["additional_descriptions"].append({ "description": "This dataset contains...", "type": {"id": "methods"} }) updated = await client.records.draft_records.update(record)

Searching Records

# Search for records results = await client.records.search(q="climate change") print(f"Total results: {results.hits.total}") for record in results.hits.hits: print(f"- {record.metadata['title']} ({record.id})")
# Search with pagination page1 = await client.records.search( q="dataset", page=1, size=10 ) # Get next page page2 = await client.records.next_page(record_list=page1) # Get previous page page1_again = await client.records.previous_page(record_list=page2)

Search with Sorting

# Sort by newest first results = await client.records.search( q="machine learning", sort="newest" ) # Sort by oldest first results = await client.records.search( q="machine learning", sort="oldest" ) # Sort by best match (default) results = await client.records.search( q="machine learning", sort="bestmatch" )

Search with Facets/Filters

# Search with facets results = await client.records.search( q="climate", facets={ "resource_type": "dataset", "access_right": "open" } )

Search in Specific Model

# Search draft datasets datasets = client.records.with_model("datasets") results = await datasets.draft_records.search(q="temperature") # Search published datasets results = await datasets.published_records.search(q="temperature")

Search Only Drafts or Published

# Search only draft records drafts = await client.records.draft_records.search(q="test") # Search only published records published = await client.records.published_records.search(q="test")

Empty Search (List All)

# Get all records (paginated) all_records = await client.records.search()

Scanning All Records

Basic Scan

# Scan through all matching records async with client.records.scan(q="resource_type:dataset") as records: async for record in records: print(f"Processing: {record.id}") # Process record...

Scan with Model

# Scan draft records of specific model datasets = client.records.with_model("datasets") async with datasets.draft_records.scan() as records: async for record in records: print(f"Dataset: {record.metadata['title']}")

Scan Drafts or Published

# Scan all draft records async with client.records.draft_records.scan() as records: async for record in records: # Process draft... pass # Scan all published records async with client.records.published_records.scan() as records: async for record in records: # Process published record... pass

Scan with Facets

# Scan draft records with filtering async with client.records.draft_records.scan( facets={"resource_type": "dataset", "access_right": "open"} ) as records: count = 0 async for record in records: count += 1 print(f"Total open datasets: {count}")

Bulk Processing Pattern

# Process all draft records in batches batch = [] batch_size = 100 async with client.records.draft_records.scan(q="needs_processing:true") as records: async for record in records: batch.append(record) if len(batch) >= batch_size: # Process batch await process_batch(batch) batch = [] # Process remaining records if batch: await process_batch(batch)

Deleting Records

Delete by ID

# Delete a draft record by ID await client.records.draft_records.delete("abc-123") print("Draft record deleted")

Delete with Record Object

# Delete draft using record object (includes version check) record = await client.records.draft_records.read("abc-123") await client.records.draft_records.delete(record)

Delete with Version Check

# Delete draft only if version matches await client.records.draft_records.delete("abc-123", etag="W/\"12345\"")

Delete Draft vs Published

# Delete draft await client.records.draft_records.delete("abc-123") # Delete published requires different approach (retract) record = await client.records.published_records.read("abc-123") await client.records.retract_published(record)

Publishing Records

Publish Draft

# Create and publish draft = await client.records.create({ "metadata": { "title": "Ready to Publish", "creators": [{"person_or_org": {"name": "Author"}}], "resource_type": {"id": "dataset"} } }) # Publish the draft published = await client.records.publish(draft) print(f"Published URL: {published.links.self_html}")

Publish Returns Request

ℹ️

If the record requires review or approval, publish() may return a Request object instead of the published record.

from nrp_cmd.types.requests import Request result = await client.records.publish(draft) # Check if it's published or requires approval if isinstance(result, Request): # It's a Request print(f"Requires approval. Request ID: {result.id}") print(f"Request status: {result.status}") else: # It's a Record print(f"Published successfully: {result.id}")

Editing Published Records

Edit Metadata

from nrp_cmd.types.requests import Request # Edit a published record published = await client.records.published_records.read("abc-123") result = await client.records.edit_metadata(published) # Check if editing is allowed or requires approval if isinstance(result, Request): # Returns Request print(f"Edit requires approval: {result.id}") else: # Returns draft Record # Modify the draft result.metadata["title"] = "Updated Title" updated_draft = await client.records.draft_records.update(result) # Publish the changes published = await client.records.publish(updated_draft)

Creating New Versions

Create New Version

from nrp_cmd.types.requests import Request # Create new version of published record published = await client.records.published_records.read("abc-123") result = await client.records.new_version(published) # Check result type if isinstance(result, Request): # Request print(f"Version creation requires approval: {result.id}") else: # Draft record of new version # Modify the new version result.metadata["title"] = "Version 2.0" result.metadata["version"] = "2.0" updated = await client.records.draft_records.update(result) published_v2 = await client.records.publish(updated) print(f"Published new version: {published_v2.id}")

Retracting Published Records

Retract Record

from nrp_cmd.types.requests import Request # Retract a published record published = await client.records.published_records.read("abc-123") result = await client.records.retract_published(published) # Check if retraction is immediate or requires approval if isinstance(result, Request): # Request print(f"Retraction requires approval: {result.id}") else: # Retracted record print(f"Record retracted: {result.id}")

Complete Workflow Examples

Create, Upload, Publish

# Complete workflow async def create_and_publish_dataset(title, data_file): # 1. Create draft draft = await client.records.create({ "metadata": { "title": title, "creators": [{"person_or_org": {"name": "Researcher"}}], "resource_type": {"id": "dataset"}, "publication_date": "2024-01-15" } }) # 2. Upload data file file = await client.files.upload( draft, key=data_file.name, metadata={"description": "Dataset file"}, source=data_file, progress=f"Uploading {data_file.name}" ) # 3. Publish published = await client.records.publish(draft) return published # Use it published = await create_and_publish_dataset( "Climate Data 2024", "climate_data.csv" ) print(f"Published at: {published.links.self_html}")

Update Existing Record

async def update_record_metadata(record_id, updates): # Read current draft version record = await client.records.draft_records.read(record_id) # Apply updates for key, value in updates.items(): record.metadata[key] = value # Save updated = await client.records.draft_records.update(record) return updated # Use it updated = await update_record_metadata( "abc-123", { "title": "Updated Title", "description": "New description" } )

Bulk Update Pattern

async def bulk_update_records(query, field, value): """Update a field across multiple draft records.""" updated_count = 0 async with client.records.draft_records.scan(q=query) as records: async for record in records: record.metadata[field] = value await client.records.draft_records.update(record) updated_count += 1 if updated_count % 10 == 0: print(f"Updated {updated_count} records...") print(f"Total updated: {updated_count}") return updated_count # Use it await bulk_update_records( "publisher:old-org", "publisher", "new-org" )

Copy Records Between Repositories

async def copy_record_to_repository(source_url, dest_repository): # Get source record (can be draft or published) source_client, record_url = await resolve_record_id(source_url) # Try to read as published first, fall back to draft try: source_record = await source_client.records.published_records.read(record_url) except: source_record = await source_client.records.draft_records.read(record_url) # Get destination client dest_client = await get_async_client(dest_repository) # Create in destination new_record = await dest_client.records.create(source_record.metadata) # Copy files if any if source_record.links.files: source_files = await source_client.files.list(source_record) for source_file in source_files: # Download from source import tempfile with tempfile.NamedTemporaryFile() as tmp: await source_client.files.download( source_file, tmp.name ) # Upload to destination await dest_client.files.upload( new_record, key=source_file.key, metadata=source_file.metadata, source=tmp.name ) # Publish if source was published if not hasattr(source_record.links, 'draft'): new_record = await dest_client.records.publish(new_record) return new_record

Error Handling

from nrp_cmd.errors import ( RepositoryCommunicationError, RepositoryClientError ) try: record = await client.records.create(invalid_data) except RepositoryClientError as e: print(f"Validation error: {e}") # Handle validation errors except RepositoryCommunicationError as e: print(f"Network error: {e}") # Handle network issues

API Reference

Methods

  • create(data, *, model=None, community=None, workflow=None, idempotent=False, files_enabled=True) - Create new record
  • read(record_id, *, model=None, status=None, query=None) - Read a record
  • update(record, *, verify_version=True) - Update a record
  • delete(record_id_or_record, *, etag=None, status=None) - Delete a record
  • search(*, q=None, page=None, size=None, sort=None, model=None, status=None, facets=None) - Search records
  • next_page(*, record_list) - Get next page
  • previous_page(*, record_list) - Get previous page
  • scan(*, q=None, model=None, status=None, facets=None) - Scan all records
  • publish(record) - Publish a draft
  • edit_metadata(record) - Edit published record
  • new_version(record) - Create new version
  • retract_published(record) - Retract published record

Properties

  • with_model(model) - Return client for specific model
  • published_records - Return client for published records only
  • draft_records - Return client for draft records only
Last updated on