šŸ” Code Extractor

class DocumentToTrashMover

Maturity: 52

A class that moves reMarkable documents to the trash by updating their metadata parent field to 'trash' and synchronizing changes through the reMarkable cloud API.

File:
/tf/active/vicechatdev/e-ink-llm/cloudtest/move_documents_to_trash.py
Lines:
45 - 443
Complexity:
complex

Purpose

This class provides a complete workflow for moving reMarkable documents to trash. It handles authentication, retrieves current document state from the cloud, modifies metadata to set parent='trash', uploads updated schemas, and synchronizes the root document structure. The implementation uses the 'working upload mechanism' that mimics the official reMarkable desktop client's behavior, including proper headers, CRC32C checksums, and endpoint usage.

Source Code

class DocumentToTrashMover:
    """Moves documents TO trash using the working upload mechanism"""
    
    def __init__(self):
        # Load auth session
        auth = RemarkableAuth()
        self.session = auth.get_authenticated_session()
        
        if not self.session:
            raise RuntimeError("Failed to authenticate with reMarkable")
        
        print("šŸ—‘ļø Document to Trash Mover Initialized")
    
    def get_current_root_info(self):
        """Get current root.docSchema info using working method"""
        print("\nšŸ“‹ Step 1: Getting current root.docSchema...")
        
        # Get root info
        root_response = self.session.get("https://eu.tectonic.remarkable.com/sync/v4/root")
        root_response.raise_for_status()
        root_data = root_response.json()
        
        # Get root content
        root_content_response = self.session.get(f"https://eu.tectonic.remarkable.com/sync/v3/files/{root_data['hash']}")
        root_content_response.raise_for_status()
        root_content = root_content_response.text
        
        print(f"āœ… Current root hash: {root_data['hash']}")
        print(f"āœ… Current generation: {root_data.get('generation')}")
        print(f"āœ… Root content size: {len(root_content)} bytes")
        
        return root_data, root_content
    
    def get_document_info(self, doc_uuid: str, root_content: str):
        """Find document entry in root.docSchema"""
        print(f"\nšŸ“„ Step 2: Finding document {doc_uuid[:8]}... in root.docSchema")
        
        lines = root_content.strip().split('\n')
        for line in lines[1:]:  # Skip version header
            if doc_uuid in line:
                parts = line.split(':')
                if len(parts) >= 5:
                    doc_info = {
                        'hash': parts[0],
                        'uuid': parts[2],
                        'type': parts[3],
                        'size': parts[4],
                        'full_line': line
                    }
                    print(f"āœ… Found document entry:")
                    print(f"   Hash: {doc_info['hash']}")
                    print(f"   Type: {doc_info['type']}")
                    print(f"   Size: {doc_info['size']}")
                    print(f"   Full line: {doc_info['full_line']}")
                    return doc_info
        
        raise ValueError(f"Document {doc_uuid} not found in root.docSchema")
    
    def get_document_schema(self, doc_hash: str):
        """Retrieve document's docSchema"""
        print(f"\nšŸ“„ Step 3: Retrieving document docSchema...")
        
        doc_response = self.session.get(f"https://eu.tectonic.remarkable.com/sync/v3/files/{doc_hash}")
        doc_response.raise_for_status()
        doc_content = doc_response.text
        
        print(f"āœ… Document docSchema size: {len(doc_content)} bytes")
        print(f"šŸ“„ Document docSchema content:")
        
        lines = doc_content.strip().split('\n')
        for i, line in enumerate(lines):
            print(f"   Line {i}: {line}")
        
        return doc_content, lines
    
    def get_current_metadata(self, doc_lines: list):
        """Extract and fetch current metadata"""
        print(f"\nšŸ“ Step 4: Getting current metadata...")
        
        metadata_hash = None
        metadata_line = None
        
        # Find metadata component
        for line in doc_lines[1:]:  # Skip version
            if ':' in line and '.metadata' in line:
                parts = line.split(':')
                if len(parts) >= 5:
                    metadata_hash = parts[0]
                    metadata_line = line
                    break
        
        if not metadata_hash:
            raise ValueError("Metadata component not found in document schema")
        
        print(f"āœ… Metadata hash: {metadata_hash}")
        
        # Fetch current metadata
        metadata_response = self.session.get(f"https://eu.tectonic.remarkable.com/sync/v3/files/{metadata_hash}")
        metadata_response.raise_for_status()
        current_metadata = json.loads(metadata_response.text)
        
        print(f"āœ… Current metadata:")
        for key, value in current_metadata.items():
            print(f"   {key}: {value}")
        
        return current_metadata, metadata_line
    
    def create_trash_metadata(self, current_metadata: dict):
        """Create updated metadata with parent = 'trash'"""
        print(f"\nšŸ—‘ļø Step 5: Creating trash metadata...")
        
        # Copy current metadata and update parent to trash
        updated_metadata = current_metadata.copy()
        old_parent = updated_metadata.get('parent', '')
        updated_metadata['parent'] = 'trash'  # Move to trash
        
        print(f"āœ… Updating parent: '{old_parent}' → 'trash'")
        
        # Keep other fields as they are for trash move
        updated_metadata['lastModified'] = int(time.time() * 1000)
        
        # Convert to JSON
        updated_metadata_json = json.dumps(updated_metadata, separators=(',', ':'))
        
        print(f"āœ… Updated metadata ({len(updated_metadata_json)} bytes):")
        print(f"   {updated_metadata_json[:100]}...")
        
        return updated_metadata_json
    
    def upload_new_metadata(self, metadata_json: str, doc_uuid: str):
        """Upload new metadata using WORKING method"""
        print(f"\nā¬†ļø Step 6: Uploading new metadata using WORKING method...")
        
        # Calculate hash
        metadata_hash = hashlib.sha256(metadata_json.encode()).hexdigest()
        print(f"āœ… New metadata hash: {metadata_hash}")
        
        # Upload using WORKING method from test_move_from_trash.py
        headers = {
            'Content-Type': 'application/octet-stream',
            'rm-batch-number': '1',
            'rm-filename': f'{doc_uuid}.metadata',  # Required: UUID.metadata format
            'rm-sync-id': str(uuid.uuid4()),
            'User-Agent': 'reMarkable-desktop-win/3.11.1.1951',  # Use Windows UA
            'Accept-Encoding': 'gzip, deflate',
            'Accept-Language': 'en-BE,*',
            'Connection': 'Keep-Alive'
        }
        
        # Add CRC32C checksum
        crc32c_header = compute_crc32c_header(metadata_json.encode())
        if crc32c_header:
            headers['x-goog-hash'] = crc32c_header
        
        print(f"šŸ“¤ PUT to: https://eu.tectonic.remarkable.com/sync/v3/files/{metadata_hash}")
        print(f"   Headers: {list(headers.keys())}")
        
        upload_response = self.session.put(
            f"https://eu.tectonic.remarkable.com/sync/v3/files/{metadata_hash}",  # WORKING ENDPOINT
            data=metadata_json.encode(),
            headers=headers 
        )
        
        print(f"āœ… Metadata upload response: {upload_response.status_code}")
        if upload_response.status_code not in [200, 202]:
            print(f"āŒ Upload failed: {upload_response.text}")
            raise RuntimeError(f"Metadata upload failed: {upload_response.status_code}")
        
        return metadata_hash
    
    def create_new_document_schema(self, doc_lines: list, new_metadata_hash: str, metadata_line: str):
        """Create new document schema with updated metadata hash"""
        print(f"\nšŸ—ļø Step 7: Creating new document schema...")
        
        # Replace metadata line with new hash
        new_lines = []
        
        for line in doc_lines:
            if line == metadata_line:
                # Replace metadata hash but keep size
                parts = line.split(':')
                parts[0] = new_metadata_hash  # Update hash
                new_line = ':'.join(parts)
                new_lines.append(new_line)
                print(f"āœ… Updated metadata line:")
                print(f"   Old: {line}")
                print(f"   New: {new_line}")
            else:
                new_lines.append(line)
        
        new_doc_content = '\n'.join(new_lines)
        
        print(f"āœ… New document schema ({len(new_doc_content)} bytes):")
        for i, line in enumerate(new_lines):
            print(f"   Line {i}: {line}")
        
        return new_doc_content
    
    def upload_new_document_schema(self, doc_content: str, doc_uuid: str):
        """Upload new document schema using WORKING method"""
        print(f"\nā¬†ļø Step 8: Uploading new document schema...")
        
        # Calculate hash
        doc_hash = hashlib.sha256(doc_content.encode()).hexdigest()
        print(f"āœ… New document schema hash: {doc_hash}")
        
        # Upload using WORKING method
        headers = {
            'Content-Type': 'application/octet-stream',
            'rm-batch-number': '1',
            'rm-filename': f'{doc_uuid}.docSchema',  # Required: UUID.docSchema format
            'rm-sync-id': str(uuid.uuid4()),
            'User-Agent': 'reMarkable-desktop-win/3.11.1.1951',
            'Accept-Encoding': 'gzip, deflate',
            'Accept-Language': 'en-BE,*',
            'Connection': 'Keep-Alive'
        }
        
        # Add CRC32C checksum
        crc32c_header = compute_crc32c_header(doc_content.encode())
        if crc32c_header:
            headers['x-goog-hash'] = crc32c_header
        
        print(f"šŸ“¤ PUT to: https://eu.tectonic.remarkable.com/sync/v3/files/{doc_hash}")
        
        upload_response = self.session.put(
            f"https://eu.tectonic.remarkable.com/sync/v3/files/{doc_hash}",  # WORKING ENDPOINT
            data=doc_content.encode(),
            headers=headers
        )
        
        print(f"āœ… Document schema upload response: {upload_response.status_code}")
        if upload_response.status_code not in [200, 202]:
            print(f"āŒ Upload failed: {upload_response.text}")
            raise RuntimeError(f"Document schema upload failed: {upload_response.status_code}")
        
        return doc_hash
    
    def update_root_docschema(self, root_content: str, doc_info: dict, new_doc_hash: str):
        """Update root.docSchema with new document hash"""
        print(f"\nšŸ”„ Step 9: Updating root.docSchema...")
        
        # Replace old document line with new hash
        old_line = doc_info['full_line']
        parts = old_line.split(':')
        parts[0] = new_doc_hash  # Update document hash
        new_line = ':'.join(parts)
        
        print(f"āœ… Updating root.docSchema entry:")
        print(f"   Old: {old_line}")
        print(f"   New: {new_line}")
        
        # Replace in root content
        new_root_content = root_content.replace(old_line, new_line)
        
        print(f"āœ… New root.docSchema size: {len(new_root_content)} bytes")
        
        return new_root_content
    
    def upload_new_root(self, root_content: str, generation: int):
        """Upload new root.docSchema and update roothash using WORKING method"""
        print(f"\nā¬†ļø Step 10: Uploading new root.docSchema...")
        
        # Calculate hash
        root_hash = hashlib.sha256(root_content.encode()).hexdigest()
        print(f"āœ… New root hash: {root_hash}")
        
        # Upload root content using WORKING method
        headers = {
            'Content-Type': 'text/plain',
            'rm-batch-number': '1',
            'rm-filename': 'root.docSchema',  # System filename for root.docSchema
            'rm-sync-id': str(uuid.uuid4()),
            'User-Agent': 'reMarkable-desktop-win/3.11.1.1951',
            'Accept-Encoding': 'gzip, deflate',
            'Accept-Language': 'en-BE,*',
            'Connection': 'Keep-Alive'
        }
        
        # Add CRC32C checksum
        crc32c_header = compute_crc32c_header(root_content.encode())
        if crc32c_header:
            headers['x-goog-hash'] = crc32c_header
        
        print(f"šŸ“¤ PUT to: https://eu.tectonic.remarkable.com/sync/v3/files/{root_hash}")
        
        upload_response = self.session.put(
            f"https://eu.tectonic.remarkable.com/sync/v3/files/{root_hash}",  # WORKING ENDPOINT
            data=root_content.encode(),
            headers=headers
        )
        
        print(f"āœ… Root content upload response: {upload_response.status_code}")
        if upload_response.status_code not in [200, 202]:
            print(f"āŒ Upload failed: {upload_response.text}")
            raise RuntimeError(f"Root content upload failed: {upload_response.status_code}")
        
        # Update root hash pointer using WORKING method
        print(f"\nšŸ”„ Step 11: Updating root hash pointer...")
        
        # Create root data exactly like working upload_manager.py
        root_update_data = {
            "broadcast": True,
            "generation": generation,  # Use generation parameter
            "hash": root_hash
        }
        
        # Convert to JSON with 2-space indent like real app
        root_content_body = json.dumps(root_update_data, indent=2).encode('utf-8')
        
        # Headers exactly like working upload_manager.py
        headers = {
            'Content-Type': 'application/json',
            'rm-batch-number': '1',
            'rm-filename': 'roothash',
            'rm-sync-id': str(uuid.uuid4()),
            'User-Agent': 'reMarkable-desktop-win/3.11.1.1951',
            'Accept-Encoding': 'gzip, deflate',
            'Accept-Language': 'en-BE,*',
            'Connection': 'Keep-Alive'
        }
        
        # Add CRC32C checksum
        crc32c_header = compute_crc32c_header(root_content_body)
        if crc32c_header:
            headers['x-goog-hash'] = crc32c_header
        
        # Use /sync/v3/root endpoint like working code
        print(f"šŸ“¤ PUT to: https://eu.tectonic.remarkable.com/sync/v3/root")
        
        root_update_response = self.session.put(
            "https://eu.tectonic.remarkable.com/sync/v3/root",  # WORKING ENDPOINT
            data=root_content_body,
            headers=headers
        )
        
        print(f"āœ… Root update response: {root_update_response.status_code}")
        if root_update_response.status_code not in [200, 202]:
            print(f"āŒ Root update failed: {root_update_response.text}")
            raise RuntimeError(f"Root update failed: {root_update_response.status_code}")
        
        return root_hash
    
    def move_document_to_trash(self, doc_uuid: str):
        """Complete process to move document TO trash"""
        print(f"šŸ—‘ļø Moving Document TO Trash")
        print(f"Document UUID: {doc_uuid}")
        print("=" * 60)
        
        try:
            # Step 1: Get current root info
            root_data, root_content = self.get_current_root_info()
            
            # Step 2: Find document in root
            doc_info = self.get_document_info(doc_uuid, root_content)
            
            # Step 3: Get document schema
            doc_content, doc_lines = self.get_document_schema(doc_info['hash'])
            
            # Step 4: Get current metadata
            current_metadata, metadata_line = self.get_current_metadata(doc_lines)
            
            # Check current parent
            current_parent = current_metadata.get('parent', '')
            if current_parent == 'trash':
                print(f"āš ļø Document is already in trash!")
                return True
            
            print(f"šŸ“ Moving document from '{current_parent or '(root)'}' to trash...")
            
            # Step 5: Create trash metadata (set parent = 'trash')
            updated_metadata_json = self.create_trash_metadata(current_metadata)
            
            # Step 6: Upload new metadata using WORKING method
            new_metadata_hash = self.upload_new_metadata(updated_metadata_json, doc_uuid)
            
            # Step 7: Create new document schema
            new_doc_content = self.create_new_document_schema(doc_lines, new_metadata_hash, metadata_line)
            
            # Step 8: Upload new document schema using WORKING method
            new_doc_hash = self.upload_new_document_schema(new_doc_content, doc_uuid)
            
            # Step 9: Update root.docSchema
            new_root_content = self.update_root_docschema(root_content, doc_info, new_doc_hash)
            
            # Step 10-11: Upload new root and update pointer using WORKING method
            new_root_hash = self.upload_new_root(new_root_content, root_data['generation'])
            
            print(f"\nšŸŽ‰ SUCCESS! Document moved to trash")
            print(f"   Document: {current_metadata.get('visibleName')}")
            print(f"   Old parent: {current_parent or '(root)'}")
            print(f"   New parent: trash")
            print(f"   New root hash: {new_root_hash}")
            
            return True
            
        except Exception as e:
            print(f"\nāŒ Move to trash operation failed: {e}")
            return False

Parameters

Name Type Default Kind
bases - -

Parameter Details

__init__: No parameters required. The constructor automatically initializes authentication using RemarkableAuth and establishes an authenticated session with the reMarkable cloud service. Raises RuntimeError if authentication fails.

Return Value

Instantiation returns a DocumentToTrashMover object with an authenticated session. The main method move_document_to_trash() returns a boolean: True if the document was successfully moved to trash, False if the operation failed. Other methods return various data structures: tuples of (dict, str) for root info, dict for document info, str/list for schemas, and str for hash values.

Class Interface

Methods

__init__(self)

Purpose: Initialize the DocumentToTrashMover with authenticated session

Returns: None - initializes self.session attribute

get_current_root_info(self) -> tuple[dict, str]

Purpose: Retrieve current root.docSchema information including hash, generation, and content

Returns: Tuple of (root_data dict with hash/generation, root_content string)

get_document_info(self, doc_uuid: str, root_content: str) -> dict

Purpose: Find and extract document entry from root.docSchema by UUID

Parameters:

  • doc_uuid: UUID of the document to find
  • root_content: Content of root.docSchema as string

Returns: Dict with keys: hash, uuid, type, size, full_line

get_document_schema(self, doc_hash: str) -> tuple[str, list]

Purpose: Retrieve and parse a document's docSchema file from cloud storage

Parameters:

  • doc_hash: SHA256 hash of the document schema to retrieve

Returns: Tuple of (doc_content string, doc_lines list)

get_current_metadata(self, doc_lines: list) -> tuple[dict, str]

Purpose: Extract and fetch current metadata from document schema lines

Parameters:

  • doc_lines: List of lines from document schema

Returns: Tuple of (current_metadata dict, metadata_line string)

create_trash_metadata(self, current_metadata: dict) -> str

Purpose: Create updated metadata JSON with parent field set to 'trash'

Parameters:

  • current_metadata: Current metadata dictionary to update

Returns: JSON string of updated metadata with parent='trash'

upload_new_metadata(self, metadata_json: str, doc_uuid: str) -> str

Purpose: Upload new metadata file to cloud storage using working upload method

Parameters:

  • metadata_json: JSON string of metadata to upload
  • doc_uuid: UUID of the document (used in rm-filename header)

Returns: SHA256 hash of the uploaded metadata

create_new_document_schema(self, doc_lines: list, new_metadata_hash: str, metadata_line: str) -> str

Purpose: Create new document schema with updated metadata hash

Parameters:

  • doc_lines: List of lines from original document schema
  • new_metadata_hash: New hash to replace in metadata line
  • metadata_line: Original metadata line to replace

Returns: New document schema content as string

upload_new_document_schema(self, doc_content: str, doc_uuid: str) -> str

Purpose: Upload new document schema to cloud storage using working upload method

Parameters:

  • doc_content: Document schema content to upload
  • doc_uuid: UUID of the document (used in rm-filename header)

Returns: SHA256 hash of the uploaded document schema

update_root_docschema(self, root_content: str, doc_info: dict, new_doc_hash: str) -> str

Purpose: Update root.docSchema content with new document hash

Parameters:

  • root_content: Current root.docSchema content
  • doc_info: Document info dict with full_line key
  • new_doc_hash: New document hash to replace in root

Returns: Updated root.docSchema content as string

upload_new_root(self, root_content: str, generation: int) -> str

Purpose: Upload new root.docSchema and update root hash pointer using working method

Parameters:

  • root_content: New root.docSchema content to upload
  • generation: Generation number for root update

Returns: SHA256 hash of the new root

move_document_to_trash(self, doc_uuid: str) -> bool

Purpose: Complete workflow to move a document to trash (main entry point)

Parameters:

  • doc_uuid: UUID of the document to move to trash

Returns: True if successful, False if operation failed

Attributes

Name Type Description Scope
session requests.Session Authenticated HTTP session for making API requests to reMarkable cloud instance

Dependencies

  • json
  • time
  • hashlib
  • uuid
  • base64
  • zlib
  • pathlib
  • crc32c
  • sys
  • requests

Required Imports

import json
import time
import hashlib
import uuid
import base64
import zlib
from pathlib import Path
from auth import RemarkableAuth
import crc32c
import sys

Usage Example

# Initialize the mover
mover = DocumentToTrashMover()

# Move a document to trash by UUID
doc_uuid = 'abc123-def456-ghi789'
success = mover.move_document_to_trash(doc_uuid)

if success:
    print('Document moved to trash successfully')
else:
    print('Failed to move document to trash')

# Advanced: Get current root info
root_data, root_content = mover.get_current_root_info()
print(f'Current root hash: {root_data["hash"]}')

# Advanced: Find document info
doc_info = mover.get_document_info(doc_uuid, root_content)
print(f'Document type: {doc_info["type"]}')

Best Practices

  • Always instantiate the class before calling any methods - authentication happens in __init__
  • Handle RuntimeError during instantiation if authentication fails
  • The move_document_to_trash() method is the main entry point - other methods are internal steps
  • Methods are designed to be called in sequence (steps 1-11) for the complete workflow
  • Each method prints detailed progress information for debugging and monitoring
  • The class maintains stateless operation except for the authenticated session
  • Check if document is already in trash before attempting move (handled automatically)
  • All uploads use the 'working method' that mimics official reMarkable client behavior
  • CRC32C checksums are required for data integrity verification
  • Generation numbers must be preserved when updating root to prevent conflicts
  • The session object is reused across all API calls for efficiency
  • Methods raise ValueError if documents or metadata are not found
  • Methods raise RuntimeError if uploads fail with non-200/202 status codes

Similar Components

AI-powered semantic similarity - components with related functionality:

  • function move_document_to_trash 90.9% similar

    Moves a reMarkable document to trash by updating its metadata parent field to 'trash', then propagating the changes through the document schema hierarchy and updating the root hash.

    From: /tf/active/vicechatdev/e-ink-llm/cloudtest/apply_working_trash_move.py
  • function move_documents_to_trash 89.8% similar

    Moves specified reMarkable Cloud documents to trash by updating their metadata parent field to 'trash' and propagating changes through the document hierarchy.

    From: /tf/active/vicechatdev/e-ink-llm/cloudtest/move_remaining_to_trash_fixed.py
  • function apply_working_trash_move 83.6% similar

    Moves a hardcoded list of reMarkable cloud documents to trash by authenticating with the reMarkable API and applying trash operations to each document.

    From: /tf/active/vicechatdev/e-ink-llm/cloudtest/apply_working_trash_move.py
  • function simple_move_to_trash 82.9% similar

    Moves all documents from the reMarkable tablet's root directory to trash by uploading an empty root.docSchema file and updating the roothash.

    From: /tf/active/vicechatdev/e-ink-llm/cloudtest/simple_clean_root.py
  • function main_v45 76.5% similar

    Command-line interface function that moves a single reMarkable document to trash by accepting a document UUID as a command-line argument.

    From: /tf/active/vicechatdev/e-ink-llm/cloudtest/move_documents_to_trash.py
← Back to Browse