class RemarkableAPIClient
Asynchronous API client for interacting with the reMarkable Cloud service, providing methods for file management, folder operations, and document synchronization.
/tf/active/vicechatdev/e-ink-llm/remarkable_api_endpoints.py
15 - 352
complex
Purpose
This class provides a comprehensive interface to the reMarkable Cloud API, enabling programmatic access to upload, download, list, update, and delete documents and folders. It handles authentication via JWT tokens, automatically extracts storage host information, and implements the API patterns observed from the reMarkable Chrome extension. The client supports both individual and batch operations, real-time event subscriptions, and hierarchical folder management with path-based operations.
Source Code
class RemarkableAPIClient:
    """
    Asynchronous reMarkable Cloud API client.

    Endpoint paths, headers ('rM-Source', 'rM-Meta') and the base64-encoded
    metadata pattern mirror the behaviour observed in the reMarkable Chrome
    extension. Each request method opens its own aiohttp session and returns
    a plain dict containing at least 'status', 'headers' and 'data'.
    """

    # Used when the JWT cannot be parsed or carries no 'tectonic' claim.
    _FALLBACK_HOST = "https://internal.cloud.remarkable.com"

    def __init__(self, user_token: str):
        """
        Args:
            user_token: JWT bearer token from the reMarkable auth flow. Its
                payload may carry a 'tectonic' claim naming the storage shard.
        """
        self.user_token = user_token
        self.base_url = self._get_storage_host()
        self.headers = {
            'Authorization': f'Bearer {user_token}',
            'rM-Source': 'RoR-Browser',  # From Chrome extension
            'User-Agent': 'E-Ink-LLM-Assistant/1.0'
        }

    def _get_storage_host(self) -> str:
        """
        Derive the storage host from the JWT's 'tectonic' claim.

        JWT segments are base64url-encoded (RFC 7515), so urlsafe decoding is
        required: '-' and '_' are valid token characters that plain b64decode
        would silently discard, corrupting the payload. Any parse failure
        falls back to the internal cloud host instead of raising.

        Returns:
            Storage host URL, e.g. 'https://xyz.tectonic.remarkable.com'.
        """
        try:
            payload = self.user_token.split('.')[1]
            # JWT payloads are transmitted without '=' padding; restore it.
            # (-len % 4) adds 0-3 chars and, unlike (4 - len % 4), never 4.
            payload += '=' * (-len(payload) % 4)
            decoded = json.loads(base64.urlsafe_b64decode(payload))
            tectonic = decoded.get('tectonic')
            if isinstance(tectonic, str) and tectonic:
                return f"https://{tectonic}.tectonic.remarkable.com"
            return self._FALLBACK_HOST
        except Exception:
            # Malformed/opaque token: degrade gracefully at construction time.
            return self._FALLBACK_HOST

    @staticmethod
    async def _response_body(response) -> Any:
        """Parse JSON when the server declares it, otherwise return raw text."""
        if response.content_type == 'application/json':
            return await response.json()
        return await response.text()

    @staticmethod
    def _id_from_location(response) -> Optional[str]:
        """Return the trailing path segment of the Location header, if present."""
        location = response.headers.get('Location')
        return location.split('/')[-1] if location else None

    async def list_files(self, etag: Optional[str] = None, only_folders: bool = False) -> Dict[str, Any]:
        """
        List files in reMarkable Cloud.

        Args:
            etag: ETag for conditional requests (sent as If-None-Match).
            only_folders: Only return folders.

        Returns:
            Dict with 'status', 'headers', 'data' (JSON body on 200, else
            None) and 'etag' (from the response, for later conditional calls).
        """
        url = f"{self.base_url}/doc/v2/files"
        headers = self.headers.copy()
        params = {}
        if only_folders:
            params['onlyFolders'] = 'true'
        if etag:
            headers['If-None-Match'] = etag
        async with aiohttp.ClientSession() as session:
            async with session.get(url, headers=headers, params=params) as response:
                return {
                    'status': response.status,
                    'headers': dict(response.headers),
                    'data': await response.json() if response.status == 200 else None,
                    'etag': response.headers.get('ETag')
                }

    async def upload_document(self, filename: str, content: bytes,
                              file_type: str = "application/pdf",
                              parent_id: Optional[str] = None) -> Dict[str, Any]:
        """
        Upload a document to reMarkable Cloud.

        Args:
            filename: Name of the file.
            content: File content as bytes.
            file_type: MIME type of file.
            parent_id: Parent folder ID (None for root).

        Returns:
            Dict with 'status', 'headers', 'data' and 'document_id' (taken
            from the Location response header when present).
        """
        url = f"{self.base_url}/doc/v2/files"
        # Metadata travels base64-encoded in the 'rM-Meta' header
        # (Chrome extension pattern).
        metadata: Dict[str, Any] = {
            'file_name': filename,
            'type': 'DocumentType'
        }
        if parent_id:
            metadata['parent'] = parent_id
        meta_encoded = base64.b64encode(json.dumps(metadata).encode()).decode()
        headers = self.headers.copy()
        headers['rM-Meta'] = meta_encoded
        headers['Content-Type'] = file_type
        async with aiohttp.ClientSession() as session:
            async with session.post(url, headers=headers, data=content) as response:
                return {
                    'status': response.status,
                    'headers': dict(response.headers),
                    'data': await self._response_body(response),
                    'document_id': self._id_from_location(response)
                }

    async def create_folder(self, folder_name: str, parent_id: Optional[str] = None) -> Dict[str, Any]:
        """
        Create a folder in reMarkable Cloud.

        Args:
            folder_name: Name of the folder.
            parent_id: Parent folder ID (None for root).

        Returns:
            Dict with 'status', 'headers', 'data' and 'folder_id' (taken from
            the Location response header when present).
        """
        url = f"{self.base_url}/doc/v2/files"
        metadata: Dict[str, Any] = {
            'file_name': folder_name,
            'type': 'CollectionType'
        }
        if parent_id:
            metadata['parent'] = parent_id
        meta_encoded = base64.b64encode(json.dumps(metadata).encode()).decode()
        headers = self.headers.copy()
        headers['rM-Meta'] = meta_encoded
        headers['Content-Type'] = 'folder'  # Special content type for folders
        async with aiohttp.ClientSession() as session:
            async with session.post(url, headers=headers, data='') as response:
                return {
                    'status': response.status,
                    'headers': dict(response.headers),
                    'data': await self._response_body(response),
                    'folder_id': self._id_from_location(response)
                }

    async def update_metadata(self, document_id: str, metadata: Dict[str, Any]) -> Dict[str, Any]:
        """
        Update document metadata.

        Args:
            document_id: Document UUID.
            metadata: Metadata fields to update.

        Returns:
            Dict with 'status', 'headers' and 'data'.
        """
        url = f"{self.base_url}/doc/v2/files/{document_id}"
        headers = self.headers.copy()
        headers['Content-Type'] = 'application/json'
        async with aiohttp.ClientSession() as session:
            async with session.patch(url, headers=headers, json=metadata) as response:
                return {
                    'status': response.status,
                    'headers': dict(response.headers),
                    'data': await self._response_body(response)
                }

    async def multi_update_metadata(self, updates: List[Dict[str, Any]], hashes: List[str]) -> Dict[str, Any]:
        """
        Update multiple documents' metadata in one batch request.

        Args:
            updates: List of metadata updates.
            hashes: List of document hashes/IDs.

        Returns:
            Dict with 'status', 'headers' and 'data'.
        """
        url = f"{self.base_url}/doc/v2/files"
        payload = {
            'updates': updates,
            'hashes': hashes
        }
        headers = self.headers.copy()
        headers['Content-Type'] = 'application/json'
        async with aiohttp.ClientSession() as session:
            async with session.patch(url, headers=headers, json=payload) as response:
                return {
                    'status': response.status,
                    'headers': dict(response.headers),
                    'data': await self._response_body(response)
                }

    async def multi_delete_files(self, hashes: List[str]) -> Dict[str, Any]:
        """
        Delete multiple files in one batch request.

        Args:
            hashes: List of document hashes/IDs to delete.

        Returns:
            Dict with 'status', 'headers' and 'data'.
        """
        url = f"{self.base_url}/doc/v2/files"
        payload = {
            'hashes': hashes
        }
        headers = self.headers.copy()
        headers['Content-Type'] = 'application/json'
        async with aiohttp.ClientSession() as session:
            async with session.delete(url, headers=headers, json=payload) as response:
                return {
                    'status': response.status,
                    'headers': dict(response.headers),
                    'data': await self._response_body(response)
                }

    async def export_file(self, document_id: str, export_type: str = "application/pdf") -> Tuple[Dict[str, Any], bytes]:
        """
        Export/download a file from reMarkable Cloud.

        Args:
            document_id: Document UUID.
            export_type: Export format (application/pdf, application/epub+zip, etc.).

        Returns:
            Tuple of (response info dict with 'status'/'headers'/'content_type',
            file content as bytes).
        """
        url = f"{self.base_url}/doc/v2/files/{document_id}/export"
        headers = self.headers.copy()
        headers['Accept'] = export_type
        async with aiohttp.ClientSession() as session:
            async with session.get(url, headers=headers) as response:
                content = await response.read()
                return {
                    'status': response.status,
                    'headers': dict(response.headers),
                    'content_type': response.content_type
                }, content

    async def get_events_token(self) -> str:
        """
        Get an SSE (Server-Sent Events) token for real-time updates.

        Returns:
            SSE token string ('' if the response carries no 'token' field).

        Raises:
            Exception: If the server responds with a non-200 status.
        """
        url = f"{self.base_url}/doc/v2/events/get-token"
        async with aiohttp.ClientSession() as session:
            async with session.get(url, headers=self.headers) as response:
                if response.status == 200:
                    data = await response.json()
                    return data.get('token', '')
                raise Exception(f"Failed to get events token: {response.status}")

    async def find_folder_by_path(self, folder_path: str) -> Optional[str]:
        """
        Find a folder ID by path (e.g., "/My Folder/Subfolder").

        Walks the path segments, matching items whose 'name'/'type'/'parent'
        fields line up level by level. Requires listing the whole library, so
        it may be slow for large accounts.

        Args:
            folder_path: Path to folder (starting with /).

        Returns:
            Folder ID if found, None otherwise (and None for the root path).
        """
        if folder_path in ("/", ""):
            return None  # Root folder
        response = await self.list_files()
        if response['status'] != 200 or not response['data']:
            return None
        # NOTE(review): assumes the listing body is a flat list of item dicts
        # with 'name'/'type'/'parent'/'id' keys — confirm against the API.
        files = response['data']
        path_parts = [part for part in folder_path.split('/') if part]
        current_parent = None
        for part in path_parts:
            for item in files:
                if (item.get('name') == part and
                        item.get('type') == 'CollectionType' and
                        item.get('parent') == current_parent):
                    current_parent = item.get('id')
                    break
            else:
                # No folder with this name under the current parent.
                return None
        return current_parent

    async def ensure_folder_exists(self, folder_path: str) -> str:
        """
        Ensure a folder path exists, creating missing segments on the way.

        Args:
            folder_path: Path to folder (starting with /).

        Returns:
            Folder ID of the final path segment.

        Raises:
            Exception: If a folder creation request fails.
        """
        folder_id = await self.find_folder_by_path(folder_path)
        if folder_id is not None:
            return folder_id
        # Walk the path, reusing existing segments and creating the rest.
        path_parts = [part for part in folder_path.split('/') if part]
        current_parent = None
        current_path = ""
        for part in path_parts:
            current_path += f"/{part}"
            existing_id = await self.find_folder_by_path(current_path)
            if existing_id:
                current_parent = existing_id
            else:
                response = await self.create_folder(part, current_parent)
                if response['status'] in (200, 201) and response['folder_id']:
                    current_parent = response['folder_id']
                else:
                    raise Exception(f"Failed to create folder '{part}': {response}")
        return current_parent
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
bases |
- | - |
Parameter Details
user_token: JWT authentication token for reMarkable Cloud API access. This token should be obtained through the reMarkable authentication flow and contains encoded information about the storage host (tectonic service). The token is used in the Authorization header for all API requests.
Return Value
Instantiation returns a RemarkableAPIClient object configured with the provided user token. Methods return dictionaries containing 'status' (HTTP status code), 'headers' (response headers as dict), and 'data' (parsed response body). Upload and folder creation methods also include 'document_id' or 'folder_id' extracted from the Location header. The export_file method returns a tuple of (response_info_dict, file_content_bytes). Path-based methods return folder IDs as strings or None.
Class Interface
Methods
__init__(self, user_token: str)
Purpose: Initialize the API client with authentication token and configure base URL and headers
Parameters:
user_token: JWT authentication token for reMarkable Cloud API
Returns: None - initializes instance attributes
_get_storage_host(self) -> str
Purpose: Extract storage host URL from JWT token's tectonic service field
Returns: Storage host URL string (e.g., 'https://xyz.tectonic.remarkable.com' or fallback URL)
async list_files(self, etag: Optional[str] = None, only_folders: bool = False) -> Dict[str, Any]
Purpose: Retrieve list of files and folders from reMarkable Cloud with optional filtering and conditional requests
Parameters:
etag: ETag value for a conditional request (If-None-Match header) used to check whether content has changed
only_folders: If True, only return folders (CollectionType items)
Returns: Dictionary with 'status' (int), 'headers' (dict), 'data' (list of file objects or None), 'etag' (str or None)
async upload_document(self, filename: str, content: bytes, file_type: str = 'application/pdf', parent_id: Optional[str] = None) -> Dict[str, Any]
Purpose: Upload a document to reMarkable Cloud with metadata
Parameters:
filename: Name of the file to display in reMarkable
content: File content as bytes
file_type: MIME type of the file (e.g., 'application/pdf', 'application/epub+zip')
parent_id: UUID of parent folder, or None for root
Returns: Dictionary with 'status', 'headers', 'data', and 'document_id' (extracted from Location header)
async create_folder(self, folder_name: str, parent_id: Optional[str] = None) -> Dict[str, Any]
Purpose: Create a new folder in reMarkable Cloud
Parameters:
folder_name: Name of the folder to create
parent_id: UUID of parent folder, or None for root
Returns: Dictionary with 'status', 'headers', 'data', and 'folder_id' (extracted from Location header)
async update_metadata(self, document_id: str, metadata: Dict[str, Any]) -> Dict[str, Any]
Purpose: Update metadata for a single document or folder
Parameters:
document_id: UUID of the document/folder to update
metadata: Dictionary of metadata fields to update (e.g., name, parent, pinned)
Returns: Dictionary with 'status', 'headers', and 'data' (response body)
async multi_update_metadata(self, updates: List[Dict[str, Any]], hashes: List[str]) -> Dict[str, Any]
Purpose: Update metadata for multiple documents in a single batch request
Parameters:
updates: List of metadata update dictionaries
hashes: List of document UUIDs/hashes corresponding to the updates
Returns: Dictionary with 'status', 'headers', and 'data' (batch operation response)
async multi_delete_files(self, hashes: List[str]) -> Dict[str, Any]
Purpose: Delete multiple files/folders in a single batch request
Parameters:
hashes: List of document/folder UUIDs to delete
Returns: Dictionary with 'status', 'headers', and 'data' (batch delete response)
async export_file(self, document_id: str, export_type: str = 'application/pdf') -> Tuple[Dict[str, Any], bytes]
Purpose: Download/export a file from reMarkable Cloud in specified format
Parameters:
document_id: UUID of the document to export
export_type: MIME type for the export format (e.g., 'application/pdf', 'application/epub+zip')
Returns: Tuple of (response info dict with 'status', 'headers', and 'content_type'; file content as bytes)
async get_events_token(self) -> str
Purpose: Obtain SSE (Server-Sent Events) token for subscribing to real-time document updates
Returns: SSE token string for event subscription
async find_folder_by_path(self, folder_path: str) -> Optional[str]
Purpose: Find folder UUID by hierarchical path (e.g., '/My Folder/Subfolder')
Parameters:
folder_path: Path to folder starting with '/' (e.g., '/Projects/2024')
Returns: Folder UUID string if found, None if not found or for root path
async ensure_folder_exists(self, folder_path: str) -> str
Purpose: Ensure folder path exists, creating intermediate folders as needed
Parameters:
folder_path: Path to folder starting with '/' (e.g., '/Projects/2024')
Returns: Folder UUID string of the final folder in the path
Attributes
| Name | Type | Description | Scope |
|---|---|---|---|
user_token |
str | JWT authentication token for API requests | instance |
base_url |
str | Base URL for API requests, extracted from JWT token or fallback URL | instance |
headers |
Dict[str, str] | Default HTTP headers for API requests including Authorization, rM-Source, and User-Agent | instance |
Dependencies
json, base64, aiohttp, typing, pathlib, asyncio
Required Imports
import json
import base64
import aiohttp
from typing import Dict, Any, Optional, List, Tuple
from pathlib import Path
import asyncio
Usage Example
import asyncio
import json
import base64
import aiohttp
from typing import Dict, Any, Optional, List, Tuple
# Instantiate the client
user_token = 'your_jwt_token_here'
client = RemarkableAPIClient(user_token)
async def main():
# List all files
files_response = await client.list_files()
if files_response['status'] == 200:
print(f"Found {len(files_response['data'])} files")
# Create a folder
folder_response = await client.create_folder('My Notes')
folder_id = folder_response['folder_id']
# Upload a PDF document
with open('document.pdf', 'rb') as f:
content = f.read()
upload_response = await client.upload_document(
filename='document.pdf',
content=content,
file_type='application/pdf',
parent_id=folder_id
)
doc_id = upload_response['document_id']
# Export/download a file
response_info, file_content = await client.export_file(doc_id)
with open('downloaded.pdf', 'wb') as f:
f.write(file_content)
# Ensure folder path exists
folder_id = await client.ensure_folder_exists('/Projects/2024')
# Delete files
delete_response = await client.multi_delete_files([doc_id])
asyncio.run(main())
Best Practices
- Always use async/await syntax when calling any method of this class
- Ensure the user_token is valid and not expired before making API calls
- Handle HTTP status codes appropriately - check response['status'] before accessing response['data']
- Use ensure_folder_exists() instead of create_folder() when you need to guarantee a folder path exists
- Close aiohttp sessions properly - the class creates new sessions for each request which are automatically closed
- For batch operations, use multi_update_metadata() and multi_delete_files() instead of individual calls for better performance
- Store and reuse ETags from list_files() responses to implement efficient polling with If-None-Match headers
- The client automatically handles JWT token parsing to extract the storage host, falling back to internal.cloud.remarkable.com if parsing fails
- When uploading documents, ensure content is in bytes format and file_type matches the actual content MIME type
- For real-time updates, call get_events_token() to obtain an SSE token for subscribing to document change events
- Path-based folder operations (find_folder_by_path, ensure_folder_exists) require listing all files, which may be slow for large libraries
Tags
Similar Components
AI-powered semantic similarity - components with related functionality:
-
class Client 80.1% similar
-
class RemarkableCloudManager 79.1% similar
-
class RemarkableAuth 72.5% similar
-
class RemarkableRestClient 71.9% similar
-
function test_remarkable_auth 70.1% similar