
class Client

Maturity: 53

API Client for the Remarkable Cloud that handles authentication, communication, and document management with the Remarkable Cloud service.

File:
/tf/active/vicechatdev/rmcl/api.py
Lines:
44 - 377
Complexity:
complex

Purpose

This class wraps the Remarkable Cloud API. It manages authentication (device registration and user-token renewal), keeps an in-memory cache of documents and folders keyed by ID, issues HTTP requests through asks with automatic token refresh, and provides methods for uploading, downloading, updating, and deleting documents. All network methods are coroutines intended to run under trio, and the cached items are linked into a folder hierarchy below a virtual root folder.

Source Code

class Client:
    """API Client for Remarkable Cloud

    This allows you to authenticate & communicate with the Remarkable Cloud
    and does all the heavy lifting for you.
    """

    def __init__(self):
        self.config = Config()

        root = items.VirtualFolder('', ROOT_ID)
        trash = items.VirtualFolder('.trash', TRASH_ID, root.id)
        self.by_id = {root.id: root, trash.id: trash}
        self.refresh_deadline = None
        self.update_lock = trio.Lock()
        self._base_url = None

    async def request(self, method: str, path: str,
                      data=None,
                      body=None, headers=None,
                      params=None, stream=False,
                      allow_renew=True) -> asks.response_objects.Response:
        """Creates a request against the Remarkable Cloud API

        This function automatically fills in the blanks of base
        url & authentication.

        Args:
            method: The request method.
            path: complete url or path to request.
            data: raw data to put/post/...
            body: the body to request with. This will be converted to json.
            headers: a dict of additional headers to add to the request.
            params: Query params to append to the request.
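            stream: Should the response be a stream?
            allow_renew: Renew the user token when it is missing or expired,
                and retry once if the server answers 401.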
        Returns:
            A Response instance containing most likely the response from
            the server.
        """

        if headers is None:
            headers = {}
        if not path.startswith("http"):
            if not path.startswith('/'):
                path = '/' + path
            url = f"https://{await self.base_url()}{path}"
        else:
            url = path

        _headers = {
            "user-agent": USER_AGENT,
        }

        if allow_renew and (not self.config.get("usertoken") or
            now().timestamp() - self.config.get("usertoken-timestamp", 0) > USER_TOKEN_VALIDITY):
            await self.renew_token()

        # Get the usertoken again, in case it was updated above
        if self.config.get("usertoken"):
            token = self.config["usertoken"]
            _headers["Authorization"] = f"Bearer {token}"
        for k in headers.keys():
            _headers[k] = headers[k]
        resp = await asks.request(method, url,
                                  json=body,
                                  data=data,
                                  headers=_headers,
                                  params=params,
                                  stream=stream)
        if not (allow_renew and resp.status_code == 401):
            return resp

        log.debug("Got 401 code; trying to renew token")
        await self.renew_token()
        return await self.request(method, path, data, body, headers, params, stream, False)

    async def base_url(self):
        if self._base_url is None:
            resp = await self.request("GET", SERVICE_MGR_URL)
            try:
                self._base_url = resp.json().get("Host")
            except json.decoder.JSONDecodeError:
                raise ApiError("Failed to get service URL", resp)
        return self._base_url

    async def register_device(self, code: str):
        """Registers a device on the Remarkable Cloud.

        This uses a unique code the user gets from
        https://my.remarkable.com/connect/remarkable to register a new device
        or client to be able to execute api calls.

        Args:
            code: A unique One time code the user can get
                at https://my.remarkable.com/connect/remarkable .
        Returns:
            True
        Raises:
            AuthError: We didn't receive a device token from the Remarkable
                Cloud.
        """

        uuid = str(uuid4())
        body = {
            "code": code,
            "deviceDesc": DEVICE,
            "deviceID": uuid,

        }
        response = await self.request("POST", DEVICE_TOKEN_URL, body=body, allow_renew=False)
        if response.status_code == 200:
            self.config["devicetoken"] = response.text
            return True
        else:
            raise AuthError(f"Could not register device (status code {response.status_code})")

    async def prompt_register_device(self):
        if self.config.get("devicetoken"):
            return

        if not (sys.stdin.isatty() and sys.stdout.isatty()):
            raise AuthError("Device is not registered and not on a TTY to prompt user")

        print(textwrap.dedent(f"""
            This reMarkable client needs to be registered with the reMarkable
            cloud. To do this, please visit
                {DEVICE_REGISTER_URL}
            to get a one-time code.

            (You may be prompted to log into your reMarkable cloud account. If
            this happens, you may not be redirected to the one-time code page.
            In this case, you may open the above link a second time to get the
            code.)
        """).strip())
        code = input("\nEnter the one-time code: ")
        return await self.register_device(code)

    async def renew_token(self):
        """Fetches a new user_token.

        This is the second step of the authentication of the Remarkable Cloud.
        Before each new session, you should fetch a new user token.
        User tokens have an unknown expiration date.

        Returns:
            True

        Raises:
            AuthError: An error occurred while renewing the user token.
        """

        log.debug("Renewing user token")
        if not self.config.get("devicetoken"):
            raise AuthError("Please register a device first")
        headers = {"Authorization": f'Bearer {self.config["devicetoken"]}'}
        response = await self.request("POST", USER_TOKEN_URL,
                                      headers=headers, allow_renew=False)
        if response.status_code < 400:
            self.config.update({
                "usertoken": response.text,
                "usertoken-timestamp": now().timestamp()
            })
            return True
        else:
            raise AuthError("Can't renew token: {e}".format(
                e=response.status_code))

    async def update_items(self):
        response = await self.request('GET', '/document-storage/json/2/docs')
        try:
            response_json = response.json()
        except json.decoder.JSONDecodeError:
            log.error(f"Failed to decode JSON from {response.content}")
            log.error(f"Response code: {response.status_code}")
            raise ApiError("Failed to decode JSON data")

        old_ids = set(self.by_id) - {ROOT_ID, TRASH_ID}
        self.by_id[ROOT_ID].children = []
        self.by_id[TRASH_ID].children = []
        for item in response_json:
            old = self.by_id.get(item['ID'])
            if old:
                old_ids.remove(old.id)
            if not old or old.version != item['Version']:
                new = items.Item.from_metadata(item)
                self.by_id[new.id] = new
            elif isinstance(old, items.Folder):
                old.children = []

        for id_ in old_ids:
            del self.by_id[id_]

        for i in self.by_id.values():
            if i.parent is not None:
                parent = self.by_id.get(i.parent)
                if isinstance(parent, items.Folder):
                    parent.children.append(i)
                else:
                    # Remarkable treats items with missing parents as in root
                    self.by_id[ROOT_ID].children.append(i)

        self.refresh_deadline = now() + FILE_LIST_VALIDITY

    async def get_by_id(self, id_):
        async with self.update_lock:
            if not self.refresh_deadline or now() > self.refresh_deadline:
                await self.update_items()

        return self.by_id[id_]

    async def get_metadata(self, id_, downloadable=True):
        response = await self.request('GET', '/document-storage/json/2/docs',
                                params={'doc': id_, 'withBlob': downloadable})
        for meta in response.json():
            if meta['ID'] == id_:
                return meta
        raise DocumentNotFound(f"Could not find document {id_}")

    async def get_blob(self, url):
        response = await self.request('GET', url)
        return response.content

    async def get_blob_size(self, url):
        response = await self.request('HEAD', url)
        return int(response.headers.get('Content-Length', 0))

    async def get_file_details(self, url):
        response = await self.request('GET', url, headers={'Range': f'bytes=-{NBYTES}'})
        # Want to start a known file extension - file name length - fixed header length
        key_index = response.content.rfind(b'.content') - 36 - 46
        if key_index < 0:
            return FileType.unknown, None

        stream = io.BytesIO(response.content[key_index:])
        item = ZipHeader.from_stream(stream)
        while item is not None:
            if item.filename.endswith(b'.pdf'):
                return FileType.pdf, item.uncompressed_size
            if item.filename.endswith(b'.epub'):
                return FileType.epub, item.uncompressed_size
            item = ZipHeader.from_stream(stream)
        return FileType.notes, None

    async def delete(self, item: items.Item):
        """Delete a document from the cloud.

        Args:
            item: A document or folder to delete.
        Raises:
            ApiError: an error occurred while deleting the item.
        """

        response = await self.request("PUT", "/document-storage/json/2/delete",
                                      body=[{
                                          "ID": item.id,
                                          "Version": item.version
                                      }])
        self.refresh_deadline = None

        return self.check_response(response)

    async def update_metadata(self, item: items.Item):
        """Send an update of the current metadata of an item

        Args:
            item: A document or folder whose metadata should be sent to
                the cloud.
        """

        # Copy the metadata so that the object gets out of date and will be refreshed
        metadata = item._metadata.copy()
        metadata['Version'] += 1
        metadata["ModifiedClient"] = now().strftime(RFC3339Nano)
        res = await self.request("PUT",
                                 "/document-storage/json/2/upload/update-status",
                                 body=[metadata])
        self.refresh_deadline = None

        return self.check_response(res)

    async def upload(self, item, contents):
        res = await self.request('PUT', '/document-storage/json/2/upload/request',
                                 body=[{
                                     'ID': item.id,
                                     'Version': item.version + 1,
                                     'Type': item._metadata['Type']
                                 }])
        self.check_response(res)
        try:
            dest = res.json()[0]['BlobURLPut']
        except (IndexError, KeyError):
            log.error("Failed to get upload URL")
            raise ApiError("Failed to get upload URL", response=res)
        up_res = await self.request('PUT', dest, data=contents.read(),
                                    headers={'Content-Type': ''})
        if up_res.status_code >= 400:
            log.error(f"Upload failed with status {up_res.status_code}")
            raise ApiError(f"Upload failed with status {up_res.status_code}", response=up_res)
        await self.update_metadata(item)

    @staticmethod
    def check_response(response: asks.response_objects.Response):
        """Check the response from an API Call

        Does some sanity checking on the Response

        Args:
            response: An API response

        Returns:
            True if the response looks ok

        Raises:
            ApiError: When the response contains an error
        """

        if response.status_code >= 400:
            log.error(f"Got An invalid HTTP Response: {response.status_code}")
            raise ApiError(f"Got An invalid HTTP Response: {response.status_code}",
                           response=response)

        if len(response.json()) == 0:
            log.error("Got an empty response")
            raise ApiError("Got An empty response", response=response)

        if not response.json()[0]["Success"]:
            log.error("Got a non-success response")
            msg = response.json()[0]["Message"]
            log.error(msg)
            raise ApiError(msg, response=response)

        return True

Parameters

Name Type Default Kind
bases - -

Parameter Details

__init__: No parameters required. Creates a Config instance, builds the virtual root and trash folders and registers them in the item cache (by_id), sets refresh_deadline to None so that the first get_by_id() call triggers a refresh, creates a trio.Lock (update_lock) that serializes cache refreshes across trio tasks, and sets the base URL to None (fetched lazily on first request).

Return Value

Instantiation returns a Client object. Key method returns:

  • request(): an asks.response_objects.Response
  • register_device(): True on success
  • renew_token(): True on success
  • update_items(): None, but updates internal state
  • get_by_id(): an items.Item object
  • get_metadata(): a dict of metadata
  • get_blob(): bytes content
  • get_blob_size(): int
  • get_file_details(): tuple of (FileType, Optional[int])
  • delete(): True on success
  • update_metadata(): True on success
  • upload(): None
  • check_response(): True on success

Class Interface

Methods

__init__(self)

Purpose: Initialize the Client with configuration, virtual folders, and empty state

Returns: None

async request(self, method: str, path: str, data=None, body=None, headers=None, params=None, stream=False, allow_renew=True) -> asks.response_objects.Response

Purpose: Creates an authenticated HTTP request against the Remarkable Cloud API with automatic token renewal

Parameters:

  • method: HTTP method (GET, POST, PUT, DELETE, etc.)
  • path: Complete URL or path to request (relative paths are prefixed with base_url)
  • data: Raw data to send in request body
  • body: Object to be JSON-encoded and sent as request body
  • headers: Dictionary of additional headers to include
  • params: Query parameters to append to URL
  • stream: Boolean indicating if response should be streamed
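  • allow_renew: Boolean controlling token renewal. When True, the user token is renewed before the request if it is missing or older than USER_TOKEN_VALIDITY, and the request is retried once after renewal if the server answers 401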

Returns: asks.response_objects.Response object containing the server response
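
The renew-then-retry behaviour of request() can be sketched without any network access. This is a minimal illustration, not the library's code: FakeAuthSession, its list of canned status codes, and the value of USER_TOKEN_VALIDITY are all assumptions made for the example, and asyncio stands in for trio so the sketch runs with the standard library alone.

```python
import asyncio
import time

USER_TOKEN_VALIDITY = 24 * 60 * 60  # assumed lifetime in seconds; the real constant lives in const


class FakeAuthSession:
    """Hypothetical stand-in for Client that never touches the network."""

    def __init__(self, statuses):
        self.statuses = list(statuses)  # status codes the fake server returns, in order
        self.token = None
        self.token_time = 0.0
        self.renewals = 0

    async def renew_token(self):
        self.renewals += 1
        self.token = f"token-{self.renewals}"
        self.token_time = time.time()

    async def request(self, allow_renew=True):
        # Renew up front when the token is missing or older than the validity window.
        expired = time.time() - self.token_time > USER_TOKEN_VALIDITY
        if allow_renew and (self.token is None or expired):
            await self.renew_token()
        status = self.statuses.pop(0)
        if not (allow_renew and status == 401):
            return status
        # A 401 triggers exactly one retry: the recursive call disables renewal.
        await self.renew_token()
        return await self.request(allow_renew=False)


session = FakeAuthSession([401, 200])
final_status = asyncio.run(session.request())
print(final_status, session.renewals)
```

Because the retry passes allow_renew=False, a second 401 is returned to the caller instead of looping.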

async base_url(self)

Purpose: Fetches and caches the base URL for API requests from the service manager

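Returns: String containing the base URL hostname (the "Host" field of the service manager response; None if that field is absent). Raises ApiError if the response is not valid JSON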
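
The lazy lookup described here amounts to a fetch-once cache. The sketch below is illustrative only: LazyHost, fake_lookup, and the hostname are hypothetical, and asyncio is used in place of trio so that the example needs nothing beyond the standard library.

```python
import asyncio


class LazyHost:
    """Hypothetical holder that resolves a hostname once and then reuses it."""

    def __init__(self, fetch):
        self._fetch = fetch      # coroutine function that performs the lookup
        self._base_url = None

    async def base_url(self):
        # Only the first call pays for the lookup; later calls hit the cache.
        if self._base_url is None:
            self._base_url = await self._fetch()
        return self._base_url


calls = []


async def fake_lookup():
    calls.append(1)
    return "storage.example.invalid"  # placeholder hostname


async def demo():
    host = LazyHost(fake_lookup)
    return await host.base_url(), await host.base_url()


first, second = asyncio.run(demo())
print(first, len(calls))
```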

async register_device(self, code: str)

Purpose: Registers a new device with the Remarkable Cloud using a one-time code

Parameters:

  • code: One-time registration code obtained from https://my.remarkable.com/connect/remarkable

Returns: True on successful registration, raises AuthError on failure

async prompt_register_device(self)

Purpose: Interactively prompts the user for a registration code and registers the device if not already registered

Returns: Result of register_device() or None if already registered

async renew_token(self)

Purpose: Fetches a new user token for API authentication using the device token

Returns: True on success, raises AuthError on failure

async update_items(self)

Purpose: Fetches the complete list of documents and folders from the cloud and updates the local cache

Returns: None, but updates self.by_id dictionary and refresh_deadline
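
The final step of update_items(), re-linking every cached item to its parent, can be illustrated with plain data classes. This is a sketch under assumptions: Node, link_children, and the "root" ID are made up for the example and do not mirror the real items module.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Node:
    """Hypothetical stand-in for an item in the cache."""
    id: str
    parent: Optional[str]
    is_folder: bool = False
    children: list = field(default_factory=list)


def link_children(by_id, root_id):
    """Rebuild each folder's children list from the items' parent IDs."""
    for node in by_id.values():
        node.children = []
    for node in by_id.values():
        if node.parent is None:
            continue  # the root itself has no parent
        parent = by_id.get(node.parent)
        if parent is not None and parent.is_folder:
            parent.children.append(node)
        else:
            # Items whose parent is missing are shown in the root folder.
            by_id[root_id].children.append(node)


nodes = [
    Node("root", None, is_folder=True),
    Node("folder", "root", is_folder=True),
    Node("doc-1", "folder"),
    Node("orphan", "missing-parent"),
]
by_id = {node.id: node for node in nodes}
link_children(by_id, "root")
print([child.id for child in by_id["root"].children])
```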

async get_by_id(self, id_)

Purpose: Retrieves an item (document or folder) by its ID, refreshing cache if needed

Parameters:

  • id_: String ID of the item to retrieve

Returns: items.Item object (Document, Folder, or VirtualFolder). Raises KeyError if the ID is not in the cache after the refresh check
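
The pattern behind get_by_id() is a deadline-based cache whose refresh is guarded by a lock, so that concurrent callers trigger at most one refresh. The sketch below is an assumption-laden illustration: CachedListing and its contents are hypothetical, and asyncio.Lock stands in for trio.Lock so the example runs with the standard library only.

```python
import asyncio
import time


class CachedListing:
    """Hypothetical cache that refreshes itself when its deadline has passed."""

    def __init__(self, validity):
        self.validity = validity          # seconds a listing stays fresh
        self.refresh_deadline = None      # None forces a refresh on next access
        self.update_lock = asyncio.Lock()
        self.refreshes = 0
        self.by_id = {}

    async def update_items(self):
        await asyncio.sleep(0)            # yield, as a real network call would
        self.refreshes += 1
        self.by_id = {"doc-1": "Document 1"}
        self.refresh_deadline = time.monotonic() + self.validity

    async def get_by_id(self, id_):
        # The lock makes waiting callers see the deadline set by the first one.
        async with self.update_lock:
            if self.refresh_deadline is None or time.monotonic() > self.refresh_deadline:
                await self.update_items()
        return self.by_id[id_]


async def demo():
    cache = CachedListing(validity=60.0)
    results = await asyncio.gather(*(cache.get_by_id("doc-1") for _ in range(5)))
    return cache, results


cache, results = asyncio.run(demo())
print(cache.refreshes, results[0])
```

Without the lock, all five callers could observe an empty deadline before the first refresh finished and each would start its own refresh.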

async get_metadata(self, id_, downloadable=True)

Purpose: Fetches detailed metadata for a specific document from the API

Parameters:

  • id_: String ID of the document
  • downloadable: Boolean indicating whether to include blob URL in response

Returns: Dictionary containing document metadata, raises DocumentNotFound if not found

async get_blob(self, url)

Purpose: Downloads the binary content from a blob URL

Parameters:

  • url: String URL to download from (typically from metadata BlobURLGet)

Returns: Bytes containing the blob content

async get_blob_size(self, url)

Purpose: Gets the size of a blob without downloading it using HEAD request

Parameters:

  • url: String URL to check size for

Returns: Integer size in bytes

async get_file_details(self, url)

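Purpose: Determines the file type (PDF, EPUB, or notes) and, for PDF and EPUB, the uncompressed size. It does this without downloading the whole blob: a Range request fetches only the last NBYTES bytes, and the ZIP directory entries found there are scanned for a .pdf or .epub file name. Returns (FileType.unknown, None) when no .content entry is found in that tail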

Parameters:

  • url: String URL of the blob to examine

Returns: Tuple of (FileType enum, Optional[int] uncompressed_size)
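
The offset arithmetic in get_file_details() (36 for the UUID file name, 46 for the fixed header) matches the layout of a ZIP central directory file header, which is 46 bytes long before the file name. The sketch below reproduces the idea with the standard library; classify_tail, the string labels, and the NBYTES value are assumptions, and it parses the header with struct rather than the project's ZipHeader class.

```python
import io
import struct
import zipfile

CENTRAL_SIG = b"PK\x01\x02"   # signature of a ZIP central directory file header
FIXED_LEN = 46                # length of the fixed part of that header
UUID_LEN = 36                 # length of a canonical UUID string
NBYTES = 1024                 # assumed tail size; the real value lives in const


def classify_tail(tail: bytes):
    """Return (kind, uncompressed_size) from the last bytes of a ZIP archive."""
    pos = tail.rfind(b".content") - UUID_LEN - FIXED_LEN
    if pos < 0:
        return "unknown", None
    while tail[pos:pos + 4] == CENTRAL_SIG and pos + FIXED_LEN <= len(tail):
        # Offset 24: uncompressed size, then name, extra and comment lengths.
        size, name_len, extra_len, comment_len = struct.unpack_from("<IHHH", tail, pos + 24)
        name = tail[pos + FIXED_LEN:pos + FIXED_LEN + name_len]
        if name.endswith(b".pdf"):
            return "pdf", size
        if name.endswith(b".epub"):
            return "epub", size
        pos += FIXED_LEN + name_len + extra_len + comment_len
    return "notes", None


doc_id = "123e4567-e89b-12d3-a456-426614174000"  # made-up document ID
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as archive:
    archive.writestr(f"{doc_id}.content", "{}")
    archive.writestr(f"{doc_id}.pdf", b"x" * 1000)

result = classify_tail(buffer.getvalue()[-NBYTES:])
print(result)  # ('pdf', 1000)
```

As in the original method, scanning starts at the .content entry, so a .pdf or .epub entry is only detected if it appears after that entry in the directory.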

async delete(self, item: items.Item)

Purpose: Deletes a document or folder from the cloud

Parameters:

  • item: items.Item object to delete

Returns: True on success, raises ApiError on failure

async update_metadata(self, item: items.Item)

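Purpose: Sends the item's metadata to the cloud with Version incremented and ModifiedClient set to the current time. The changes are applied to a copy of the metadata, so the local item keeps its old version until the cache is refreshed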

Parameters:

  • item: items.Item object with modified metadata

Returns: True on success, raises ApiError on failure
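
The copy-and-bump step can be shown in isolation. This is a minimal sketch: next_metadata is a hypothetical helper, and the timestamp format string is an assumption standing in for the RFC3339Nano constant, whose actual value is not shown in this document.

```python
from datetime import datetime, timezone

TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%SZ"  # assumed; stands in for const.RFC3339Nano


def next_metadata(metadata, when):
    """Return a copy of metadata with Version bumped and ModifiedClient set."""
    updated = metadata.copy()          # the caller's dict is left untouched
    updated["Version"] += 1
    updated["ModifiedClient"] = when.strftime(TIMESTAMP_FORMAT)
    return updated


original = {"ID": "doc-1", "Version": 3}
sent = next_metadata(original, datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc))
print(original["Version"], sent["Version"], sent["ModifiedClient"])
```

Leaving the original dict at the old version is what makes the cached item detectably stale: the next refresh sees a different Version from the server and rebuilds the item.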

async upload(self, item, contents)

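Purpose: Uploads a new version of a document to the cloud in three steps: requests an upload URL for version + 1, PUTs the contents to that URL, then calls update_metadata() to publish the new version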

Parameters:

  • item: items.Item object to upload
  • contents: File-like object with read() method containing the document data

Returns: None, raises ApiError on failure

check_response(response: asks.response_objects.Response) static

Purpose: Validates an API response and raises ApiError if the response indicates failure

Parameters:

  • response: asks.response_objects.Response object to validate

Returns: True if response is valid, raises ApiError otherwise
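
The three checks performed by check_response() can be expressed as a pure function over a status code and the decoded JSON payload. The sketch below is illustrative: check_payload and this local ApiError class are stand-ins defined for the example, not the project's own.

```python
class ApiError(Exception):
    """Local stand-in for the project's ApiError."""


def check_payload(status_code, payload):
    """Validate a status code and a decoded JSON list of result objects."""
    if status_code >= 400:
        raise ApiError(f"Got an invalid HTTP response: {status_code}")
    if len(payload) == 0:
        raise ApiError("Got an empty response")
    first = payload[0]
    if not first["Success"]:
        # The server reports per-item failures in the body, not the status code.
        raise ApiError(first["Message"])
    return True


ok = check_payload(200, [{"Success": True, "Message": ""}])

try:
    check_payload(200, [{"Success": False, "Message": "version conflict"}])
except ApiError as error:
    failure = str(error)

print(ok, failure)
```

The third check matters because a request can return HTTP 200 while the first result object still reports Success as false.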

Attributes

Name | Type | Description | Scope
config | Config | Configuration object storing device token, user token, and timestamps | instance
by_id | dict[str, items.Item] | Dictionary mapping item IDs to Item objects (documents and folders), serves as local cache | instance
refresh_deadline | Optional[datetime] | Time after which the local cache is refreshed on the next get_by_id() call; None forces an immediate refresh | instance
update_lock | trio.Lock | Lock that serializes cache refreshes between trio tasks (it does not provide thread safety) | instance
_base_url | Optional[str] | Cached base URL for API requests, fetched lazily on first use | instance

Dependencies

  • asks
  • trio
  • json
  • sys
  • textwrap
  • io
  • logging
  • enum
  • uuid

Required Imports

import asks
import trio
import json
import sys
import textwrap
import io
import logging
from uuid import uuid4
from config import Config
import items  # the class refers to items.Item, items.Folder and items.VirtualFolder
from utils import now
from zipdir import ZipHeader
from exceptions import AuthError, DocumentNotFound, ApiError
from const import RFC3339Nano, USER_AGENT, DEVICE_TOKEN_URL, USER_TOKEN_URL, USER_TOKEN_VALIDITY, DEVICE_REGISTER_URL, DEVICE, NBYTES, FILE_LIST_VALIDITY, ROOT_ID, SERVICE_MGR_URL, TRASH_ID, FileType

Usage Example

import trio
from rmcl.api import Client      # module path inferred from the file location rmcl/api.py
from rmcl.const import ROOT_ID   # assumed location of the ROOT_ID constant

async def main():
    # Instantiate the client
    client = Client()

    # Register device (first time only)
    # This will prompt for a code from https://my.remarkable.com/connect/remarkable
    await client.prompt_register_device()

    # Optional: request() renews the user token automatically when needed
    await client.renew_token()

    # Get the root folder and list items
    root = await client.get_by_id(ROOT_ID)
    for item in root.children:
        print(f"{item.name} ({item.id})")

    # Get metadata for a specific document ('some-document-id' is a placeholder)
    metadata = await client.get_metadata('some-document-id')

    # Download a blob
    blob_url = metadata.get('BlobURLGet')
    if blob_url:
        content = await client.get_blob(blob_url)
        print(f"Downloaded {len(content)} bytes")

    # Update metadata for an item ('some-id' is a placeholder)
    item = await client.get_by_id('some-id')
    item.name = 'New Name'
    await client.update_metadata(item)

    # Re-fetch before deleting: the cached item still carries the old version
    item = await client.get_by_id('some-id')
    await client.delete(item)

trio.run(main)

Best Practices

  • Always call prompt_register_device() or register_device() before first use to obtain device token
  • Token renewal is handled automatically by request() method, but can be called manually with renew_token()
  • Use async/await patterns with trio for all method calls
  • The client maintains an internal cache (by_id) that is refreshed based on FILE_LIST_VALIDITY deadline
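  • update_lock is a trio.Lock that serializes cache refreshes between trio tasks; it does not make the client safe to use from multiple threads
  • Call update_items() to force a refresh of the local cache; acquire update_lock first if other tasks may call get_by_id() concurrently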
  • After modifying items (delete, update_metadata, upload), the refresh_deadline is set to None to force cache refresh
  • The client automatically retries requests with 401 status by renewing tokens
  • Use get_by_id() instead of directly accessing by_id dict to ensure cache is up-to-date
  • check_response() should be used to validate API responses and will raise ApiError on failures
  • The client creates virtual folders for root (ROOT_ID) and trash (TRASH_ID) that are always present
  • Device tokens are long-lived, user tokens expire and need renewal (handled automatically)
  • When uploading, the item version is automatically incremented

Similar Components

AI-powered semantic similarity - components with related functionality:

  • class RemarkableAPIClient 80.1% similar

    Asynchronous API client for interacting with the reMarkable Cloud service, providing methods for file management, folder operations, and document synchronization.

    From: /tf/active/vicechatdev/e-ink-llm/remarkable_api_endpoints.py
  • class RemarkableCloudManager 73.4% similar

    Unified manager for reMarkable Cloud operations that uses REST API as primary method with rmcl library as fallback, handling authentication, file operations, and folder management.

    From: /tf/active/vicechatdev/e-ink-llm/remarkable_cloud.py
  • function test_api_client 70.7% similar

    An async test function stub for testing the RemarkableAPIClient that requires a valid user authentication token to execute API operations.

    From: /tf/active/vicechatdev/e-ink-llm/remarkable_api_endpoints.py
  • class RemarkableAuth 70.7% similar

    Handles the complete authentication flow for reMarkable cloud services, managing device tokens, user tokens, and authenticated HTTP sessions.

    From: /tf/active/vicechatdev/e-ink-llm/cloudtest/auth.py
  • class RemarkableRestClient 68.7% similar

    Direct REST API client for reMarkable Cloud without external dependencies

    From: /tf/active/vicechatdev/e-ink-llm/remarkable_rest_client.py