Files API
Complete guide to working with files using the async client library.
Overview
The Files API provides methods to upload, download, list, update, and delete files attached to records. Access it via client.files.
from nrp_cmd.async_client import get_async_client
client = await get_async_client("https://your-repository.org")
files_client = client.files
Listing Files
List Files on a Record
# List all files on a draft record
record = await client.records.draft_records.read("abc-123")
files = await client.files.list(record)
for file in files:
print(f"File: {file.key}")
print(f" Size: {file.size} bytes")
print(f" Checksum: {file.checksum}")
print(f" Metadata: {file.metadata}")
List by Record URL
from yarl import URL
# List files using record URL
record_url = URL("https://repository.org/api/records/abc-123")
files = await client.files.list(record_url)
Access File Properties
files = await client.files.list(record)
for file in files:
# Basic properties
print(f"Key: {file.key}")
print(f"Size: {file.size}")
print(f"MIME type: {file.mimetype}")
print(f"Checksum: {file.checksum}")
# Metadata
print(f"Description: {file.metadata.get('description', 'N/A')}")
# Status
print(f"Status: {file.status}") # e.g., 'completed'
# Links
print(f"Download URL: {file.links.content}")
print(f"Self URL: {file.links.self_}")
Uploading Files
Basic Upload from File Path
# Upload a file from filesystem
file = await client.files.upload(
record,
key="data.csv",
metadata={"description": "Dataset file"},
source="/path/to/data.csv"
)
print(f"Uploaded: {file.key} ({file.size} bytes)")
Upload with Progress Bar
# Upload with progress tracking
file = await client.files.upload(
record,
key="large-file.zip",
metadata={"description": "Large dataset archive"},
source="/path/to/large-file.zip",
progress="Uploading large file" # Shows progress bar
)
Upload from Path Object
from pathlib import Path
# Upload using Path
data_file = Path("data/experiment_results.csv")
file = await client.files.upload(
record,
key=data_file.name,
metadata={"description": "Experiment results"},
source=data_file
)
Upload with Data Stream
from nrp_cmd.async_client.streams import FileSource
# Upload using FileSource for more control
source = FileSource("/path/to/file.pdf")
file = await client.files.upload(
record,
key="document.pdf",
metadata={"description": "Research paper"},
source=source
)
Upload from Memory
from nrp_cmd.async_client.streams import MemorySource
# Upload from bytes in memory
data = b"This is my file content"
source = MemorySource(data)
file = await client.files.upload(
record,
key="textfile.txt",
metadata={"description": "Text file"},
source=source
)
Upload Multiple Files
# Upload multiple files to a record
files_to_upload = [
("data.csv", "/path/to/data.csv", "Raw data"),
("analysis.py", "/path/to/analysis.py", "Analysis script"),
("results.pdf", "/path/to/results.pdf", "Results report"),
]
uploaded_files = []
for key, path, description in files_to_upload:
file = await client.files.upload(
record,
key=key,
metadata={"description": description},
source=path,
progress=f"Uploading {key}"
)
uploaded_files.append(file)
print(f"✓ Uploaded {key}")
Upload with Transfer Type
ℹ️
Some repositories support different transfer types like URL fetch or multipart upload. Check your repository’s documentation.
# Upload via URL fetch (if supported by repository)
file = await client.files.upload(
record,
key="remote-data.csv",
metadata={"description": "Data from URL"},
source="https://example.com/data.csv",
transfer_type="url-fetch"
)
Downloading Files
Download to File Path
# Download file to local filesystem
file = await client.files.read(record, "data.csv")
await client.files.download(
file,
"/path/to/save/data.csv"
)
Download with Progress Bar
# Download with progress tracking
await client.files.download(
file,
"/path/to/save/large-file.zip",
progress="Downloading large file"
)
Download Using File Object
# Get file info first, then download
files = await client.files.list(record)
file = files[0] # Get first file
await client.files.download(
file,
f"/downloads/{file.key}",
progress=f"Downloading {file.key}"
)
Download Using File URL
from yarl import URL
# Download directly from file URL
file_url = URL("https://repository.org/api/records/abc-123/files/data.csv")
await client.files.download(
file_url,
"/path/to/save/data.csv"
)
Download with DataSink
from nrp_cmd.async_client.streams import FileSink
# Download using FileSink for more control
sink = FileSink("/path/to/output.pdf")
await client.files.download(file, sink)
Download to Memory
from nrp_cmd.async_client.streams import MemorySink
# Download to memory
sink = MemorySink()
await client.files.download(file, sink)
# Access the data
data = sink.data
print(f"Downloaded {len(data)} bytes")
Download Multiple Files
# Download all files from a record (can be draft or published)
record = await client.records.draft_records.read("abc-123")
files = await client.files.list(record)
for file in files:
output_path = f"/downloads/{record.id}/{file.key}"
await client.files.download(
file,
output_path,
progress=f"Downloading {file.key}"
)
print(f"✓ Downloaded {file.key}")
Parallel Download with Connection Limiting
async def download_files_from_draft_in_parallel(
client: AsyncRepositoryClient, files: list[File],
output_dir: str
) -> None:
# Limit to 5 concurrent downloads
with limit_connections(5):
tasks = [
client.files.download(file_, FileSink(f"{output_dir}/{file_.key}"))
for file_ in files
]
results = await asyncio.gather(*tasks)
for result in results:
print(f"✓ Downloaded file to: {result}")
Reading File Metadata
Read File Info
# Read file metadata without downloading content
file = await client.files.read(record, "data.csv")
print(f"File key: {file.key}")
print(f"Size: {file.size}")
print(f"Checksum: {file.checksum}")
print(f"MIME type: {file.mimetype}")
print(f"Metadata: {file.metadata}")
Read by File URL
from yarl import URL
# Read file info from URL
file_url = URL("https://repository.org/api/records/abc-123/files/data.csv")
file = await client.files.read(file_url)
Check File Existence
# Check if file exists on record
try:
file = await client.files.read(record, "myfile.pdf")
print(f"File exists: {file.key}")
except Exception:
print("File does not exist")
Updating File Metadata
Update File Metadata
# Read file, update metadata, save
file = await client.files.read(record, "data.csv")
# Update metadata
file.metadata["description"] = "Updated description"
file.metadata["type"] = "dataset"
file.metadata["processing_date"] = "2024-01-15"
# Save changes
updated_file = await client.files.update(file)
print(f"Updated metadata for: {updated_file.key}")
Add Metadata Fields
# Add new metadata fields (must be supported by schema on server)
file = await client.files.read(record, "image.jpg")
file.metadata["photographer"] = "John Doe"
file.metadata["location"] = "Research Site A"
file.metadata["date_taken"] = "2024-01-15"
file.metadata["camera"] = "Canon EOS R5"
updated = await client.files.update(file)
Bulk Metadata Update
# Update metadata for multiple files
async def update_all_file_metadata(record, new_metadata):
    """Merge ``new_metadata`` into the metadata of every file on ``record``.

    Args:
        record: Record whose files are listed via ``client.files.list``.
        new_metadata: Mapping passed to each file's ``metadata.update``.
    """
    files = await client.files.list(record)
    for file in files:
        # Merge new metadata into the existing metadata, then save the file.
        file.metadata.update(new_metadata)
        await client.files.update(file)
        print(f"✓ Updated {file.key}")
# Use it
await update_all_file_metadata(
record,
{"license": "CC-BY-4.0", "processed": True}
)
Deleting Files
Delete by Record and Key
# Delete a file by key
await client.files.delete(record, key="unwanted-file.txt")
print("File deleted")
Delete Using File Object
# Delete using file object
file = await client.files.read(record, "old-data.csv")
await client.files.delete(file)
Delete by File URL
from yarl import URL
# Delete using file URL
file_url = URL("https://repository.org/api/records/abc-123/files/temp.txt")
await client.files.delete(file_url)
Delete All Files
# Delete all files from a record
files = await client.files.list(record)
for file in files:
await client.files.delete(file)
print(f"✓ Deleted {file.key}")
Conditional Delete
# Delete files matching condition
files = await client.files.list(record)
for file in files:
# Delete only temporary files
if file.key.startswith("temp_"):
await client.files.delete(file)
print(f"✓ Deleted temporary file: {file.key}")
Complete Workflow Examples
Upload Multiple Files and Publish
async def create_record_with_files(title, file_paths):
    """Create record and upload multiple files.

    Args:
        title: Title stored in the new record's metadata.
        file_paths: Iterable of local file paths; each file is uploaded
            under its base name (``Path(path).name``).

    Returns:
        The value returned by ``client.records.publish`` for the new draft.
    """
    # Imported locally so the example is self-contained.
    from pathlib import Path

    # Create draft record
    draft = await client.records.create({
        "metadata": {
            "title": title,
            "creators": [{"person_or_org": {"name": "Researcher"}}],
            "resource_type": {"id": "dataset"},
        }
    })

    # Upload each file
    for path in file_paths:
        filename = Path(path).name
        await client.files.upload(
            draft,
            key=filename,
            metadata={"description": f"Data file: {filename}"},
            source=path,
            progress=f"Uploading {filename}",
        )

    # Publish the record
    published = await client.records.publish(draft)
    return published
# Use it
published = await create_record_with_files(
"Research Dataset 2024",
[
"/data/experiment1.csv",
"/data/experiment2.csv",
"/data/README.txt"
]
)
Download Complete Record with Files
async def download_record_with_files(record_id, output_dir):
    """Download record metadata and all files.

    Writes ``<output_dir>/<record_id>/metadata.json`` with the record
    metadata, and for every file on the record both
    ``files/<key>`` (the content) and ``files/<key>.metadata.json``
    (key, size, checksum, mimetype and metadata).

    Args:
        record_id: Identifier passed to ``client.records.draft_records.read``.
        output_dir: Directory under which the per-record folder is created.

    Returns:
        ``pathlib.Path`` of the per-record output directory.
    """
    import json
    from pathlib import Path

    # Create output directory
    output_path = Path(output_dir) / record_id
    output_path.mkdir(parents=True, exist_ok=True)

    # Get record (this example reads it through draft_records)
    record = await client.records.draft_records.read(record_id)

    # Save metadata
    metadata_file = output_path / "metadata.json"
    with open(metadata_file, "w", encoding="utf-8") as f:
        json.dump(record.metadata, f, indent=2)

    # Download files
    files = await client.files.list(record)
    files_dir = output_path / "files"
    files_dir.mkdir(exist_ok=True)
    for file in files:
        file_path = files_dir / file.key
        # A key containing a path separator maps to a nested path; make sure
        # its parent directory exists before writing into it.
        file_path.parent.mkdir(parents=True, exist_ok=True)
        await client.files.download(
            file,
            str(file_path),
            progress=f"Downloading {file.key}",
        )

        # Also save file metadata
        file_meta_path = files_dir / f"{file.key}.metadata.json"
        with open(file_meta_path, "w", encoding="utf-8") as f:
            json.dump(
                {
                    "key": file.key,
                    "size": file.size,
                    "checksum": file.checksum,
                    "mimetype": file.mimetype,
                    "metadata": file.metadata,
                },
                f,
                indent=2,
            )
    return output_path
# Use it
saved_to = await download_record_with_files(
"abc-123",
"/backups"
)
print(f"Downloaded to: {saved_to}")
Replace File with New Version
async def replace_file(record, key, new_file_path):
    """Replace an existing file with a new version.

    Args:
        record: Record holding the file.
        key: Key of the file to replace; reused for the new upload.
        new_file_path: Local path of the new content.

    Returns:
        The value returned by ``client.files.upload`` for the new file.
    """
    # Delete old file
    try:
        await client.files.delete(record, key=key)
    except Exception as exc:
        # Best effort: the file might not exist, so a failed delete must not
        # block the upload. Report the failure instead of discarding it so
        # that unrelated errors remain visible.
        print(f"Could not delete {key}, uploading anyway: {exc}")

    # Upload new version
    file = await client.files.upload(
        record,
        key=key,
        metadata={"description": "Updated version"},
        source=new_file_path,
        progress=f"Uploading {key}",
    )
    return file
# Use it
updated = await replace_file(
record,
"data.csv",
"/path/to/new_data.csv"
)
Sync Files from Directory
async def sync_files_to_record(record, directory_path):
    """Upload all files from directory to record.

    Only regular files directly inside ``directory_path`` are uploaded;
    subdirectories are not descended into.

    Args:
        record: Record that receives the uploads.
        directory_path: Local directory whose files are uploaded.

    Returns:
        List of the values returned by ``client.files.upload``, in upload
        order.
    """
    from pathlib import Path

    directory = Path(directory_path)
    uploaded = []
    # iterdir() yields entries in arbitrary order; sort so the upload order
    # and the returned list are reproducible.
    for file_path in sorted(directory.iterdir()):
        if not file_path.is_file():
            continue
        file = await client.files.upload(
            record,
            key=file_path.name,
            metadata={
                "description": f"Synced from {directory.name}",
                "original_path": str(file_path),
            },
            source=str(file_path),
            progress=f"Uploading {file_path.name}",
        )
        uploaded.append(file)
    return uploaded
# Use it
files = await sync_files_to_record(
draft_record,
"/data/experiment_2024"
)
print(f"Uploaded {len(files)} files")
Copy Files Between Records
async def copy_files_between_records(source_record_id, dest_record_id):
    """Copy all files from one record to another.

    Each file is downloaded into a temporary directory and then uploaded to
    the destination under the same key with a copy of its metadata.

    Args:
        source_record_id: Identifier of the record to copy files from.
        dest_record_id: Identifier of the record to copy files to.

    Returns:
        Number of files listed on the source record.
    """
    import tempfile
    from pathlib import Path

    # This example reads both records through draft_records.
    source_record = await client.records.draft_records.read(source_record_id)
    dest_record = await client.records.draft_records.read(dest_record_id)
    source_files = await client.files.list(source_record)

    # Use temporary directory for intermediate storage
    with tempfile.TemporaryDirectory() as tmpdir:
        for file in source_files:
            # Download from source
            tmp_path = Path(tmpdir) / file.key
            # A key containing a path separator maps to a nested path; create
            # its parent so the download has somewhere to write.
            tmp_path.parent.mkdir(parents=True, exist_ok=True)
            await client.files.download(file, str(tmp_path))

            # Upload to destination
            await client.files.upload(
                dest_record,
                key=file.key,
                metadata=file.metadata.copy(),
                source=str(tmp_path),
                progress=f"Copying {file.key}",
            )
    return len(source_files)
# Use it
copied = await copy_files_between_records("source-123", "dest-456")
print(f"Copied {copied} files")
Validate File Checksums
async def validate_file_checksums(record):
    """Download files and verify their checksums.

    The digest algorithm is taken from the ``<algorithm>:<hexdigest>`` prefix
    of ``file.checksum``; when there is no prefix, MD5 is used. A checksum
    whose algorithm ``hashlib`` does not provide is reported as invalid with
    ``calculated`` set to ``None``.

    Args:
        record: Record whose files are listed, downloaded and checked.

    Returns:
        List of dicts with the keys ``file``, ``valid``, ``expected`` and
        ``calculated``, one per file.
    """
    import hashlib
    import tempfile
    from pathlib import Path

    files = await client.files.list(record)
    results = []
    with tempfile.TemporaryDirectory() as tmpdir:
        for file in files:
            # Download file
            tmp_path = Path(tmpdir) / file.key
            # A key containing a path separator maps to a nested path.
            tmp_path.parent.mkdir(parents=True, exist_ok=True)
            await client.files.download(file, str(tmp_path))

            # Pick the algorithm named by the expected checksum instead of
            # assuming MD5, so other digests are compared like for like.
            expected = file.checksum
            algorithm, separator, _ = (expected or "").partition(":")
            if not separator:
                algorithm = "md5"

            # Calculate checksum
            try:
                hasher = hashlib.new(algorithm)
            except ValueError:
                # Algorithm not available in hashlib: cannot verify.
                calculated = None
            else:
                with open(tmp_path, "rb") as f:
                    while chunk := f.read(65536):
                        hasher.update(chunk)
                calculated = f"{algorithm}:{hasher.hexdigest()}"

            results.append({
                "file": file.key,
                "valid": calculated is not None and calculated == expected,
                "expected": expected,
                "calculated": calculated,
            })
    return results
# Use it
validation = await validate_file_checksums(record)
for result in validation:
status = "✓" if result["valid"] else "✗"
print(f"{status} {result['file']}: {result['valid']}")
Error Handling
from nrp_cmd.errors import (
RepositoryCommunicationError,
RepositoryClientError
)
try:
file = await client.files.upload(
record,
key="data.csv",
metadata={},
source="/path/to/file.csv"
)
except FileNotFoundError:
print("Source file not found")
except RepositoryClientError as e:
print(f"Upload failed: {e}")
except RepositoryCommunicationError as e:
print(f"Network error: {e}")
API Reference
Methods
- list(record_or_url) - List all files on a record
- read(file_url) / read(record, key) - Get file metadata
- upload(record_or_url, key, metadata, source, transfer_type='local-file', transfer_metadata=None, progress=None) - Upload a file
- download(file_or_url, sink, *, parts=None, part_size=None, progress=None) - Download a file
- download(record, key, sink, *, parts=None, part_size=None, progress=None) - Download file by key
- update(file) - Update file metadata
- delete(record, key=None) / delete(file) / delete(file_url) - Delete a file
Stream Classes
- FileSource(path) - Read from filesystem
- FileSink(path) - Write to filesystem
- MemorySource(data) - Read from memory
- MemorySink() - Write to memory
Transfer Types
- 'local-file' - Standard file upload (default)
- 'url-fetch' - Fetch from URL (if supported)
- 'multipart' - Multipart upload (if supported)
Last updated on