
Files API

Complete guide to working with files using the async client library.

Overview

The Files API provides methods to upload, download, list, update, and delete files attached to records. Access it via client.files.

from nrp_cmd.async_client import get_async_client

client = await get_async_client("https://your-repository.org")
files_client = client.files

Listing Files

List Files on a Record

# List all files on a draft record
record = await client.records.draft_records.read("abc-123")
files = await client.files.list(record)

for file in files:
    print(f"File: {file.key}")
    print(f"  Size: {file.size} bytes")
    print(f"  Checksum: {file.checksum}")
    print(f"  Metadata: {file.metadata}")

List by Record URL

from yarl import URL

# List files using a record URL
record_url = URL("https://repository.org/api/records/abc-123")
files = await client.files.list(record_url)

Access File Properties

files = await client.files.list(record)

for file in files:
    # Basic properties
    print(f"Key: {file.key}")
    print(f"Size: {file.size}")
    print(f"MIME type: {file.mimetype}")
    print(f"Checksum: {file.checksum}")

    # Metadata
    print(f"Description: {file.metadata.get('description', 'N/A')}")

    # Status
    print(f"Status: {file.status}")  # e.g., 'completed'

    # Links
    print(f"Download URL: {file.links.content}")
    print(f"Self URL: {file.links.self_}")

Uploading Files

Basic Upload from File Path

# Upload a file from the filesystem
file = await client.files.upload(
    record,
    key="data.csv",
    metadata={"description": "Dataset file"},
    source="/path/to/data.csv",
)
print(f"Uploaded: {file.key} ({file.size} bytes)")

Upload with Progress Bar

# Upload with progress tracking
file = await client.files.upload(
    record,
    key="large-file.zip",
    metadata={"description": "Large dataset archive"},
    source="/path/to/large-file.zip",
    progress="Uploading large file",  # Shows a progress bar
)

Upload from Path Object

from pathlib import Path

# Upload using a Path object
data_file = Path("data/experiment_results.csv")
file = await client.files.upload(
    record,
    key=data_file.name,
    metadata={"description": "Experiment results"},
    source=data_file,
)

Upload with Data Stream

from nrp_cmd.async_client.streams import FileSource

# Upload using a FileSource for more control
source = FileSource("/path/to/file.pdf")
file = await client.files.upload(
    record,
    key="document.pdf",
    metadata={"description": "Research paper"},
    source=source,
)

Upload from Memory

from nrp_cmd.async_client.streams import MemorySource

# Upload from bytes in memory
data = b"This is my file content"
source = MemorySource(data)
file = await client.files.upload(
    record,
    key="textfile.txt",
    metadata={"description": "Text file"},
    source=source,
)

Upload Multiple Files

# Upload multiple files to a record
files_to_upload = [
    ("data.csv", "/path/to/data.csv", "Raw data"),
    ("analysis.py", "/path/to/analysis.py", "Analysis script"),
    ("results.pdf", "/path/to/results.pdf", "Results report"),
]

uploaded_files = []
for key, path, description in files_to_upload:
    file = await client.files.upload(
        record,
        key=key,
        metadata={"description": description},
        source=path,
        progress=f"Uploading {key}",
    )
    uploaded_files.append(file)
    print(f"✓ Uploaded {key}")

Upload with Transfer Type

ℹ️ Some repositories support different transfer types, such as URL fetch or multipart upload. Check your repository’s documentation.

# Upload via URL fetch (if supported by the repository)
file = await client.files.upload(
    record,
    key="remote-data.csv",
    metadata={"description": "Data from URL"},
    source="https://example.com/data.csv",
    transfer_type="url-fetch",
)

Downloading Files

Download to File Path

# Download a file to the local filesystem
file = await client.files.read(record, "data.csv")
await client.files.download(file, "/path/to/save/data.csv")

Download with Progress Bar

# Download with progress tracking
await client.files.download(
    file,
    "/path/to/save/large-file.zip",
    progress="Downloading large file",
)

Download Using File Object

# Get file info first, then download
files = await client.files.list(record)
file = files[0]  # Get the first file

await client.files.download(
    file,
    f"/downloads/{file.key}",
    progress=f"Downloading {file.key}",
)

Download Using File URL

from yarl import URL

# Download directly from a file URL
file_url = URL("https://repository.org/api/records/abc-123/files/data.csv")
await client.files.download(file_url, "/path/to/save/data.csv")

Download with DataSink

from nrp_cmd.async_client.streams import FileSink

# Download using a FileSink for more control
sink = FileSink("/path/to/output.pdf")
await client.files.download(file, sink)

Download to Memory

from nrp_cmd.async_client.streams import MemorySink

# Download to memory
sink = MemorySink()
await client.files.download(file, sink)

# Access the data
data = sink.data
print(f"Downloaded {len(data)} bytes")

Download Multiple Files

# Download all files from a record (draft or published)
record = await client.records.draft_records.read("abc-123")
files = await client.files.list(record)

for file in files:
    output_path = f"/downloads/{record.id}/{file.key}"
    await client.files.download(
        file,
        output_path,
        progress=f"Downloading {file.key}",
    )
    print(f"✓ Downloaded {file.key}")

Parallel Download with Connection Limiting

import asyncio

from nrp_cmd.async_client.streams import FileSink

# AsyncRepositoryClient, File, and limit_connections are provided by nrp_cmd;
# their exact import paths depend on the installed version
async def download_files_from_draft_in_parallel(
    client: AsyncRepositoryClient,
    files: list[File],
    output_dir: str,
) -> None:
    # Limit to 5 concurrent downloads
    with limit_connections(5):
        tasks = [
            client.files.download(file_, FileSink(f"{output_dir}/{file_.key}"))
            for file_ in files
        ]
        results = await asyncio.gather(*tasks)

    for result in results:
        print(f"✓ Downloaded file to: {result}")
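
For example, the helper might be called like this (a sketch; the record id and output directory are illustrative):

# Hypothetical usage of the helper defined above
record = await client.records.draft_records.read("abc-123")
files = await client.files.list(record)
await download_files_from_draft_in_parallel(client, files, "/downloads/abc-123")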

Reading File Metadata

Read File Info

# Read file metadata without downloading the content
file = await client.files.read(record, "data.csv")

print(f"File key: {file.key}")
print(f"Size: {file.size}")
print(f"Checksum: {file.checksum}")
print(f"MIME type: {file.mimetype}")
print(f"Metadata: {file.metadata}")

Read by File URL

from yarl import URL

# Read file info from a URL
file_url = URL("https://repository.org/api/records/abc-123/files/data.csv")
file = await client.files.read(file_url)

Check File Existence

from nrp_cmd.errors import RepositoryClientError

# Check if a file exists on the record
try:
    file = await client.files.read(record, "myfile.pdf")
    print(f"File exists: {file.key}")
except RepositoryClientError:
    print("File does not exist")

Updating File Metadata

Update File Metadata

# Read the file, update its metadata, save
file = await client.files.read(record, "data.csv")

# Update metadata
file.metadata["description"] = "Updated description"
file.metadata["type"] = "dataset"
file.metadata["processing_date"] = "2024-01-15"

# Save changes
updated_file = await client.files.update(file)
print(f"Updated metadata for: {updated_file.key}")

Add Metadata Fields

# Add new metadata fields (must be supported by the schema on the server)
file = await client.files.read(record, "image.jpg")

file.metadata["photographer"] = "John Doe"
file.metadata["location"] = "Research Site A"
file.metadata["date_taken"] = "2024-01-15"
file.metadata["camera"] = "Canon EOS R5"

updated = await client.files.update(file)

Bulk Metadata Update

# Update metadata for multiple files
async def update_all_file_metadata(record, new_metadata):
    files = await client.files.list(record)
    for file in files:
        # Merge in the new metadata
        file.metadata.update(new_metadata)
        await client.files.update(file)
        print(f"✓ Updated {file.key}")

# Use it
await update_all_file_metadata(
    record,
    {"license": "CC-BY-4.0", "processed": True},
)

Deleting Files

Delete by Record and Key

# Delete a file by key
await client.files.delete(record, key="unwanted-file.txt")
print("File deleted")

Delete Using File Object

# Delete using a file object
file = await client.files.read(record, "old-data.csv")
await client.files.delete(file)

Delete by File URL

from yarl import URL

# Delete using a file URL
file_url = URL("https://repository.org/api/records/abc-123/files/temp.txt")
await client.files.delete(file_url)

Delete All Files

# Delete all files from a record
files = await client.files.list(record)
for file in files:
    await client.files.delete(file)
    print(f"✓ Deleted {file.key}")

Conditional Delete

# Delete files matching a condition
files = await client.files.list(record)
for file in files:
    # Delete only temporary files
    if file.key.startswith("temp_"):
        await client.files.delete(file)
        print(f"✓ Deleted temporary file: {file.key}")

Complete Workflow Examples

Upload Multiple Files and Publish

from pathlib import Path

async def create_record_with_files(title, file_paths):
    """Create a record and upload multiple files."""
    # Create a draft record
    draft = await client.records.create({
        "metadata": {
            "title": title,
            "creators": [{"person_or_org": {"name": "Researcher"}}],
            "resource_type": {"id": "dataset"},
        }
    })

    # Upload each file
    for path in file_paths:
        filename = Path(path).name
        await client.files.upload(
            draft,
            key=filename,
            metadata={"description": f"Data file: {filename}"},
            source=path,
            progress=f"Uploading {filename}",
        )

    # Publish the record
    published = await client.records.publish(draft)
    return published

# Use it
published = await create_record_with_files(
    "Research Dataset 2024",
    [
        "/data/experiment1.csv",
        "/data/experiment2.csv",
        "/data/README.txt",
    ],
)

Download Complete Record with Files

async def download_record_with_files(record_id, output_dir):
    """Download record metadata and all files."""
    import json
    from pathlib import Path

    # Create the output directory
    output_path = Path(output_dir) / record_id
    output_path.mkdir(parents=True, exist_ok=True)

    # Get the record (can be draft or published)
    record = await client.records.draft_records.read(record_id)

    # Save record metadata
    metadata_file = output_path / "metadata.json"
    with open(metadata_file, "w") as f:
        json.dump(record.metadata, f, indent=2)

    # Download files
    files = await client.files.list(record)
    files_dir = output_path / "files"
    files_dir.mkdir(exist_ok=True)

    for file in files:
        file_path = files_dir / file.key
        await client.files.download(
            file,
            str(file_path),
            progress=f"Downloading {file.key}",
        )

        # Also save the file metadata
        file_meta_path = files_dir / f"{file.key}.metadata.json"
        with open(file_meta_path, "w") as f:
            json.dump({
                "key": file.key,
                "size": file.size,
                "checksum": file.checksum,
                "mimetype": file.mimetype,
                "metadata": file.metadata,
            }, f, indent=2)

    return output_path

# Use it
saved_to = await download_record_with_files("abc-123", "/backups")
print(f"Downloaded to: {saved_to}")

Replace File with New Version

from nrp_cmd.errors import RepositoryClientError

async def replace_file(record, key, new_file_path):
    """Replace an existing file with a new version."""
    # Delete the old file
    try:
        await client.files.delete(record, key=key)
    except RepositoryClientError:
        pass  # File might not exist

    # Upload the new version
    file = await client.files.upload(
        record,
        key=key,
        metadata={"description": "Updated version"},
        source=new_file_path,
        progress=f"Uploading {key}",
    )
    return file

# Use it
updated = await replace_file(record, "data.csv", "/path/to/new_data.csv")

Sync Files from Directory

async def sync_files_to_record(record, directory_path):
    """Upload all files from a directory to a record."""
    from pathlib import Path

    directory = Path(directory_path)
    uploaded = []

    for file_path in directory.iterdir():
        if file_path.is_file():
            file = await client.files.upload(
                record,
                key=file_path.name,
                metadata={
                    "description": f"Synced from {directory.name}",
                    "original_path": str(file_path),
                },
                source=str(file_path),
                progress=f"Uploading {file_path.name}",
            )
            uploaded.append(file)

    return uploaded

# Use it
files = await sync_files_to_record(draft_record, "/data/experiment_2024")
print(f"Uploaded {len(files)} files")

Copy Files Between Records

async def copy_files_between_records(source_record_id, dest_record_id):
    """Copy all files from one record to another."""
    import tempfile
    from pathlib import Path

    # Can copy from/to draft or published records
    source_record = await client.records.draft_records.read(source_record_id)
    dest_record = await client.records.draft_records.read(dest_record_id)

    source_files = await client.files.list(source_record)

    # Use a temporary directory for intermediate storage
    with tempfile.TemporaryDirectory() as tmpdir:
        for file in source_files:
            # Download from the source record
            tmp_path = Path(tmpdir) / file.key
            await client.files.download(file, str(tmp_path))

            # Upload to the destination record
            await client.files.upload(
                dest_record,
                key=file.key,
                metadata=file.metadata.copy(),
                source=str(tmp_path),
                progress=f"Copying {file.key}",
            )

    return len(source_files)

# Use it
copied = await copy_files_between_records("source-123", "dest-456")
print(f"Copied {copied} files")

Validate File Checksums

async def validate_file_checksums(record):
    """Download files and verify their checksums."""
    import hashlib
    import tempfile
    from pathlib import Path

    files = await client.files.list(record)
    results = []

    with tempfile.TemporaryDirectory() as tmpdir:
        for file in files:
            # Download the file
            tmp_path = Path(tmpdir) / file.key
            await client.files.download(file, str(tmp_path))

            # Calculate the checksum; this assumes the repository reports
            # checksums in the form "md5:<hexdigest>"
            hasher = hashlib.md5()
            with open(tmp_path, "rb") as f:
                for chunk in iter(lambda: f.read(4096), b""):
                    hasher.update(chunk)

            calculated = f"md5:{hasher.hexdigest()}"
            expected = file.checksum

            results.append({
                "file": file.key,
                "valid": calculated == expected,
                "expected": expected,
                "calculated": calculated,
            })

    return results

# Use it
validation = await validate_file_checksums(record)
for result in validation:
    status = "✓" if result["valid"] else "✗"
    print(f"{status} {result['file']}: {result['valid']}")

Error Handling

from nrp_cmd.errors import (
    RepositoryClientError,
    RepositoryCommunicationError,
)

try:
    file = await client.files.upload(
        record,
        key="data.csv",
        metadata={},
        source="/path/to/file.csv",
    )
except FileNotFoundError:
    print("Source file not found")
except RepositoryClientError as e:
    print(f"Upload failed: {e}")
except RepositoryCommunicationError as e:
    print(f"Network error: {e}")

API Reference

Methods

  • list(record_or_url) - List all files on a record
  • read(file_url) / read(record, key) - Get file metadata
  • upload(record_or_url, key, metadata, source, transfer_type='local-file', transfer_metadata=None, progress=None) - Upload a file
  • download(file_or_url, sink, *, parts=None, part_size=None, progress=None) - Download a file
  • download(record, key, sink, *, parts=None, part_size=None, progress=None) - Download a file by key (see the sketch after this list)
  • update(file) - Update file metadata
  • delete(record, key=None) / delete(file) / delete(file_url) - Delete a file
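
The download-by-key overload is not demonstrated in the examples above. A minimal sketch, assuming the signature listed here (the record, key, and target path are illustrative):

# Hypothetical example of downloading by record and key
await client.files.download(
    record,
    "data.csv",
    "/path/to/save/data.csv",
    progress="Downloading data.csv",
)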

Stream Classes

  • FileSource(path) - Read from filesystem
  • FileSink(path) - Write to filesystem
  • MemorySource(data) - Read from memory
  • MemorySink() - Write to memory

Transfer Types

  • 'local-file' - Standard file upload (default)
  • 'url-fetch' - Fetch from URL (if supported)
  • 'multipart' - Multipart upload (if supported; see the sketch below)
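
Like the url-fetch example earlier, a multipart upload is requested through the transfer_type parameter. A minimal sketch, assuming the repository supports multipart transfers (the key, metadata, and source values are illustrative):

# Multipart upload (only if the repository supports it);
# all values other than transfer_type are illustrative
file = await client.files.upload(
    record,
    key="huge-dataset.tar",
    metadata={"description": "Large archive"},
    source="/path/to/huge-dataset.tar",
    transfer_type="multipart",
    progress="Uploading huge-dataset.tar",
)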