class DocumentToTrashMover
A class that moves reMarkable documents to the trash by updating their metadata parent field to 'trash' and synchronizing changes through the reMarkable cloud API.
/tf/active/vicechatdev/e-ink-llm/cloudtest/move_documents_to_trash.py
45 - 443
complex
Purpose
This class provides a complete workflow for moving reMarkable documents to trash. It handles authentication, retrieves current document state from the cloud, modifies metadata to set parent='trash', uploads updated schemas, and synchronizes the root document structure. The implementation uses the 'working upload mechanism' that mimics the official reMarkable desktop client's behavior, including proper headers, CRC32C checksums, and endpoint usage.
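The sync protocol is content-addressed: each blob is uploaded under the SHA-256 hex digest of its own bytes, which is why a single metadata edit ripples upward through a new document schema and a new root hash. A two-line illustration of the addressing scheme (the payload value is invented):

import hashlib

blob = b'{"parent":"trash"}'  # illustrative payload
print(hashlib.sha256(blob).hexdigest())  # the ID the blob is PUT under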
Source Code
class DocumentToTrashMover:
    """Moves documents TO trash using the working upload mechanism"""

    def __init__(self):
        # Load auth session
        auth = RemarkableAuth()
        self.session = auth.get_authenticated_session()
        if not self.session:
            raise RuntimeError("Failed to authenticate with reMarkable")
        print("🗑️ Document to Trash Mover Initialized")

    def get_current_root_info(self):
        """Get current root.docSchema info using working method"""
        print("\n📋 Step 1: Getting current root.docSchema...")
        # Get root info
        root_response = self.session.get("https://eu.tectonic.remarkable.com/sync/v4/root")
        root_response.raise_for_status()
        root_data = root_response.json()
        # Get root content
        root_content_response = self.session.get(f"https://eu.tectonic.remarkable.com/sync/v3/files/{root_data['hash']}")
        root_content_response.raise_for_status()
        root_content = root_content_response.text
        print(f"✅ Current root hash: {root_data['hash']}")
        print(f"✅ Current generation: {root_data.get('generation')}")
        print(f"✅ Root content size: {len(root_content)} bytes")
        return root_data, root_content

    def get_document_info(self, doc_uuid: str, root_content: str):
        """Find document entry in root.docSchema"""
        print(f"\n📋 Step 2: Finding document {doc_uuid[:8]}... in root.docSchema")
        lines = root_content.strip().split('\n')
        for line in lines[1:]:  # Skip version header
            if doc_uuid in line:
                parts = line.split(':')
                if len(parts) >= 5:
                    doc_info = {
                        'hash': parts[0],
                        'uuid': parts[2],
                        'type': parts[3],
                        'size': parts[4],
                        'full_line': line
                    }
                    print(f"✅ Found document entry:")
                    print(f" Hash: {doc_info['hash']}")
                    print(f" Type: {doc_info['type']}")
                    print(f" Size: {doc_info['size']}")
                    print(f" Full line: {doc_info['full_line']}")
                    return doc_info
        raise ValueError(f"Document {doc_uuid} not found in root.docSchema")

    def get_document_schema(self, doc_hash: str):
        """Retrieve document's docSchema"""
        print(f"\n📋 Step 3: Retrieving document docSchema...")
        doc_response = self.session.get(f"https://eu.tectonic.remarkable.com/sync/v3/files/{doc_hash}")
        doc_response.raise_for_status()
        doc_content = doc_response.text
        print(f"✅ Document docSchema size: {len(doc_content)} bytes")
        print(f"📋 Document docSchema content:")
        lines = doc_content.strip().split('\n')
        for i, line in enumerate(lines):
            print(f" Line {i}: {line}")
        return doc_content, lines

    def get_current_metadata(self, doc_lines: list):
        """Extract and fetch current metadata"""
        print(f"\n📋 Step 4: Getting current metadata...")
        metadata_hash = None
        metadata_line = None
        # Find metadata component
        for line in doc_lines[1:]:  # Skip version
            if ':' in line and '.metadata' in line:
                parts = line.split(':')
                if len(parts) >= 5:
                    metadata_hash = parts[0]
                    metadata_line = line
                    break
        if not metadata_hash:
            raise ValueError("Metadata component not found in document schema")
        print(f"✅ Metadata hash: {metadata_hash}")
        # Fetch current metadata
        metadata_response = self.session.get(f"https://eu.tectonic.remarkable.com/sync/v3/files/{metadata_hash}")
        metadata_response.raise_for_status()
        current_metadata = json.loads(metadata_response.text)
        print(f"✅ Current metadata:")
        for key, value in current_metadata.items():
            print(f" {key}: {value}")
        return current_metadata, metadata_line

    def create_trash_metadata(self, current_metadata: dict):
        """Create updated metadata with parent = 'trash'"""
        print(f"\n🗑️ Step 5: Creating trash metadata...")
        # Copy current metadata and update parent to trash
        updated_metadata = current_metadata.copy()
        old_parent = updated_metadata.get('parent', '')
        updated_metadata['parent'] = 'trash'  # Move to trash
        print(f"✅ Updating parent: '{old_parent}' → 'trash'")
        # Keep other fields as they are for trash move
        updated_metadata['lastModified'] = int(time.time() * 1000)
        # Convert to JSON
        updated_metadata_json = json.dumps(updated_metadata, separators=(',', ':'))
        print(f"✅ Updated metadata ({len(updated_metadata_json)} bytes):")
        print(f" {updated_metadata_json[:100]}...")
        return updated_metadata_json

    def upload_new_metadata(self, metadata_json: str, doc_uuid: str):
        """Upload new metadata using WORKING method"""
        print(f"\n⬆️ Step 6: Uploading new metadata using WORKING method...")
        # Calculate hash
        metadata_hash = hashlib.sha256(metadata_json.encode()).hexdigest()
        print(f"✅ New metadata hash: {metadata_hash}")
        # Upload using WORKING method from test_move_from_trash.py
        headers = {
            'Content-Type': 'application/octet-stream',
            'rm-batch-number': '1',
            'rm-filename': f'{doc_uuid}.metadata',  # Required: UUID.metadata format
            'rm-sync-id': str(uuid.uuid4()),
            'User-Agent': 'reMarkable-desktop-win/3.11.1.1951',  # Use Windows UA
            'Accept-Encoding': 'gzip, deflate',
            'Accept-Language': 'en-BE,*',
            'Connection': 'Keep-Alive'
        }
        # Add CRC32C checksum
        crc32c_header = compute_crc32c_header(metadata_json.encode())
        if crc32c_header:
            headers['x-goog-hash'] = crc32c_header
        print(f"📤 PUT to: https://eu.tectonic.remarkable.com/sync/v3/files/{metadata_hash}")
        print(f" Headers: {list(headers.keys())}")
        upload_response = self.session.put(
            f"https://eu.tectonic.remarkable.com/sync/v3/files/{metadata_hash}",  # WORKING ENDPOINT
            data=metadata_json.encode(),
            headers=headers
        )
        print(f"✅ Metadata upload response: {upload_response.status_code}")
        if upload_response.status_code not in [200, 202]:
            print(f"❌ Upload failed: {upload_response.text}")
            raise RuntimeError(f"Metadata upload failed: {upload_response.status_code}")
        return metadata_hash

    def create_new_document_schema(self, doc_lines: list, new_metadata_hash: str, metadata_line: str):
        """Create new document schema with updated metadata hash"""
        print(f"\n🏗️ Step 7: Creating new document schema...")
        # Replace metadata line with new hash
        new_lines = []
        for line in doc_lines:
            if line == metadata_line:
                # Replace metadata hash but keep size
                parts = line.split(':')
                parts[0] = new_metadata_hash  # Update hash
                new_line = ':'.join(parts)
                new_lines.append(new_line)
                print(f"✅ Updated metadata line:")
                print(f" Old: {line}")
                print(f" New: {new_line}")
            else:
                new_lines.append(line)
        new_doc_content = '\n'.join(new_lines)
        print(f"✅ New document schema ({len(new_doc_content)} bytes):")
        for i, line in enumerate(new_lines):
            print(f" Line {i}: {line}")
        return new_doc_content

    def upload_new_document_schema(self, doc_content: str, doc_uuid: str):
        """Upload new document schema using WORKING method"""
        print(f"\n⬆️ Step 8: Uploading new document schema...")
        # Calculate hash
        doc_hash = hashlib.sha256(doc_content.encode()).hexdigest()
        print(f"✅ New document schema hash: {doc_hash}")
        # Upload using WORKING method
        headers = {
            'Content-Type': 'application/octet-stream',
            'rm-batch-number': '1',
            'rm-filename': f'{doc_uuid}.docSchema',  # Required: UUID.docSchema format
            'rm-sync-id': str(uuid.uuid4()),
            'User-Agent': 'reMarkable-desktop-win/3.11.1.1951',
            'Accept-Encoding': 'gzip, deflate',
            'Accept-Language': 'en-BE,*',
            'Connection': 'Keep-Alive'
        }
        # Add CRC32C checksum
        crc32c_header = compute_crc32c_header(doc_content.encode())
        if crc32c_header:
            headers['x-goog-hash'] = crc32c_header
        print(f"📤 PUT to: https://eu.tectonic.remarkable.com/sync/v3/files/{doc_hash}")
        upload_response = self.session.put(
            f"https://eu.tectonic.remarkable.com/sync/v3/files/{doc_hash}",  # WORKING ENDPOINT
            data=doc_content.encode(),
            headers=headers
        )
        print(f"✅ Document schema upload response: {upload_response.status_code}")
        if upload_response.status_code not in [200, 202]:
            print(f"❌ Upload failed: {upload_response.text}")
            raise RuntimeError(f"Document schema upload failed: {upload_response.status_code}")
        return doc_hash

    def update_root_docschema(self, root_content: str, doc_info: dict, new_doc_hash: str):
        """Update root.docSchema with new document hash"""
        print(f"\n🔄 Step 9: Updating root.docSchema...")
        # Replace old document line with new hash
        old_line = doc_info['full_line']
        parts = old_line.split(':')
        parts[0] = new_doc_hash  # Update document hash
        new_line = ':'.join(parts)
        print(f"✅ Updating root.docSchema entry:")
        print(f" Old: {old_line}")
        print(f" New: {new_line}")
        # Replace in root content
        new_root_content = root_content.replace(old_line, new_line)
        print(f"✅ New root.docSchema size: {len(new_root_content)} bytes")
        return new_root_content

    def upload_new_root(self, root_content: str, generation: int):
        """Upload new root.docSchema and update roothash using WORKING method"""
        print(f"\n⬆️ Step 10: Uploading new root.docSchema...")
        # Calculate hash
        root_hash = hashlib.sha256(root_content.encode()).hexdigest()
        print(f"✅ New root hash: {root_hash}")
        # Upload root content using WORKING method
        headers = {
            'Content-Type': 'text/plain',
            'rm-batch-number': '1',
            'rm-filename': 'root.docSchema',  # System filename for root.docSchema
            'rm-sync-id': str(uuid.uuid4()),
            'User-Agent': 'reMarkable-desktop-win/3.11.1.1951',
            'Accept-Encoding': 'gzip, deflate',
            'Accept-Language': 'en-BE,*',
            'Connection': 'Keep-Alive'
        }
        # Add CRC32C checksum
        crc32c_header = compute_crc32c_header(root_content.encode())
        if crc32c_header:
            headers['x-goog-hash'] = crc32c_header
        print(f"📤 PUT to: https://eu.tectonic.remarkable.com/sync/v3/files/{root_hash}")
        upload_response = self.session.put(
            f"https://eu.tectonic.remarkable.com/sync/v3/files/{root_hash}",  # WORKING ENDPOINT
            data=root_content.encode(),
            headers=headers
        )
        print(f"✅ Root content upload response: {upload_response.status_code}")
        if upload_response.status_code not in [200, 202]:
            print(f"❌ Upload failed: {upload_response.text}")
            raise RuntimeError(f"Root content upload failed: {upload_response.status_code}")
        # Update root hash pointer using WORKING method
        print(f"\n🔄 Step 11: Updating root hash pointer...")
        # Create root data exactly like working upload_manager.py
        root_update_data = {
            "broadcast": True,
            "generation": generation,  # Use generation parameter
            "hash": root_hash
        }
        # Convert to JSON with 2-space indent like real app
        root_content_body = json.dumps(root_update_data, indent=2).encode('utf-8')
        # Headers exactly like working upload_manager.py
        headers = {
            'Content-Type': 'application/json',
            'rm-batch-number': '1',
            'rm-filename': 'roothash',
            'rm-sync-id': str(uuid.uuid4()),
            'User-Agent': 'reMarkable-desktop-win/3.11.1.1951',
            'Accept-Encoding': 'gzip, deflate',
            'Accept-Language': 'en-BE,*',
            'Connection': 'Keep-Alive'
        }
        # Add CRC32C checksum
        crc32c_header = compute_crc32c_header(root_content_body)
        if crc32c_header:
            headers['x-goog-hash'] = crc32c_header
        # Use /sync/v3/root endpoint like working code
        print(f"📤 PUT to: https://eu.tectonic.remarkable.com/sync/v3/root")
        root_update_response = self.session.put(
            "https://eu.tectonic.remarkable.com/sync/v3/root",  # WORKING ENDPOINT
            data=root_content_body,
            headers=headers
        )
        print(f"✅ Root update response: {root_update_response.status_code}")
        if root_update_response.status_code not in [200, 202]:
            print(f"❌ Root update failed: {root_update_response.text}")
            raise RuntimeError(f"Root update failed: {root_update_response.status_code}")
        return root_hash

    def move_document_to_trash(self, doc_uuid: str):
        """Complete process to move document TO trash"""
        print(f"🗑️ Moving Document TO Trash")
        print(f"Document UUID: {doc_uuid}")
        print("=" * 60)
        try:
            # Step 1: Get current root info
            root_data, root_content = self.get_current_root_info()
            # Step 2: Find document in root
            doc_info = self.get_document_info(doc_uuid, root_content)
            # Step 3: Get document schema
            doc_content, doc_lines = self.get_document_schema(doc_info['hash'])
            # Step 4: Get current metadata
            current_metadata, metadata_line = self.get_current_metadata(doc_lines)
            # Check current parent
            current_parent = current_metadata.get('parent', '')
            if current_parent == 'trash':
                print(f"⚠️ Document is already in trash!")
                return True
            print(f"🔄 Moving document from '{current_parent or '(root)'}' to trash...")
            # Step 5: Create trash metadata (set parent = 'trash')
            updated_metadata_json = self.create_trash_metadata(current_metadata)
            # Step 6: Upload new metadata using WORKING method
            new_metadata_hash = self.upload_new_metadata(updated_metadata_json, doc_uuid)
            # Step 7: Create new document schema
            new_doc_content = self.create_new_document_schema(doc_lines, new_metadata_hash, metadata_line)
            # Step 8: Upload new document schema using WORKING method
            new_doc_hash = self.upload_new_document_schema(new_doc_content, doc_uuid)
            # Step 9: Update root.docSchema
            new_root_content = self.update_root_docschema(root_content, doc_info, new_doc_hash)
            # Step 10-11: Upload new root and update pointer using WORKING method
            new_root_hash = self.upload_new_root(new_root_content, root_data['generation'])
            print(f"\n🎉 SUCCESS! Document moved to trash")
            print(f" Document: {current_metadata.get('visibleName')}")
            print(f" Old parent: {current_parent or '(root)'}")
            print(f" New parent: trash")
            print(f" New root hash: {new_root_hash}")
            return True
        except Exception as e:
            print(f"\n❌ Move to trash operation failed: {e}")
            return False
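Note: compute_crc32c_header is called by every upload step above but is defined elsewhere in the module. A minimal sketch of a plausible implementation, assuming the Google Cloud Storage x-goog-hash convention (crc32c=<base64 of the big-endian 4-byte digest>) and the crc32c package from the import list below; only the function name and its None-on-failure contract come from the calls above, the body is an assumption:

import base64
import crc32c

def compute_crc32c_header(data: bytes):
    # Assumed: GCS-style 'x-goog-hash: crc32c=<base64(big-endian digest)>'
    try:
        digest = crc32c.crc32c(data)  # unsigned 32-bit checksum
        return "crc32c=" + base64.b64encode(digest.to_bytes(4, "big")).decode("ascii")
    except Exception:
        return None  # callers skip the header when this is falsy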
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| bases | - | - | - |
Parameter Details
__init__: No parameters required. The constructor automatically initializes authentication using RemarkableAuth and establishes an authenticated session with the reMarkable cloud service. Raises RuntimeError if authentication fails.
Return Value
Instantiation returns a DocumentToTrashMover object with an authenticated session. The main method move_document_to_trash() returns a boolean: True if the document was successfully moved to trash, False if the operation failed. Other methods return various data structures: tuples of (dict, str) for root info, dict for document info, str/list for schemas, and str for hash values.
Class Interface
Methods
__init__(self)
Purpose: Initialize the DocumentToTrashMover with authenticated session
Returns: None - initializes self.session attribute
get_current_root_info(self) -> tuple[dict, str]
Purpose: Retrieve current root.docSchema information including hash, generation, and content
Returns: Tuple of (root_data dict with hash/generation, root_content string)
get_document_info(self, doc_uuid: str, root_content: str) -> dict
Purpose: Find and extract document entry from root.docSchema by UUID
Parameters:
doc_uuid: UUID of the document to find
root_content: Content of root.docSchema as string
Returns: Dict with keys: hash, uuid, type, size, full_line
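For illustration, entries are colon-separated with the hash first and the UUID third; the class skips the second field. All values below are invented:

# Hypothetical root.docSchema entry: <hash>:<field1>:<uuid>:<type>:<size>
line = "0" * 64 + ":80000000:0e5cbf2a-1234-5678-9abc-def012345678:0:2174"
parts = line.split(':')
info = {'hash': parts[0], 'uuid': parts[2], 'type': parts[3], 'size': parts[4], 'full_line': line}
print(info['uuid'])  # 0e5cbf2a-1234-5678-9abc-def012345678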
get_document_schema(self, doc_hash: str) -> tuple[str, list]
Purpose: Retrieve and parse a document's docSchema file from cloud storage
Parameters:
doc_hash: SHA256 hash of the document schema to retrieve
Returns: Tuple of (doc_content string, doc_lines list)
get_current_metadata(self, doc_lines: list) -> tuple[dict, str]
Purpose: Extract and fetch current metadata from document schema lines
Parameters:
doc_lines: List of lines from document schema
Returns: Tuple of (current_metadata dict, metadata_line string)
create_trash_metadata(self, current_metadata: dict) -> str
Purpose: Create updated metadata JSON with parent field set to 'trash'
Parameters:
current_metadata: Current metadata dictionary to update
Returns: JSON string of updated metadata with parent='trash'
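For illustration, the transformation on a hypothetical metadata object (the visibleName, parent, and lastModified fields appear in the source above; the concrete values are invented):

import json
import time

current = {"visibleName": "Meeting notes", "parent": ""}  # hypothetical input
updated = dict(current)
updated["parent"] = "trash"                        # the only semantic change
updated["lastModified"] = int(time.time() * 1000)  # epoch milliseconds
print(json.dumps(updated, separators=(',', ':')))  # compact JSON, as uploaded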
upload_new_metadata(self, metadata_json: str, doc_uuid: str) -> str
Purpose: Upload new metadata file to cloud storage using working upload method
Parameters:
metadata_json: JSON string of metadata to upload
doc_uuid: UUID of the document (used in rm-filename header)
Returns: SHA256 hash of the uploaded metadata
create_new_document_schema(self, doc_lines: list, new_metadata_hash: str, metadata_line: str) -> str
Purpose: Create new document schema with updated metadata hash
Parameters:
doc_lines: List of lines from original document schema
new_metadata_hash: New hash to replace in metadata line
metadata_line: Original metadata line to replace
Returns: New document schema content as string
upload_new_document_schema(self, doc_content: str, doc_uuid: str) -> str
Purpose: Upload new document schema to cloud storage using working upload method
Parameters:
doc_content: Document schema content to upload
doc_uuid: UUID of the document (used in rm-filename header)
Returns: SHA256 hash of the uploaded document schema
update_root_docschema(self, root_content: str, doc_info: dict, new_doc_hash: str) -> str
Purpose: Update root.docSchema content with new document hash
Parameters:
root_content: Current root.docSchema content
doc_info: Document info dict with full_line key
new_doc_hash: New document hash to replace in root
Returns: Updated root.docSchema content as string
upload_new_root(self, root_content: str, generation: int) -> str
Purpose: Upload new root.docSchema and update root hash pointer using working method
Parameters:
root_content: New root.docSchema content to upload
generation: Generation number for root update
Returns: SHA256 hash of the new root
move_document_to_trash(self, doc_uuid: str) -> bool
Purpose: Complete workflow to move a document to trash (main entry point)
Parameters:
doc_uuid: UUID of the document to move to trash
Returns: True if successful, False if operation failed
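Because authentication happens once in __init__ and the session is reused, several documents can be moved with a single instance. A short sketch (the UUIDs are placeholders):

mover = DocumentToTrashMover()  # authenticate once, reuse the session
for doc_uuid in ['<uuid-1>', '<uuid-2>']:  # placeholder UUIDs
    if not mover.move_document_to_trash(doc_uuid):
        print(f"Failed to move {doc_uuid}; stopping")
        break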
Attributes
| Name | Type | Description | Scope |
|---|---|---|---|
| session | requests.Session | Authenticated HTTP session for making API requests to reMarkable cloud | instance |
Dependencies
json, time, hashlib, uuid, base64, zlib, pathlib, crc32c, sys, requests
Required Imports
import json
import time
import hashlib
import uuid
import base64
import zlib
from pathlib import Path
from auth import RemarkableAuth
import crc32c
import sys
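Note: requests and crc32c are third-party packages (pip install requests crc32c); the remaining imports are standard library, and auth is the project-local module that provides RemarkableAuth.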
Usage Example
# Initialize the mover
mover = DocumentToTrashMover()
# Move a document to trash by UUID
doc_uuid = 'abc123-def456-ghi789'
success = mover.move_document_to_trash(doc_uuid)
if success:
print('Document moved to trash successfully')
else:
print('Failed to move document to trash')
# Advanced: Get current root info
root_data, root_content = mover.get_current_root_info()
print(f'Current root hash: {root_data["hash"]}')
# Advanced: Find document info
doc_info = mover.get_document_info(doc_uuid, root_content)
print(f'Document type: {doc_info["type"]}')
Best Practices
- Always instantiate the class before calling any methods - authentication happens in __init__
- Handle RuntimeError during instantiation if authentication fails (see the sketch after this list)
- The move_document_to_trash() method is the main entry point - other methods are internal steps
- Methods are designed to be called in sequence (steps 1-11) for the complete workflow
- Each method prints detailed progress information for debugging and monitoring
- The class maintains stateless operation except for the authenticated session
- Check if document is already in trash before attempting move (handled automatically)
- All uploads use the 'working method' that mimics official reMarkable client behavior
- CRC32C checksums are required for data integrity verification
- Generation numbers must be preserved when updating root to prevent conflicts
- The session object is reused across all API calls for efficiency
- Methods raise ValueError if documents or metadata are not found
- Methods raise RuntimeError if uploads fail with non-200/202 status codes
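A minimal sketch of that instantiation guard, reusing the placeholder UUID from the usage example:

try:
    mover = DocumentToTrashMover()
except RuntimeError as e:
    raise SystemExit(f"Authentication failed: {e}")

if not mover.move_document_to_trash('abc123-def456-ghi789'):
    print('Move failed; check the step-by-step log above for the failing step')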
Similar Components
AI-powered semantic similarity - components with related functionality:
- function move_document_to_trash (90.9% similar)
- function move_documents_to_trash (89.8% similar)
- function apply_working_trash_move (83.6% similar)
- function simple_move_to_trash (82.9% similar)
- function main_v45 (76.5% similar)