Skip to Content
User Guide › Python Library › Records API (Sync)

Records API

Complete guide to working with records using the sync client library.

Overview

The Records API provides methods to create, read, update, delete, search, and publish records in an NRP repository. Access it via client.records.

from nrp_cmd.sync_client import get_sync_client client = get_sync_client("https://your-repository.org") records_client = client.records

Creating Records

Basic Record Creation

# Create a simple record record = client.records.create({ "metadata": { "title": "My Research Data", "creators": [ { "person_or_org": { "type": "personal", "family_name": "Doe", "given_name": "John" } } ], "resource_type": {"id": "dataset"}, "publication_date": "2024-01-15" } }) print(f"Created record ID: {record.id}") print(f"Record URL: {record.links.self_}")

Create in a Community

# Create record in a specific community record = client.records.create( { "metadata": { "title": "Community Research", "creators": [{"person_or_org": {"name": "Jane Smith"}}], "resource_type": {"id": "publication"} } }, community="my-community-slug" )

Create with Workflow

# Create record with custom workflow record = client.records.create( { "metadata": { "title": "Workflow Test", "creators": [{"person_or_org": {"name": "Test User"}}], "resource_type": {"id": "dataset"} } }, community="my-community", workflow="review" # e.g., requires review before publishing )

Create with Specific Model

# If repository has multiple models datasets_client = client.records.with_model("datasets") record = datasets_client.create({ "metadata": { "title": "Dataset Record", "creators": [{"person_or_org": {"name": "Researcher"}}] } })

Create Without Files

# Create record that won't have files record = client.records.create( { "metadata": { "title": "Metadata Only", "creators": [{"person_or_org": {"name": "Author"}}], "resource_type": {"id": "other"} } }, files_enabled=False )

Create from Existing Data

# Copy metadata from another record existing = client.records.read("abc-123") new_record = client.records.create({ "metadata": existing.metadata })

Reading Records

Read by ID

# Read a draft record by its ID record = client.records.draft_records.read("abc-def-ghi") # Access record data print(f"Title: {record.metadata['title']}") print(f"Created: {record.created}") print(f"Version: {record.revision_id}")

Read by URL

from yarl import URL # Read draft by full URL record_url = URL("https://repository.org/api/records/abc-123") record = client.records.draft_records.read(record_url) # Or read published by URL record = client.records.published_records.read(record_url)

Read Draft vs Published

# Read draft version draft = client.records.draft_records.read("abc-123") # Read published version published = client.records.published_records.read("abc-123")

Read with Model Specification

# Read draft from specific model datasets = client.records.with_model("datasets") record = datasets.draft_records.read("dataset-id") # Read published from specific model record = datasets.published_records.read("dataset-id")

Read with Query Parameters

# Read draft with additional query parameters record = client.records.draft_records.read( "abc-123", query={"expand": "true"} )

Updating Records

Basic Update

# Read draft, modify, and update record = client.records.draft_records.read("abc-123") record.metadata["title"] = "Updated Title" record.metadata["description"] = "New description" updated = client.records.draft_records.update(record) print(f"Updated to revision: {updated.revision_id}")

Update with Version Check

# Update with automatic version checking (default) record = client.records.draft_records.read("abc-123") record.metadata["title"] = "New Title" try: updated = client.records.draft_records.update(record, verify_version=True) except Exception as e: print(f"Update failed - record was modified: {e}")

Update Without Version Check

# Force update without checking version record = client.records.draft_records.read("abc-123") record.metadata["title"] = "Force Updated" updated = client.records.draft_records.update(record, verify_version=False)

Partial Update Pattern

# Read draft, modify specific fields, update record = client.records.draft_records.read("abc-123") # Add a new keyword if "subjects" not in record.metadata: record.metadata["subjects"] = [] record.metadata["subjects"].append({"subject": "climate science"}) # Update additional metadata if "additional_descriptions" not in record.metadata: record.metadata["additional_descriptions"] = [] record.metadata["additional_descriptions"].append({ "description": "This dataset contains...", "type": {"id": "methods"} }) updated = client.records.draft_records.update(record)

Searching Records

# Search for records results = client.records.search(q="climate change") print(f"Total results: {results.hits.total}") for record in results.hits.hits: print(f"- {record.metadata['title']} ({record.id})")
# Search with pagination page1 = client.records.search( q="dataset", page=1, size=10 ) # Get next page page2 = client.records.next_page(record_list=page1) # Get previous page page1_again = client.records.previous_page(record_list=page2)

Search with Sorting

# Sort by newest first results = client.records.search( q="machine learning", sort="newest" ) # Sort by oldest first results = client.records.search( q="machine learning", sort="oldest" ) # Sort by best match (default) results = client.records.search( q="machine learning", sort="bestmatch" )

Search with Facets/Filters

# Search with facets results = client.records.search( q="climate", facets={ "resource_type": "dataset", "access_right": "open" } )

Search in Specific Model

# Search draft datasets datasets = client.records.with_model("datasets") results = datasets.draft_records.search(q="temperature") # Search published datasets results = datasets.published_records.search(q="temperature")

Search Only Drafts or Published

# Search only draft records drafts = client.records.draft_records.search(q="test") # Search only published records published = client.records.published_records.search(q="test")

Empty Search (List All)

# Get all records (paginated) all_records = client.records.search()

Scanning All Records

Basic Scan

# Scan through all matching records with client.records.scan(q="resource_type:dataset") as records: for record in records: print(f"Processing: {record.id}") # Process record...

Scan with Model

# Scan draft records of specific model datasets = client.records.with_model("datasets") with datasets.draft_records.scan() as records: for record in records: print(f"Dataset: {record.metadata['title']}")

Scan Drafts or Published

# Scan all draft records with client.records.draft_records.scan() as records: for record in records: # Process draft... pass # Scan all published records with client.records.published_records.scan() as records: for record in records: # Process published record... pass

Scan with Facets

# Scan draft records with filtering with client.records.draft_records.scan( facets={"resource_type": "dataset", "access_right": "open"} ) as records: count = 0 for record in records: count += 1 print(f"Total open datasets: {count}")

Bulk Processing Pattern

# Process all draft records in batches batch = [] batch_size = 100 with client.records.draft_records.scan(q="needs_processing:true") as records: for record in records: batch.append(record) if len(batch) >= batch_size: # Process batch process_batch(batch) batch = [] # Process remaining records if batch: process_batch(batch)

Deleting Records

Delete by ID

# Delete a draft record by ID client.records.draft_records.delete("abc-123") print("Draft record deleted")

Delete with Record Object

# Delete draft using record object (includes version check) record = client.records.draft_records.read("abc-123") client.records.draft_records.delete(record)

Delete with Version Check

# Delete draft only if version matches client.records.draft_records.delete("abc-123", etag="W/\"12345\"")

Delete Draft vs Published

# Delete draft client.records.draft_records.delete("abc-123") # Delete published requires different approach (retract) record = client.records.published_records.read("abc-123") client.records.retract_published(record)

Publishing Records

Publish Draft

# Create and publish draft = client.records.create({ "metadata": { "title": "Ready to Publish", "creators": [{"person_or_org": {"name": "Author"}}], "resource_type": {"id": "dataset"} } }) # Publish the draft published = client.records.publish(draft) print(f"Published URL: {published.links.self_html}")

Publish Returns Request

ℹ️

If the record requires review or approval, publish() may return a Request object instead of the published record.

from nrp_cmd.types.requests import Request result = client.records.publish(draft) # Check if it's published or requires approval if isinstance(result, Request): # It's a Request print(f"Requires approval. Request ID: {result.id}") print(f"Request status: {result.status}") else: # It's a Record print(f"Published successfully: {result.id}")

Editing Published Records

Edit Metadata

from nrp_cmd.types.requests import Request # Edit a published record published = client.records.published_records.read("abc-123") result = client.records.edit_metadata(published) # Check if editing is allowed or requires approval if isinstance(result, Request): # Returns Request print(f"Edit requires approval: {result.id}") else: # Returns draft Record # Modify the draft result.metadata["title"] = "Updated Title" updated_draft = client.records.draft_records.update(result) # Publish the changes published = client.records.publish(updated_draft)

Creating New Versions

Create New Version

from nrp_cmd.types.requests import Request # Create new version of published record published = client.records.published_records.read("abc-123") result = client.records.new_version(published) # Check result type if isinstance(result, Request): # Request print(f"Version creation requires approval: {result.id}") else: # Draft record of new version # Modify the new version result.metadata["title"] = "Version 2.0" result.metadata["version"] = "2.0" updated = client.records.draft_records.update(result) published_v2 = client.records.publish(updated) print(f"Published new version: {published_v2.id}")

Retracting Published Records

Retract Record

from nrp_cmd.types.requests import Request # Retract a published record published = client.records.published_records.read("abc-123") result = client.records.retract_published(published) # Check if retraction is immediate or requires approval if isinstance(result, Request): # Request print(f"Retraction requires approval: {result.id}") else: # Retracted record print(f"Record retracted: {result.id}")

Complete Workflow Examples

Create, Upload, Publish

# Complete workflow
from pathlib import Path


def create_and_publish_dataset(title, data_file):
    """Create a draft dataset record, attach one file, and publish it.

    Args:
        title: Title stored in the new record's metadata.
        data_file: The file to upload. Either a path given as a string
            (or other path-like object), or any object exposing a ``.name``
            attribute such as ``pathlib.Path``. It is forwarded unchanged
            to ``client.files.upload`` as ``source``.

    Returns:
        Whatever ``client.records.publish`` returns for the draft. When the
        repository requires review or approval this may be a ``Request``
        object instead of the published record.
    """
    # A plain string has no ``.name`` attribute, so derive the file name from
    # the path in that case. Objects that already expose ``.name`` keep their
    # own value.
    if hasattr(data_file, "name"):
        file_name = data_file.name
    else:
        file_name = Path(data_file).name

    # 1. Create draft
    draft = client.records.create({
        "metadata": {
            "title": title,
            "creators": [{"person_or_org": {"name": "Researcher"}}],
            "resource_type": {"id": "dataset"},
            "publication_date": "2024-01-15",
        }
    })

    # 2. Upload data file
    client.files.upload(
        draft,
        key=file_name,
        metadata={"description": "Dataset file"},
        source=data_file,
        progress=f"Uploading {file_name}",
    )

    # 3. Publish
    return client.records.publish(draft)


# Use it
published = create_and_publish_dataset(
    "Climate Data 2024",
    "climate_data.csv",
)
print(f"Published at: {published.links.self_html}")

Update Existing Record

def update_record_metadata(record_id, updates):
    """Apply metadata changes to a draft record and save it.

    Args:
        record_id: Identifier of the draft record to modify.
        updates: Mapping of metadata field name to its new value.

    Returns:
        The record returned by ``client.records.draft_records.update``.
    """
    drafts = client.records.draft_records

    # Read current draft version
    draft = drafts.read(record_id)

    # Overwrite each named metadata field with the supplied value
    for field in updates:
        draft.metadata[field] = updates[field]

    # Save
    return drafts.update(draft)


# Use it
updated = update_record_metadata(
    "abc-123",
    {
        "title": "Updated Title",
        "description": "New description",
    },
)

Bulk Update Pattern

def bulk_update_records(query, field, value):
    """Set one metadata field to the same value on every matching draft.

    Args:
        query: Search query passed to the draft scan as ``q``.
        field: Metadata key to overwrite on each record.
        value: New value stored under ``field``.

    Returns:
        The number of draft records that were updated.
    """
    drafts = client.records.draft_records
    total = 0

    with drafts.scan(q=query) as matches:
        for total, draft in enumerate(matches, start=1):
            draft.metadata[field] = value
            drafts.update(draft)

            # Report progress after every tenth record
            if total % 10 == 0:
                print(f"Updated {total} records...")

    print(f"Total updated: {total}")
    return total


# Use it
bulk_update_records(
    "publisher:old-org",
    "publisher",
    "new-org",
)

Copy Records Between Repositories

def copy_record_to_repository(source_url, dest_repository):
    """Copy a record, together with its files, into another repository.

    The source record is read as a published record first; if that read
    fails it is read as a draft instead. The copy is published in the
    destination only when the source was read as a published record.

    Args:
        source_url: Location of the source record, passed to
            ``resolve_record_id``.
        dest_repository: Destination repository, passed to
            ``get_sync_client``.

    Returns:
        The record created in the destination, or the result of publishing
        it when the source was published. Publishing may return a
        ``Request`` object if the destination requires review or approval.
    """
    import tempfile

    # NOTE(review): resolve_record_id and get_sync_client are not defined in
    # this snippet; they must already be imported by the surrounding code.
    # Get source record (can be draft or published)
    source_client, record_url = resolve_record_id(source_url)

    # Try to read as published first, fall back to draft.
    # NOTE(review): narrow this to the client's "not found" error once the
    # exact exception type is confirmed. Catching Exception (not a bare
    # ``except:``) at least lets KeyboardInterrupt and SystemExit propagate.
    try:
        source_record = source_client.records.published_records.read(record_url)
    except Exception:
        source_is_published = False
        source_record = source_client.records.draft_records.read(record_url)
    else:
        source_is_published = True

    # Get destination client
    dest_client = get_sync_client(dest_repository)

    # Create in destination, wrapping the metadata under a "metadata" key
    # like every other create call on this page.
    new_record = dest_client.records.create({
        "metadata": source_record.metadata,
    })

    # Copy files if any
    if source_record.links.files:
        for source_file in source_client.files.list(source_record):
            with tempfile.NamedTemporaryFile() as tmp:
                # Download from source
                source_client.files.download(source_file, tmp.name)

                # Upload to destination
                dest_client.files.upload(
                    new_record,
                    key=source_file.key,
                    metadata=source_file.metadata,
                    source=tmp.name,
                )

    # Publish if source was published
    if source_is_published:
        new_record = dest_client.records.publish(new_record)

    return new_record

Error Handling

from nrp_cmd.errors import ( RepositoryCommunicationError, RepositoryClientError ) try: record = client.records.create(invalid_data) except RepositoryClientError as e: print(f"Validation error: {e}") # Handle validation errors except RepositoryCommunicationError as e: print(f"Network error: {e}") # Handle network issues

API Reference

Methods

  • create(data, *, model=None, community=None, workflow=None, idempotent=False, files_enabled=True) - Create new record
  • read(record_id, *, model=None, status=None, query=None) - Read a record by ID or by full URL
  • update(record, *, verify_version=True) - Update a record
  • delete(record_id_or_record, *, etag=None, status=None) - Delete a record
  • search(*, q=None, page=None, size=None, sort=None, model=None, status=None, facets=None) - Search records
  • next_page(*, record_list) - Get next page
  • previous_page(*, record_list) - Get previous page
  • scan(*, q=None, model=None, status=None, facets=None) - Scan all records
  • publish(record) - Publish a draft
  • edit_metadata(record) - Edit published record
  • new_version(record) - Create new version
  • retract_published(record) - Retract published record

Properties

  • with_model(model) - Method (called with a model name) that returns a client for that specific model
  • published_records - Return client for published records only
  • draft_records - Return client for draft records only
Last updated on