class Client
API Client for the Remarkable Cloud that handles authentication, communication, and document management with the Remarkable Cloud service.
/tf/active/vicechatdev/rmcl/api.py
44 - 377
complex
Purpose
This class provides a comprehensive interface to interact with the Remarkable Cloud API. It manages authentication (device registration and token renewal), maintains a local cache of documents and folders, handles API requests with automatic token refresh, and provides methods for uploading, downloading, updating, and deleting documents. The client uses async/await patterns with trio for concurrency and maintains state about the document hierarchy.
Source Code
class Client:
"""API Client for Remarkable Cloud
This allows you to authenticate & communicate with the Remarkable Cloud
and does all the heavy lifting for you.
"""
def __init__(self):
self.config = Config()
root = items.VirtualFolder('', ROOT_ID)
trash = items.VirtualFolder('.trash', TRASH_ID, root.id)
self.by_id = {root.id: root, trash.id: trash}
self.refresh_deadline = None
self.update_lock = trio.Lock()
self._base_url = None
async def request(self, method: str, path: str,
data=None,
body=None, headers=None,
params=None, stream=False,
allow_renew=True) -> asks.response_objects.Response:
"""Creates a request against the Remarkable Cloud API
This function automatically fills in the blanks of base
url & authentication.
Args:
method: The request method.
path: complete url or path to request.
data: raw data to put/post/...
body: the body to request with. This will be converted to json.
headers: a dict of additional headers to add to the request.
params: Query params to append to the request.
stream: Should the response be a stream?
Returns:
A Response instance containing most likely the response from
the server.
"""
if headers is None:
headers = {}
if not path.startswith("http"):
if not path.startswith('/'):
path = '/' + path
url = f"https://{await self.base_url()}{path}"
else:
url = path
_headers = {
"user-agent": USER_AGENT,
}
if allow_renew and (not self.config.get("usertoken") or
now().timestamp() - self.config.get("usertoken-timestamp", 0) > USER_TOKEN_VALIDITY):
await self.renew_token()
# Get the usertoken again, in case it was updated above
if self.config.get("usertoken"):
token = self.config["usertoken"]
_headers["Authorization"] = f"Bearer {token}"
for k in headers.keys():
_headers[k] = headers[k]
resp = await asks.request(method, url,
json=body,
data=data,
headers=_headers,
params=params,
stream=stream)
if not (allow_renew and resp.status_code == 401):
return resp
log.debug("Got 401 code; trying to renew token")
await self.renew_token()
return await self.request(method, path, data, body, headers, params, stream, False)
async def base_url(self):
if self._base_url is None:
resp = await self.request("GET", SERVICE_MGR_URL)
try:
self._base_url = resp.json().get("Host")
except json.decoder.JSONDecodeError:
raise ApiError("Failed to get service URL", resp)
return self._base_url
async def register_device(self, code: str):
"""Registers a device on the Remarkable Cloud.
This uses a unique code the user gets from
https://my.remarkable.com/connect/remarkable to register a new device
or client to be able to execute api calls.
Args:
code: A unique One time code the user can get
at https://my.remarkable.com/connect/remarkable .
Returns:
True
Raises:
AuthError: We didn't recieved an devicetoken from the Remarkable
Cloud.
"""
uuid = str(uuid4())
body = {
"code": code,
"deviceDesc": DEVICE,
"deviceID": uuid,
}
response = await self.request("POST", DEVICE_TOKEN_URL, body=body, allow_renew=False)
if response.status_code == 200:
self.config["devicetoken"] = response.text
return True
else:
raise AuthError(f"Could not register device (status code {response.status_code})")
async def prompt_register_device(self):
if self.config.get("devicetoken"):
return
if not (sys.stdin.isatty() and sys.stdout.isatty()):
raise AuthError("Device is not registered and not on a TTY to prompt user")
print(textwrap.dedent(f"""
This reMarkable client needs to be registered with the reMarkable
cloud. To do this, please visit
{DEVICE_REGISTER_URL}
to get a one-time code.
(You may be prompted to log into your reMarkable cloud account. If
this happens, you may not be redirected to the one-time code page.
In this case, you may open the above link a second time to get the
code.)
""").strip())
code = input("\nEnter the one-time code: ")
return await self.register_device(code)
async def renew_token(self):
"""Fetches a new user_token.
This is the second step of the authentication of the Remarkable Cloud.
Before each new session, you should fetch a new user token.
User tokens have an unknown expiration date.
Returns:
True
Raises:
AuthError: An error occurred while renewing the user token.
"""
log.debug("Renewing user token")
if not self.config.get("devicetoken"):
raise AuthError("Please register a device first")
headers = {"Authorization": f'Bearer {self.config["devicetoken"]}'}
response = await self.request("POST", USER_TOKEN_URL,
headers=headers, allow_renew=False)
if response.status_code < 400:
self.config.update({
"usertoken": response.text,
"usertoken-timestamp": now().timestamp()
})
return True
else:
raise AuthError("Can't renew token: {e}".format(
e=response.status_code))
async def update_items(self):
response = await self.request('GET', '/document-storage/json/2/docs')
try:
response_json = response.json()
except json.decoder.JSONDecodeError:
log.error(f"Failed to decode JSON from {response.content}")
log.error(f"Response code: {response.status_code}")
raise ApiError("Failed to decode JSON data")
old_ids = set(self.by_id) - {ROOT_ID, TRASH_ID}
self.by_id[ROOT_ID].children = []
self.by_id[TRASH_ID].children = []
for item in response_json:
old = self.by_id.get(item['ID'])
if old:
old_ids.remove(old.id)
if not old or old.version != item['Version']:
new = items.Item.from_metadata(item)
self.by_id[new.id] = new
elif isinstance(old, items.Folder):
old.children = []
for id_ in old_ids:
del self.by_id[id_]
for i in self.by_id.values():
if i.parent is not None:
parent = self.by_id.get(i.parent)
if isinstance(parent, items.Folder):
parent.children.append(i)
else:
# Remarkable treats items with missing parents as in root
self.by_id[ROOT_ID].children.append(i)
self.refresh_deadline = now() + FILE_LIST_VALIDITY
async def get_by_id(self, id_):
async with self.update_lock:
if not self.refresh_deadline or now() > self.refresh_deadline:
await self.update_items()
return self.by_id[id_]
async def get_metadata(self, id_, downloadable=True):
response = await self.request('GET', '/document-storage/json/2/docs',
params={'doc': id_, 'withBlob': downloadable})
for meta in response.json():
if meta['ID'] == id_:
return meta
raise DocumentNotFound(f"Could not find document {id_}")
async def get_blob(self, url):
response = await self.request('GET', url)
return response.content
async def get_blob_size(self, url):
response = await self.request('HEAD', url)
return int(response.headers.get('Content-Length', 0))
async def get_file_details(self, url):
response = await self.request('GET', url, headers={'Range': f'bytes=-{NBYTES}'})
# Want to start a known file extension - file name length - fixed header length
key_index = response.content.rfind(b'.content') - 36 - 46
if key_index < 0:
return FileType.unknown, None
stream = io.BytesIO(response.content[key_index:])
item = ZipHeader.from_stream(stream)
while item is not None:
if item.filename.endswith(b'.pdf'):
return FileType.pdf, item.uncompressed_size
if item.filename.endswith(b'.epub'):
return FileType.epub, item.uncompressed_size
item = ZipHeader.from_stream(stream)
return FileType.notes, None
async def delete(self, item: items.Item):
"""Delete a document from the cloud.
Args:
doc: A Document or folder to delete.
Raises:
ApiError: an error occurred while uploading the document.
"""
response = await self.request("PUT", "/document-storage/json/2/delete",
body=[{
"ID": item.id,
"Version": item.version
}])
self.refresh_deadline = None
return self.check_response(response)
async def update_metadata(self, item: items.Item):
"""Send an update of the current metadata of a meta object
Update the meta item.
Args:
docorfolder: A document or folder to update the meta information
from.
"""
# Copy the metadata so that the object gets out of date and will be refreshed
metadata = item._metadata.copy()
metadata['Version'] += 1
metadata["ModifiedClient"] = now().strftime(RFC3339Nano)
res = await self.request("PUT",
"/document-storage/json/2/upload/update-status",
body=[metadata])
self.refresh_deadline = None
return self.check_response(res)
async def upload(self, item, contents):
res = await self.request('PUT', '/document-storage/json/2/upload/request',
body=[{
'ID': item.id,
'Version': item.version + 1,
'Type': item._metadata['Type']
}])
self.check_response(res)
try:
dest = res.json()[0]['BlobURLPut']
except (IndexError, KeyError):
log.error("Failed to get upload URL")
raise ApiError("Failed to get upload URL", response=res)
up_res = await self.request('PUT', dest, data=contents.read(),
headers={'Content-Type': ''})
if up_res.status_code >= 400:
log.error(f"Upload failed with status {up_res.status_code}")
raise ApiError(f"Upload failed with status {up_res.status_code}", response=up_res)
await self.update_metadata(item)
@staticmethod
def check_response(response: asks.response_objects.Response):
"""Check the response from an API Call
Does some sanity checking on the Response
Args:
response: A API Response
Returns:
True if the response looks ok
Raises:
ApiError: When the response contains an error
"""
if response.status_code >= 400:
log.error(f"Got An invalid HTTP Response: {response.status_code}")
raise ApiError(f"Got An invalid HTTP Response: {response.status_code}",
response=response)
if len(response.json()) == 0:
log.error("Got an empty response")
raise ApiError("Got An empty response", response=response)
if not response.json()[0]["Success"]:
log.error("Got a non-success response")
msg = response.json()[0]["Message"]
log.error(msg)
raise ApiError(msg, response=response)
return True
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
bases |
- | - |
Parameter Details
__init__: No parameters required. Initializes the client with a Config instance, creates virtual root and trash folders, sets up an empty document cache (by_id), initializes a refresh deadline for cache invalidation, creates an update lock for thread-safe operations, and sets the base URL to None (fetched lazily on first request).
Return Value
Instantiation returns a Client object. Key method returns: request() returns an asks.response_objects.Response; register_device() returns True on success; renew_token() returns True on success; update_items() returns None but updates internal state; get_by_id() returns an items.Item object; get_metadata() returns a dict of metadata; get_blob() returns bytes content; get_blob_size() returns int; get_file_details() returns tuple of (FileType, Optional[int]); delete() returns True on success; update_metadata() returns True on success; upload() returns None; check_response() returns True on success.
Class Interface
Methods
__init__(self)
Purpose: Initialize the Client with configuration, virtual folders, and empty state
Returns: None
async request(self, method: str, path: str, data=None, body=None, headers=None, params=None, stream=False, allow_renew=True) -> asks.response_objects.Response
Purpose: Creates an authenticated HTTP request against the Remarkable Cloud API with automatic token renewal
Parameters:
method: HTTP method (GET, POST, PUT, DELETE, etc.)path: Complete URL or path to request (relative paths are prefixed with base_url)data: Raw data to send in request bodybody: Object to be JSON-encoded and sent as request bodyheaders: Dictionary of additional headers to includeparams: Query parameters to append to URLstream: Boolean indicating if response should be streamedallow_renew: Boolean to allow automatic token renewal on 401 errors
Returns: asks.response_objects.Response object containing the server response
async base_url(self)
Purpose: Fetches and caches the base URL for API requests from the service manager
Returns: String containing the base URL hostname
async register_device(self, code: str)
Purpose: Registers a new device with the Remarkable Cloud using a one-time code
Parameters:
code: One-time registration code obtained from https://my.remarkable.com/connect/remarkable
Returns: True on successful registration, raises AuthError on failure
async prompt_register_device(self)
Purpose: Interactively prompts the user for a registration code and registers the device if not already registered
Returns: Result of register_device() or None if already registered
async renew_token(self)
Purpose: Fetches a new user token for API authentication using the device token
Returns: True on success, raises AuthError on failure
async update_items(self)
Purpose: Fetches the complete list of documents and folders from the cloud and updates the local cache
Returns: None, but updates self.by_id dictionary and refresh_deadline
async get_by_id(self, id_)
Purpose: Retrieves an item (document or folder) by its ID, refreshing cache if needed
Parameters:
id_: String ID of the item to retrieve
Returns: items.Item object (Document, Folder, or VirtualFolder)
async get_metadata(self, id_, downloadable=True)
Purpose: Fetches detailed metadata for a specific document from the API
Parameters:
id_: String ID of the documentdownloadable: Boolean indicating whether to include blob URL in response
Returns: Dictionary containing document metadata, raises DocumentNotFound if not found
async get_blob(self, url)
Purpose: Downloads the binary content from a blob URL
Parameters:
url: String URL to download from (typically from metadata BlobURLGet)
Returns: Bytes containing the blob content
async get_blob_size(self, url)
Purpose: Gets the size of a blob without downloading it using HEAD request
Parameters:
url: String URL to check size for
Returns: Integer size in bytes
async get_file_details(self, url)
Purpose: Determines the file type (PDF, EPUB, or notes) and uncompressed size by examining the blob
Parameters:
url: String URL of the blob to examine
Returns: Tuple of (FileType enum, Optional[int] uncompressed_size)
async delete(self, item: items.Item)
Purpose: Deletes a document or folder from the cloud
Parameters:
item: items.Item object to delete
Returns: True on success, raises ApiError on failure
async update_metadata(self, item: items.Item)
Purpose: Sends updated metadata for an item to the cloud, incrementing version
Parameters:
item: items.Item object with modified metadata
Returns: True on success, raises ApiError on failure
async upload(self, item, contents)
Purpose: Uploads a new version of a document to the cloud
Parameters:
item: items.Item object to uploadcontents: File-like object with read() method containing the document data
Returns: None, raises ApiError on failure
check_response(response: asks.response_objects.Response)
static
Purpose: Validates an API response and raises ApiError if the response indicates failure
Parameters:
response: asks.response_objects.Response object to validate
Returns: True if response is valid, raises ApiError otherwise
Attributes
| Name | Type | Description | Scope |
|---|---|---|---|
config |
Config | Configuration object storing device token, user token, and timestamps | instance |
by_id |
dict[str, items.Item] | Dictionary mapping item IDs to Item objects (documents and folders), serves as local cache | instance |
refresh_deadline |
Optional[datetime] | Timestamp when the local cache should be refreshed, None forces immediate refresh | instance |
update_lock |
trio.Lock | Lock to ensure thread-safe access to shared state during updates | instance |
_base_url |
Optional[str] | Cached base URL for API requests, fetched lazily on first use | instance |
Dependencies
askstriojsonsystextwrapiologgingenumuuid
Required Imports
import asks
import trio
import json
import sys
import textwrap
import io
import logging
from uuid import uuid4
from config import Config
from items import Item, Folder, VirtualFolder
from utils import now
from zipdir import ZipHeader
from exceptions import AuthError, DocumentNotFound, ApiError
from const import RFC3339Nano, USER_AGENT, DEVICE_TOKEN_URL, USER_TOKEN_URL, USER_TOKEN_VALIDITY, DEVICE_REGISTER_URL, DEVICE, NBYTES, FILE_LIST_VALIDITY, ROOT_ID, SERVICE_MGR_URL, TRASH_ID, FileType
Usage Example
import trio
from client import Client
async def main():
# Instantiate the client
client = Client()
# Register device (first time only)
# This will prompt for a code from https://my.remarkable.com/connect/remarkable
await client.prompt_register_device()
# Renew authentication token
await client.renew_token()
# Get the root folder and list items
root = await client.get_by_id(ROOT_ID)
for item in root.children:
print(f"{item.name} ({item.id})")
# Get metadata for a specific document
metadata = await client.get_metadata('some-document-id')
# Download a blob
blob_url = metadata.get('BlobURLGet')
if blob_url:
content = await client.get_blob(blob_url)
# Update metadata for an item
item = await client.get_by_id('some-id')
item.name = 'New Name'
await client.update_metadata(item)
# Delete an item
await client.delete(item)
trio.run(main)
Best Practices
- Always call prompt_register_device() or register_device() before first use to obtain device token
- Token renewal is handled automatically by request() method, but can be called manually with renew_token()
- Use async/await patterns with trio for all method calls
- The client maintains an internal cache (by_id) that is refreshed based on FILE_LIST_VALIDITY deadline
- Use update_lock when accessing shared state to ensure thread safety
- Call update_items() to refresh the local cache of documents and folders
- After modifying items (delete, update_metadata, upload), the refresh_deadline is set to None to force cache refresh
- The client automatically retries requests with 401 status by renewing tokens
- Use get_by_id() instead of directly accessing by_id dict to ensure cache is up-to-date
- check_response() should be used to validate API responses and will raise ApiError on failures
- The client creates virtual folders for root (ROOT_ID) and trash (TRASH_ID) that are always present
- Device tokens are long-lived, user tokens expire and need renewal (handled automatically)
- When uploading, the item version is automatically incremented
Tags
Similar Components
AI-powered semantic similarity - components with related functionality:
-
class RemarkableAPIClient 80.1% similar
-
class RemarkableCloudManager 73.4% similar
-
function test_api_client 70.7% similar
-
class RemarkableAuth 70.7% similar
-
class RemarkableRestClient 68.7% similar