Files API
Complete guide to working with files using the sync client library.
Overview
The Files API provides methods to upload, download, list, update, and delete files attached to records. Access it via client.files.
from nrp_cmd.sync_client import get_sync_client
client = get_sync_client("https://your-repository.org")
files_client = client.files
Listing Files
List Files on a Record
# List all files on a draft record
record = client.records.draft_records.read("abc-123")
files = client.files.list(record)
for file in files:
print(f"File: {file.key}")
print(f" Size: {file.size} bytes")
print(f" Checksum: {file.checksum}")
print(f" Metadata: {file.metadata}")
List by Record URL
from yarl import URL
# List files using record URL
record_url = URL("https://repository.org/api/records/abc-123")
files = client.files.list(record_url)
Access File Properties
files = client.files.list(record)
for file in files:
# Basic properties
print(f"Key: {file.key}")
print(f"Size: {file.size}")
print(f"MIME type: {file.mimetype}")
print(f"Checksum: {file.checksum}")
# Metadata
print(f"Description: {file.metadata.get('description', 'N/A')}")
# Status
print(f"Status: {file.status}") # e.g., 'completed'
# Links
print(f"Download URL: {file.links.content}")
print(f"Self URL: {file.links.self_}")
Uploading Files
Basic Upload from File Path
# Upload a file from filesystem
file = client.files.upload(
record,
key="data.csv",
metadata={"description": "Dataset file"},
source="/path/to/data.csv"
)
print(f"Uploaded: {file.key} ({file.size} bytes)")
Upload with Progress Bar
# Upload with progress tracking
file = client.files.upload(
record,
key="large-file.zip",
metadata={"description": "Large dataset archive"},
source="/path/to/large-file.zip",
progress="Uploading large file" # Shows progress bar
)
Upload from Path Object
from pathlib import Path
# Upload using Path
data_file = Path("data/experiment_results.csv")
file = client.files.upload(
record,
key=data_file.name,
metadata={"description": "Experiment results"},
source=data_file
)
Upload with Data Stream
from nrp_cmd.sync_client.streams import FileSource
# Upload using FileSource for more control
source = FileSource("/path/to/file.pdf")
file = client.files.upload(
record,
key="document.pdf",
metadata={"description": "Research paper"},
source=source
)
Upload from Memory
from nrp_cmd.sync_client.streams import MemorySource
# Upload from bytes in memory
data = b"This is my file content"
source = MemorySource(data)
file = client.files.upload(
record,
key="textfile.txt",
metadata={"description": "Text file"},
source=source
)
Upload Multiple Files
# Upload multiple files to a record
files_to_upload = [
("data.csv", "/path/to/data.csv", "Raw data"),
("analysis.py", "/path/to/analysis.py", "Analysis script"),
("results.pdf", "/path/to/results.pdf", "Results report"),
]
uploaded_files = []
for key, path, description in files_to_upload:
file = client.files.upload(
record,
key=key,
metadata={"description": description},
source=path,
progress=f"Uploading {key}"
)
uploaded_files.append(file)
print(f"✓ Uploaded {key}")
Upload with Transfer Type
ℹ️
Some repositories support different transfer types like URL fetch or multipart upload. Check your repository’s documentation.
# Upload via URL fetch (if supported by repository)
file = client.files.upload(
record,
key="remote-data.csv",
metadata={"description": "Data from URL"},
source="https://example.com/data.csv",
transfer_type="url-fetch"
)
Downloading Files
Download to File Path
# Download file to local filesystem
file = client.files.read(record, "data.csv")
client.files.download(
file,
"/path/to/save/data.csv"
)
Download with Progress Bar
# Download with progress tracking
client.files.download(
file,
"/path/to/save/large-file.zip",
progress="Downloading large file"
)
Download Using File Object
# Get file info first, then download
files = client.files.list(record)
file = files[0] # Get first file
client.files.download(
file,
f"/downloads/{file.key}",
progress=f"Downloading {file.key}"
)
Download Using File URL
from yarl import URL
# Download directly from file URL
file_url = URL("https://repository.org/api/records/abc-123/files/data.csv")
client.files.download(
file_url,
"/path/to/save/data.csv"
)
Download with DataSink
from nrp_cmd.sync_client.streams import FileSink
# Download using FileSink for more control
sink = FileSink("/path/to/output.pdf")
client.files.download(file, sink)
Download to Memory
from nrp_cmd.sync_client.streams import MemorySink
# Download to memory
sink = MemorySink()
client.files.download(file, sink)
# Access the data
data = sink.data
print(f"Downloaded {len(data)} bytes")
Download Multiple Files
# Download all files from a record (can be draft or published)
record = client.records.draft_records.read("abc-123")
files = client.files.list(record)
for file in files:
output_path = f"/downloads/{record.id}/{file.key}"
client.files.download(
file,
output_path,
progress=f"Downloading {file.key}"
)
print(f"✓ Downloaded {file.key}")
Download Multiple Files Sequentially
# Download multiple files from a record
def download_file(file, output_dir):
output_path = f"{output_dir}/{file.key}"
client.files.download(file, output_path)
return file.key
def download_all_files(record, output_dir):
files = client.files.list(record)
# Download each file
results = []
for file in files:
result = download_file(file, output_dir)
results.append(result)
return results
# Use it
downloaded = download_all_files(record, "/downloads")
print(f"Downloaded {len(downloaded)} files")
Reading File Metadata
Read File Info
# Read file metadata without downloading content
file = client.files.read(record, "data.csv")
print(f"File key: {file.key}")
print(f"Size: {file.size}")
print(f"Checksum: {file.checksum}")
print(f"MIME type: {file.mimetype}")
print(f"Metadata: {file.metadata}")
Read by File URL
from yarl import URL
# Read file info from URL
file_url = URL("https://repository.org/api/records/abc-123/files/data.csv")
file = client.files.read(file_url)
Check File Existence
# Check if file exists on record
try:
file = client.files.read(record, "myfile.pdf")
print(f"File exists: {file.key}")
except Exception:
print("File does not exist")
Updating File Metadata
Update File Metadata
# Read file, update metadata, save
file = client.files.read(record, "data.csv")
# Update metadata
file.metadata["description"] = "Updated description"
file.metadata["type"] = "dataset"
file.metadata["processing_date"] = "2024-01-15"
# Save changes
updated_file = client.files.update(file)
print(f"Updated metadata for: {updated_file.key}")
Add Metadata Fields
# Add new metadata fields (must be supported by schema on server)
file = client.files.read(record, "image.jpg")
file.metadata["photographer"] = "John Doe"
file.metadata["location"] = "Research Site A"
file.metadata["date_taken"] = "2024-01-15"
file.metadata["camera"] = "Canon EOS R5"
updated = client.files.update(file)
Bulk Metadata Update
# Update metadata for multiple files
def update_all_file_metadata(record, new_metadata):
files = client.files.list(record)
for file in files:
# Merge new metadata
file.metadata.update(new_metadata)
client.files.update(file)
print(f"✓ Updated {file.key}")
# Use it
update_all_file_metadata(
record,
{"license": "CC-BY-4.0", "processed": True}
)
Deleting Files
Delete by Record and Key
# Delete a file by key
client.files.delete(record, key="unwanted-file.txt")
print("File deleted")
Delete Using File Object
# Delete using file object
file = client.files.read(record, "old-data.csv")
client.files.delete(file)
Delete by File URL
from yarl import URL
# Delete using file URL
file_url = URL("https://repository.org/api/records/abc-123/files/temp.txt")
client.files.delete(file_url)
Delete All Files
# Delete all files from a record
files = client.files.list(record)
for file in files:
client.files.delete(file)
print(f"✓ Deleted {file.key}")
Conditional Delete
# Delete files matching condition
files = client.files.list(record)
for file in files:
# Delete only temporary files
if file.key.startswith("temp_"):
client.files.delete(file)
print(f"✓ Deleted temporary file: {file.key}")
Complete Workflow Examples
Upload Multiple Files and Publish
def create_record_with_files(title, file_paths):
"""Create record and upload multiple files."""
# Create draft record
draft = client.records.create({
"metadata": {
"title": title,
"creators": [{"person_or_org": {"name": "Researcher"}}],
"resource_type": {"id": "dataset"}
}
})
# Upload each file
for path in file_paths:
filename = Path(path).name
client.files.upload(
draft,
key=filename,
metadata={"description": f"Data file: {filename}"},
source=path,
progress=f"Uploading {filename}"
)
# Publish the record
published = client.records.publish(draft)
return published
# Use it
published = create_record_with_files(
"Research Dataset 2024",
[
"/data/experiment1.csv",
"/data/experiment2.csv",
"/data/README.txt"
]
)
Download Complete Record with Files
def download_record_with_files(record_id, output_dir):
"""Download record metadata and all files."""
from pathlib import Path
import json
# Create output directory
output_path = Path(output_dir) / record_id
output_path.mkdir(parents=True, exist_ok=True)
# Get record (can be draft or published)
record = client.records.draft_records.read(record_id)
# Save metadata
metadata_file = output_path / "metadata.json"
with open(metadata_file, 'w') as f:
json.dump(record.metadata, f, indent=2)
# Download files
files = client.files.list(record)
files_dir = output_path / "files"
files_dir.mkdir(exist_ok=True)
for file in files:
file_path = files_dir / file.key
client.files.download(
file,
str(file_path),
progress=f"Downloading {file.key}"
)
# Also save file metadata
file_meta_path = files_dir / f"{file.key}.metadata.json"
with open(file_meta_path, 'w') as f:
json.dump({
"key": file.key,
"size": file.size,
"checksum": file.checksum,
"mimetype": file.mimetype,
"metadata": file.metadata
}, f, indent=2)
return output_path
# Use it
saved_to = download_record_with_files(
"abc-123",
"/backups"
)
print(f"Downloaded to: {saved_to}")
Replace File with New Version
def replace_file(record, key, new_file_path):
"""Replace an existing file with a new version."""
# Delete old file
try:
client.files.delete(record, key=key)
except Exception:
pass # File might not exist
# Upload new version
file = client.files.upload(
record,
key=key,
metadata={"description": "Updated version"},
source=new_file_path,
progress=f"Uploading {key}"
)
return file
# Use it
updated = replace_file(
record,
"data.csv",
"/path/to/new_data.csv"
)
Sync Files from Directory
def sync_files_to_record(record, directory_path):
"""Upload all files from directory to record."""
from pathlib import Path
directory = Path(directory_path)
uploaded = []
for file_path in directory.iterdir():
if file_path.is_file():
file = client.files.upload(
record,
key=file_path.name,
metadata={
"description": f"Synced from {directory.name}",
"original_path": str(file_path)
},
source=str(file_path),
progress=f"Uploading {file_path.name}"
)
uploaded.append(file)
return uploaded
# Use it
files = sync_files_to_record(
draft_record,
"/data/experiment_2024"
)
print(f"Uploaded {len(files)} files")
Copy Files Between Records
def copy_files_between_records(source_record_id, dest_record_id):
"""Copy all files from one record to another."""
import tempfile
from pathlib import Path
# Can copy from/to draft or published records
source_record = client.records.draft_records.read(source_record_id)
dest_record = client.records.draft_records.read(dest_record_id)
source_files = client.files.list(source_record)
# Use temporary directory for intermediate storage
with tempfile.TemporaryDirectory() as tmpdir:
for file in source_files:
# Download from source
tmp_path = Path(tmpdir) / file.key
client.files.download(file, str(tmp_path))
# Upload to destination
client.files.upload(
dest_record,
key=file.key,
metadata=file.metadata.copy(),
source=str(tmp_path),
progress=f"Copying {file.key}"
)
return len(source_files)
# Use it
copied = copy_files_between_records("source-123", "dest-456")
print(f"Copied {copied} files")
Validate File Checksums
def validate_file_checksums(record):
"""Download files and verify their checksums."""
import hashlib
import tempfile
from pathlib import Path
files = client.files.list(record)
results = []
with tempfile.TemporaryDirectory() as tmpdir:
for file in files:
# Download file
tmp_path = Path(tmpdir) / file.key
client.files.download(file, str(tmp_path))
# Calculate checksum
hasher = hashlib.md5()
with open(tmp_path, 'rb') as f:
for chunk in iter(lambda: f.read(4096), b''):
hasher.update(chunk)
calculated = f"md5:{hasher.hexdigest()}"
expected = file.checksum
results.append({
"file": file.key,
"valid": calculated == expected,
"expected": expected,
"calculated": calculated
})
return results
# Use it
validation = validate_file_checksums(record)
for result in validation:
status = "✓" if result["valid"] else "✗"
print(f"{status} {result['file']}: {result['valid']}")
Error Handling
from nrp_cmd.errors import (
RepositoryCommunicationError,
RepositoryClientError
)
try:
file = client.files.upload(
record,
key="data.csv",
metadata={},
source="/path/to/file.csv"
)
except FileNotFoundError:
print("Source file not found")
except RepositoryClientError as e:
print(f"Upload failed: {e}")
except RepositoryCommunicationError as e:
print(f"Network error: {e}")
API Reference
Methods
- list(record_or_url) - List all files on a record
- read(file_url) / read(record, key) - Get file metadata
- upload(record_or_url, key, metadata, source, transfer_type='local-file', transfer_metadata=None, progress=None) - Upload a file
- download(file_or_url, sink, *, parts=None, part_size=None, progress=None) - Download a file
- download(record, key, sink, *, parts=None, part_size=None, progress=None) - Download file by key
- update(file) - Update file metadata
- delete(record, key=None) / delete(file) / delete(file_url) - Delete a file
Stream Classes
- FileSource(path) - Read from filesystem
- FileSink(path) - Write to filesystem
- MemorySource(data) - Read from memory
- MemorySink() - Write to memory
Transfer Types
- 'local-file' - Standard file upload (default)
- 'url-fetch' - Fetch from URL (if supported)
- 'multipart' - Multipart upload (if supported)
Last updated on