Records API
Complete guide to working with records using the async client library.
Overview
The Records API provides methods to create, read, update, delete, search, and publish records in an NRP repository. Access it via client.records.
from nrp_cmd.async_client import get_async_client
client = await get_async_client("https://your-repository.org")
records_client = client.recordsCreating Records
Basic Record Creation
# Create a simple record
record = await client.records.create({
"metadata": {
"title": "My Research Data",
"creators": [
{
"person_or_org": {
"type": "personal",
"family_name": "Doe",
"given_name": "John"
}
}
],
"resource_type": {"id": "dataset"},
"publication_date": "2024-01-15"
}
})
print(f"Created record ID: {record.id}")
print(f"Record URL: {record.links.self_}")Create in a Community
# Create record in a specific community
record = await client.records.create(
{
"metadata": {
"title": "Community Research",
"creators": [{"person_or_org": {"name": "Jane Smith"}}],
"resource_type": {"id": "publication"}
}
},
community="my-community-slug"
)Create with Workflow
# Create record with custom workflow
record = await client.records.create(
{
"metadata": {
"title": "Workflow Test",
"creators": [{"person_or_org": {"name": "Test User"}}],
"resource_type": {"id": "dataset"}
}
},
community="my-community",
workflow="review" # e.g., requires review before publishing
)Create with Specific Model
# If repository has multiple models
datasets_client = client.records.with_model("datasets")
record = await datasets_client.create({
"metadata": {
"title": "Dataset Record",
"creators": [{"person_or_org": {"name": "Researcher"}}]
}
})Create Without Files
# Create record that won't have files
record = await client.records.create(
{
"metadata": {
"title": "Metadata Only",
"creators": [{"person_or_org": {"name": "Author"}}],
"resource_type": {"id": "other"}
}
},
files_enabled=False
)Create from Existing Data
# Copy metadata from another record
existing = await client.records.read("abc-123")
new_record = await client.records.create({
"metadata": existing.metadata
})Reading Records
Read by ID
# Read a draft record by its ID
record = await client.records.draft_records.read("abc-def-ghi")
# Access record data
print(f"Title: {record.metadata['title']}")
print(f"Created: {record.created}")
print(f"Version: {record.revision_id}")Read by URL
from yarl import URL
# Read draft by full URL
record_url = URL("https://repository.org/api/records/abc-123")
record = await client.records.draft_records.read(record_url)
# Or read published by URL
record = await client.records.published_records.read(record_url)Read Draft vs Published
# Read draft version
draft = await client.records.draft_records.read("abc-123")
# Read published version
published = await client.records.published_records.read("abc-123")Read with Model Specification
# Read draft from specific model
datasets = client.records.with_model("datasets")
record = await datasets.draft_records.read("dataset-id")
# Read published from specific model
record = await datasets.published_records.read("dataset-id")Read with Query Parameters
# Read draft with additional query parameters
record = await client.records.draft_records.read(
"abc-123",
query={"expand": "true"}
)Updating Records
Basic Update
# Read draft, modify, and update
record = await client.records.draft_records.read("abc-123")
record.metadata["title"] = "Updated Title"
record.metadata["description"] = "New description"
updated = await client.records.draft_records.update(record)
print(f"Updated to revision: {updated.revision_id}")Update with Version Check
# Update with automatic version checking (default)
record = await client.records.draft_records.read("abc-123")
record.metadata["title"] = "New Title"
try:
updated = await client.records.draft_records.update(record, verify_version=True)
except Exception as e:
print(f"Update failed - record was modified: {e}")Update Without Version Check
# Force update without checking version
record = await client.records.draft_records.read("abc-123")
record.metadata["title"] = "Force Updated"
updated = await client.records.draft_records.update(record, verify_version=False)Partial Update Pattern
# Read draft, modify specific fields, update
record = await client.records.draft_records.read("abc-123")
# Add a new keyword
if "subjects" not in record.metadata:
record.metadata["subjects"] = []
record.metadata["subjects"].append({"subject": "climate science"})
# Update additional metadata
if "additional_descriptions" not in record.metadata:
record.metadata["additional_descriptions"] = []
record.metadata["additional_descriptions"].append({
"description": "This dataset contains...",
"type": {"id": "methods"}
})
updated = await client.records.draft_records.update(record)Searching Records
Basic Search
# Search for records
results = await client.records.search(q="climate change")
print(f"Total results: {results.hits.total}")
for record in results.hits.hits:
print(f"- {record.metadata['title']} ({record.id})")Paginated Search
# Search with pagination
page1 = await client.records.search(
q="dataset",
page=1,
size=10
)
# Get next page
page2 = await client.records.next_page(record_list=page1)
# Get previous page
page1_again = await client.records.previous_page(record_list=page2)Search with Sorting
# Sort by newest first
results = await client.records.search(
q="machine learning",
sort="newest"
)
# Sort by oldest first
results = await client.records.search(
q="machine learning",
sort="oldest"
)
# Sort by best match (default)
results = await client.records.search(
q="machine learning",
sort="bestmatch"
)Search with Facets/Filters
# Search with facets
results = await client.records.search(
q="climate",
facets={
"resource_type": "dataset",
"access_right": "open"
}
)Search in Specific Model
# Search draft datasets
datasets = client.records.with_model("datasets")
results = await datasets.draft_records.search(q="temperature")
# Search published datasets
results = await datasets.published_records.search(q="temperature")Search Only Drafts or Published
# Search only draft records
drafts = await client.records.draft_records.search(q="test")
# Search only published records
published = await client.records.published_records.search(q="test")Empty Search (List All)
# Get all records (paginated)
all_records = await client.records.search()Scanning All Records
Basic Scan
# Scan through all matching records
async with client.records.scan(q="resource_type:dataset") as records:
async for record in records:
print(f"Processing: {record.id}")
# Process record...Scan with Model
# Scan draft records of specific model
datasets = client.records.with_model("datasets")
async with datasets.draft_records.scan() as records:
async for record in records:
print(f"Dataset: {record.metadata['title']}")Scan Drafts or Published
# Scan all draft records
async with client.records.draft_records.scan() as records:
async for record in records:
# Process draft...
pass
# Scan all published records
async with client.records.published_records.scan() as records:
async for record in records:
# Process published record...
passScan with Facets
# Scan draft records with filtering
async with client.records.draft_records.scan(
facets={"resource_type": "dataset", "access_right": "open"}
) as records:
count = 0
async for record in records:
count += 1
print(f"Total open datasets: {count}")Bulk Processing Pattern
# Process all draft records in batches
batch = []
batch_size = 100
async with client.records.draft_records.scan(q="needs_processing:true") as records:
async for record in records:
batch.append(record)
if len(batch) >= batch_size:
# Process batch
await process_batch(batch)
batch = []
# Process remaining records
if batch:
await process_batch(batch)Deleting Records
Delete by ID
# Delete a draft record by ID
await client.records.draft_records.delete("abc-123")
print("Draft record deleted")Delete with Record Object
# Delete draft using record object (includes version check)
record = await client.records.draft_records.read("abc-123")
await client.records.draft_records.delete(record)Delete with Version Check
# Delete draft only if version matches
await client.records.draft_records.delete("abc-123", etag="W/\"12345\"")Delete Draft vs Published
# Delete draft
await client.records.draft_records.delete("abc-123")
# Delete published requires different approach (retract)
record = await client.records.published_records.read("abc-123")
await client.records.retract_published(record)Publishing Records
Publish Draft
# Create and publish
draft = await client.records.create({
"metadata": {
"title": "Ready to Publish",
"creators": [{"person_or_org": {"name": "Author"}}],
"resource_type": {"id": "dataset"}
}
})
# Publish the draft
published = await client.records.publish(draft)
print(f"Published URL: {published.links.self_html}")Publish Returns Request
ℹ️
If the record requires review or approval, publish() may return a Request object
instead of the published record.
from nrp_cmd.types.requests import Request
result = await client.records.publish(draft)
# Check if it's published or requires approval
if isinstance(result, Request): # It's a Request
print(f"Requires approval. Request ID: {result.id}")
print(f"Request status: {result.status}")
else: # It's a Record
print(f"Published successfully: {result.id}")Editing Published Records
Edit Metadata
from nrp_cmd.types.requests import Request
# Edit a published record
published = await client.records.published_records.read("abc-123")
result = await client.records.edit_metadata(published)
# Check if editing is allowed or requires approval
if isinstance(result, Request): # Returns Request
print(f"Edit requires approval: {result.id}")
else: # Returns draft Record
# Modify the draft
result.metadata["title"] = "Updated Title"
updated_draft = await client.records.draft_records.update(result)
# Publish the changes
published = await client.records.publish(updated_draft)Creating New Versions
Create New Version
from nrp_cmd.types.requests import Request
# Create new version of published record
published = await client.records.published_records.read("abc-123")
result = await client.records.new_version(published)
# Check result type
if isinstance(result, Request): # Request
print(f"Version creation requires approval: {result.id}")
else: # Draft record of new version
# Modify the new version
result.metadata["title"] = "Version 2.0"
result.metadata["version"] = "2.0"
updated = await client.records.draft_records.update(result)
published_v2 = await client.records.publish(updated)
print(f"Published new version: {published_v2.id}")Retracting Published Records
Retract Record
from nrp_cmd.types.requests import Request
# Retract a published record
published = await client.records.published_records.read("abc-123")
result = await client.records.retract_published(published)
# Check if retraction is immediate or requires approval
if isinstance(result, Request): # Request
print(f"Retraction requires approval: {result.id}")
else: # Retracted record
print(f"Record retracted: {result.id}")Complete Workflow Examples
Create, Upload, Publish
# Complete workflow
async def create_and_publish_dataset(title, data_file):
# 1. Create draft
draft = await client.records.create({
"metadata": {
"title": title,
"creators": [{"person_or_org": {"name": "Researcher"}}],
"resource_type": {"id": "dataset"},
"publication_date": "2024-01-15"
}
})
# 2. Upload data file
file = await client.files.upload(
draft,
key=data_file.name,
metadata={"description": "Dataset file"},
source=data_file,
progress=f"Uploading {data_file.name}"
)
# 3. Publish
published = await client.records.publish(draft)
return published
# Use it
published = await create_and_publish_dataset(
"Climate Data 2024",
"climate_data.csv"
)
print(f"Published at: {published.links.self_html}")Update Existing Record
async def update_record_metadata(record_id, updates):
# Read current draft version
record = await client.records.draft_records.read(record_id)
# Apply updates
for key, value in updates.items():
record.metadata[key] = value
# Save
updated = await client.records.draft_records.update(record)
return updated
# Use it
updated = await update_record_metadata(
"abc-123",
{
"title": "Updated Title",
"description": "New description"
}
)Bulk Update Pattern
async def bulk_update_records(query, field, value):
"""Update a field across multiple draft records."""
updated_count = 0
async with client.records.draft_records.scan(q=query) as records:
async for record in records:
record.metadata[field] = value
await client.records.draft_records.update(record)
updated_count += 1
if updated_count % 10 == 0:
print(f"Updated {updated_count} records...")
print(f"Total updated: {updated_count}")
return updated_count
# Use it
await bulk_update_records(
"publisher:old-org",
"publisher",
"new-org"
)Copy Records Between Repositories
async def copy_record_to_repository(source_url, dest_repository):
# Get source record (can be draft or published)
source_client, record_url = await resolve_record_id(source_url)
# Try to read as published first, fall back to draft
try:
source_record = await source_client.records.published_records.read(record_url)
except:
source_record = await source_client.records.draft_records.read(record_url)
# Get destination client
dest_client = await get_async_client(dest_repository)
# Create in destination
new_record = await dest_client.records.create(source_record.metadata)
# Copy files if any
if source_record.links.files:
source_files = await source_client.files.list(source_record)
for source_file in source_files:
# Download from source
import tempfile
with tempfile.NamedTemporaryFile() as tmp:
await source_client.files.download(
source_file,
tmp.name
)
# Upload to destination
await dest_client.files.upload(
new_record,
key=source_file.key,
metadata=source_file.metadata,
source=tmp.name
)
# Publish if source was published
if not hasattr(source_record.links, 'draft'):
new_record = await dest_client.records.publish(new_record)
return new_recordError Handling
from nrp_cmd.errors import (
RepositoryCommunicationError,
RepositoryClientError
)
try:
record = await client.records.create(invalid_data)
except RepositoryClientError as e:
print(f"Validation error: {e}")
# Handle validation errors
except RepositoryCommunicationError as e:
print(f"Network error: {e}")
# Handle network issuesAPI Reference
Methods
create(data, *, model=None, community=None, workflow=None, idempotent=False, files_enabled=True)- Create new recordread(record_id, *, model=None, status=None, query=None)- Read a recordupdate(record, *, verify_version=True)- Update a recorddelete(record_id_or_record, *, etag=None, status=None)- Delete a recordsearch(*, q=None, page=None, size=None, sort=None, model=None, status=None, facets=None)- Search recordsnext_page(*, record_list)- Get next pageprevious_page(*, record_list)- Get previous pagescan(*, q=None, model=None, status=None, facets=None)- Scan all recordspublish(record)- Publish a draftedit_metadata(record)- Edit published recordnew_version(record)- Create new versionretract_published(record)- Retract published record
Properties
with_model(model)- Return client for specific modelpublished_records- Return client for published records onlydraft_records- Return client for draft records only
Last updated on