class RemarkableUploadManager
Manages uploads to reMarkable cloud
/tf/active/vicechatdev/e-ink-llm/cloudtest/upload_manager.py
Lines: 32 - 1125
Complexity: moderate
Purpose
Manages uploads to the reMarkable cloud: builds a document's metadata, pagedata, content, and docSchema blobs, uploads them through the /sync/v3 files API, then updates root.docSchema and the root hash so the new document appears on the device after sync.
Source Code
class RemarkableUploadManager:
"""Manages uploads to reMarkable cloud"""
def __init__(self, session: requests.Session, replica_database_path: str):
self.session = session
self.base_url = "https://eu.tectonic.remarkable.com"
# Load replica database
self.database_path = Path(replica_database_path)
self.database = self._load_database()
# Track uploads
self.upload_queue: List[Dict[str, Any]] = []
self.uploaded_hashes: Dict[str, str] = {} # hash -> upload_status
self._current_document_uuid: Optional[str] = None # UUID for consistent rm-filename headers
self._server_generation: Optional[int] = None # Store generation from server for final root update
def _clear_document_context(self):
"""Clear the current document UUID context for new uploads"""
self._current_document_uuid = None
def _load_database(self) -> Dict[str, Any]:
"""Load the replica database"""
if not self.database_path.exists():
raise FileNotFoundError(f"Database not found: {self.database_path}")
with open(self.database_path, 'r', encoding='utf-8') as f:
return json.load(f)
def _save_database(self):
"""Save the updated database"""
with open(self.database_path, 'w', encoding='utf-8') as f:
json.dump(self.database, f, indent=2, ensure_ascii=False)
def _compute_hash(self, content: bytes) -> str:
"""Compute SHA256 hash of content"""
return hashlib.sha256(content).hexdigest()
def _compute_crc32c_header(self, content: bytes) -> str:
"""Compute CRC32C checksum and return as x-goog-hash header value"""
try:
# Use proper crc32c library if available
if HAS_CRC32C:
checksum = crc32c.crc32c(content)
else:
# Fallback to standard CRC32 (not ideal but better than nothing)
checksum = zlib.crc32(content) & 0xffffffff
# Convert to bytes and base64 encode
checksum_bytes = checksum.to_bytes(4, byteorder='big')
checksum_b64 = base64.b64encode(checksum_bytes).decode('ascii')
return f"crc32c={checksum_b64}"
except Exception as e:
print(f"⚠️ Warning: Failed to compute CRC32C checksum: {e}")
# Return empty string to skip the header if computation fails
return ""
def _generate_timestamp(self) -> str:
"""Generate reMarkable timestamp"""
return str(int(time.time() * 1000))
def _generate_generation(self) -> int:
"""Generate reMarkable generation number"""
return int(time.time() * 1000000)
def _capture_server_generation(self) -> bool:
"""Capture the current server generation for use in final root update"""
try:
print(f"📡 Capturing server generation for upload sequence...")
root_url = f"{self.base_url}/sync/v4/root"
root_response = self.session.get(root_url)
root_response.raise_for_status()
current_root = root_response.json()
self._server_generation = current_root.get('generation')
print(f"🔍 Captured server generation: {self._server_generation}")
return True
except Exception as e:
print(f"❌ Failed to capture server generation: {e}")
self._server_generation = None
return False
def upload_raw_content(self, content: bytes, content_hash: str = None, filename: str = None,
content_type: str = "application/octet-stream", system_filename: str = None) -> Optional[str]:
"""Upload raw content and return its hash"""
if content_hash is None:
content_hash = self._compute_hash(content)
# Check if already uploaded
if content_hash in self.uploaded_hashes:
print(f"✅ Content already uploaded: {content_hash[:16]}...")
return content_hash
try:
url = f"{self.base_url}/sync/v3/files/{content_hash}"
# Prepare headers like the reMarkable app
headers = {
'Content-Type': content_type,
'rm-batch-number': '1',
'rm-sync-id': str(uuid.uuid4()),
'User-Agent': 'desktop/3.20.0.922 (macos 15.4)', # ✅ FIXED: Match real app
'Accept-Encoding': 'gzip, deflate',
'Accept-Language': 'en-BE,*',
'Connection': 'Keep-Alive'
}
# Add rm-filename header - REQUIRED for all PUT requests
# Handle different patterns: UUID-based files vs system files
if system_filename:
# System files like "roothash", "root.docSchema" (no UUID)
rm_filename = system_filename
print(f"🏷️ rm-filename (system): {rm_filename}")
elif filename:
# Document files with UUID pattern
if hasattr(self, '_current_document_uuid') and self._current_document_uuid:
doc_uuid = self._current_document_uuid
else:
# Generate and store new UUID for this document
doc_uuid = str(uuid.uuid4())
self._current_document_uuid = doc_uuid
print(f"📊 Generated new document UUID: {doc_uuid}")
# Use the filename as provided or construct UUID.extension format
if '.' in filename and len(filename.split('.')[0]) == 36: # Already UUID.extension
rm_filename = filename
else:
# Determine extension and construct UUID.extension
if content_type == 'application/pdf' or filename.lower().endswith('.pdf'):
rm_filename = f"{doc_uuid}.pdf"
elif 'metadata' in filename.lower():
rm_filename = f"{doc_uuid}.metadata"
elif filename.lower().endswith('.content'):
rm_filename = f"{doc_uuid}.content"
elif filename.lower().endswith('.rm'):
# Page data keeps original filename for .rm files
rm_filename = filename
elif filename.lower().endswith('.docschema') or 'docschema' in filename.lower():
rm_filename = f"{doc_uuid}.docSchema"
elif filename.lower().endswith('.pagedata'):
rm_filename = f"{doc_uuid}.pagedata"
else:
# Default construction
rm_filename = f"{doc_uuid}.{filename}"
print(f"🏷️ rm-filename (document): {rm_filename}")
else:
# Fallback - generate basic filename
if hasattr(self, '_current_document_uuid') and self._current_document_uuid:
doc_uuid = self._current_document_uuid
else:
doc_uuid = str(uuid.uuid4())
self._current_document_uuid = doc_uuid
if content_type == 'application/pdf':
rm_filename = f"{doc_uuid}.pdf"
elif content_type == 'application/octet-stream':
rm_filename = f"{doc_uuid}.metadata"
else:
rm_filename = f"{doc_uuid}.content"
print(f"🏷️ rm-filename (fallback): {rm_filename}")
headers['rm-filename'] = rm_filename
# Add CRC32C checksum (this is the missing piece!)
crc32c_header = self._compute_crc32c_header(content)
if crc32c_header:
headers['x-goog-hash'] = crc32c_header
print(f"🔍 Debug: Upload headers for {content_hash[:16]}...")
for key, value in headers.items():
print(f" {key}: {value}")
# Make the PUT request
response = self.session.put(url, data=content, headers=headers)
print(f"🔍 Debug: Response status: {response.status_code}")
print(f"🔍 Debug: Response text: {response.text}")
response.raise_for_status()
self.uploaded_hashes[content_hash] = "uploaded"
print(f"✅ Uploaded content: {content_hash[:16]}... ({len(content)} bytes)")
return content_hash
except Exception as e:
print(f"❌ Failed to upload content {content_hash[:16]}...: {e}")
if hasattr(e, 'response') and e.response is not None:
print(f" Response: {e.response.text}")
return None
def upload_system_file(self, content: bytes, system_filename: str, content_type: str = "application/octet-stream") -> Optional[str]:
"""Upload system files like roothash, root.docSchema with fixed filenames"""
print(f"📁 Uploading system file: {system_filename}")
return self.upload_raw_content(content, system_filename=system_filename, content_type=content_type)
def upload_document_file(self, content: bytes, filename: str, content_type: str = "application/octet-stream") -> Optional[str]:
"""Upload document files with UUID.extension pattern"""
print(f"📄 Uploading document file: {filename}")
return self.upload_raw_content(content, filename=filename, content_type=content_type)
def create_metadata_json(self, name: str, parent_uuid: str = "", document_type: str = "DocumentType") -> Tuple[bytes, str]:
"""Create metadata JSON for a document"""
timestamp = self._generate_timestamp()
metadata = {
"createdTime": timestamp,
"lastModified": timestamp,
"lastOpened": "0", # Real app sets this to "0" for never-opened documents
"lastOpenedPage": 0,
"new": False,
"parent": parent_uuid,
"pinned": False,
"source": "com.remarkable.macos", # ✅ FIXED: Match real app behavior
"type": document_type,
"visibleName": name
}
content = json.dumps(metadata, indent=4).encode('utf-8')
content_hash = self._compute_hash(content)
return content, content_hash
def create_pdf_content_json(self, pdf_content: bytes, document_name: str = "") -> Tuple[bytes, str]:
"""Create content JSON for a PDF document based on real app patterns"""
# Basic PDF content structure based on real app analysis
content_data = {
"coverPageNumber": 0,
"customZoomCenterX": 0,
"customZoomCenterY": 936,
"customZoomOrientation": "portrait",
"customZoomPageHeight": 1872,
"customZoomPageWidth": 1404,
"customZoomScale": 1,
"documentMetadata": {
"title": document_name if document_name else "Untitled"
},
"extraMetadata": {},
"fileType": "pdf",
"fontName": "",
"formatVersion": 1,
"lineHeight": -1,
"orientation": "portrait",
"originalPageCount": 1, # Will be updated based on actual PDF
"pageCount": 1, # Will be updated based on actual PDF
"pageTags": [],
"pages": ["6a22f0dc-5606-4d40-946f-ccbc14f777ff"], # Default page UUID
"redirectionPageMap": [0],
"sizeInBytes": len(pdf_content),
"tags": [],
"textAlignment": "justify",
"textScale": 1,
"zoomMode": "bestFit"
}
content = json.dumps(content_data, indent=4).encode('utf-8')
content_hash = self._compute_hash(content)
return content, content_hash
def create_content_json(self, pages: List[str], template: str = "Blank") -> Tuple[bytes, str]:
"""Create content JSON for a notebook with pages"""
timestamp_base = f"2:{len(pages)}"
# Create pages structure
pages_list = []
for i, page_id in enumerate(pages):
pages_list.append({
"id": page_id,
"idx": {
"timestamp": f"2:{i+2}",
"value": chr(ord('a') + i) if i < 26 else f"page_{i}"
},
"template": {
"timestamp": "2:1",
"value": template
}
})
content_data = {
"cPages": {
"lastOpened": {
"timestamp": "2:1",
"value": pages[0] if pages else ""
},
"original": {
"timestamp": "0:0",
"value": -1
},
"pages": pages_list
},
"extraMetadata": {},
"fileType": "notebook",
"fontName": "",
"lineHeight": -1,
"margins": 180,
"pageCount": len(pages),
"textScale": 1,
"transform": {}
}
content = json.dumps(content_data, indent=4).encode('utf-8')
content_hash = self._compute_hash(content)
return content, content_hash
def create_doc_schema(self, document_uuid: str, metadata_hash: str, pagedata_hash: str,
pdf_hash: str, content_hash: str, metadata_size: int, pagedata_size: int,
pdf_size: int, content_size: int) -> Tuple[bytes, str]:
"""Create document schema content in the exact format expected by reMarkable"""
# Based on raw logs: 4 components for a document
lines = ['4']
# Add components in specific order (content, metadata, pagedata, pdf)
lines.append(f"{content_hash}:0:{document_uuid}.content:0:{content_size}")
lines.append(f"{metadata_hash}:0:{document_uuid}.metadata:0:{metadata_size}")
lines.append(f"{pagedata_hash}:0:{document_uuid}.pagedata:0:{pagedata_size}")
lines.append(f"{pdf_hash}:0:{document_uuid}.pdf:0:{pdf_size}")
content = '\n'.join(lines).encode('utf-8')
content_hash = self._compute_hash(content)
return content, content_hash
def create_directory_listing(self, child_objects: List[Dict], data_components: List[Dict]) -> Tuple[bytes, str]:
"""Create directory listing content"""
lines = [str(len(child_objects) + len(data_components))]
# Add child objects (folders/documents)
for obj in child_objects:
line = f"{obj['hash']}:80000000:{obj['uuid']}:{obj['type']}:{obj['size']}"
lines.append(line)
# Add data components (.content, .metadata, .rm files, etc.)
for comp in data_components:
line = f"{comp['hash']}:0:{comp['component']}:0:{comp['size']}"
lines.append(line)
content = '\n'.join(lines).encode('utf-8')
content_hash = self._compute_hash(content)
return content, content_hash
def update_root_directory(self) -> bool:
"""Update the root directory listing by adding the new document to existing entries"""
try:
print("📁 Updating root directory listing...")
# Get the current root.docSchema from the server to preserve existing entries
current_root_entries = self._get_current_root_entries()
if current_root_entries is None:
print("❌ Failed to get current root entries")
return False
# Add the new document entry if it doesn't already exist
new_doc_added = self._add_new_document_to_root_entries(current_root_entries)
if not new_doc_added:
print("📄 No new document to add to root directory")
return True
# Create the updated root directory listing
root_dir_content = self._create_root_directory_from_entries(current_root_entries)
root_dir_hash = self._compute_hash(root_dir_content)
print(f"📂 Updated root directory hash: {root_dir_hash}")
# Upload the updated root directory listing
uploaded_hash = self.upload_system_file(root_dir_content, "root.docSchema")
if not uploaded_hash:
return False
# Update the root hash in the cloud
return self.update_root_hash(root_dir_hash)
except Exception as e:
print(f"❌ Failed to update root directory: {e}")
return False
def _get_current_root_entries(self) -> Optional[List[str]]:
"""Get current root.docSchema entries from server to preserve existing data"""
try:
# Get current root hash
root_url = f"{self.base_url}/sync/v4/root"
root_response = self.session.get(root_url)
root_response.raise_for_status()
current_root = root_response.json()
current_root_hash = current_root.get('hash')
if not current_root_hash:
print("❌ No current root hash found")
return None
# Fetch the current root.docSchema content
root_content_url = f"{self.base_url}/sync/v3/files/{current_root_hash}"
root_content_response = self.session.get(root_content_url)
root_content_response.raise_for_status()
# Parse the content to extract existing entries
content_lines = root_content_response.text.strip().split('\n')
# First line should be version header "3"
if not content_lines or content_lines[0] != '3':
print(f"❌ Unexpected root.docSchema format: {content_lines[0] if content_lines else 'empty'}")
return None
# Return all entries (excluding the version header)
existing_entries = content_lines[1:] if len(content_lines) > 1 else []
print(f"📋 Found {len(existing_entries)} existing root entries")
for entry in existing_entries[:5]: # Show first 5 for debugging
parts = entry.split(':')
if len(parts) >= 3:
                    entry_uuid = parts[2]
                    size = parts[-1] if len(parts) > 4 else 'unknown'
                    print(f" - {entry_uuid}: size={size}")
return existing_entries
except Exception as e:
print(f"❌ Failed to get current root entries: {e}")
return None
def _add_new_document_to_root_entries(self, existing_entries: List[str]) -> bool:
"""Add the current document being uploaded to the root entries list"""
if not self._current_document_uuid:
print("⚠️ No current document UUID to add")
return False
# Check if this document is already in the entries
doc_uuid = self._current_document_uuid
for entry in existing_entries:
if doc_uuid in entry:
print(f"� Document {doc_uuid} already exists in root entries")
return False
# Find the document in our database to get its info
document_node = None
for node_uuid, node in self.database['nodes'].items():
if node_uuid == doc_uuid:
document_node = node
break
if not document_node:
print(f"❌ Document {doc_uuid} not found in database")
return False
# Get the document's hash and size
doc_hash = document_node.get('hash')
if not doc_hash:
print(f"❌ No hash found for document {doc_uuid}")
return False
# Determine the correct node type code based on the document type
# From analysis: Type 1/2 = folders, Type 3 = notebook documents, Type 4 = PDF documents
doc_metadata = document_node.get('metadata', {})
doc_type = doc_metadata.get('type', 'DocumentType')
if doc_type == 'DocumentType' and 'fileType' in doc_metadata.get('content_data', ''):
# Check if it's a PDF or notebook
content_data_str = doc_metadata.get('content_data', '')
if '"fileType": "pdf"' in content_data_str:
node_type_code = 4 # PDF document
elif '"fileType": "notebook"' in content_data_str:
node_type_code = 3 # Notebook document
else:
node_type_code = 4 # Default to PDF for documents
else:
node_type_code = 4 # Default to PDF for documents
# The size in root.docSchema is the actual document content size (PDF size for PDFs)
# Get the actual PDF file size or document content size
doc_size = 0
# First try to get the PDF file size from metadata
doc_metadata = document_node.get('metadata', {})
content_data_str = doc_metadata.get('content_data', '')
if '"sizeInBytes"' in content_data_str:
# Extract sizeInBytes from the content_data JSON string
import re
            size_match = re.search(r'"sizeInBytes":\s*"?(\d+)"?', content_data_str)  # value may be quoted or unquoted
if size_match:
doc_size = int(size_match.group(1))
# Fallback: try to get size from the node itself
if doc_size == 0:
doc_size = document_node.get('size', 0)
# If still no size, use a reasonable default for new documents
if doc_size == 0:
doc_size = 50000 # Reasonable default for a new PDF
# Create the new entry in the same format as existing ones
# Format: hash:80000000:uuid:type:actual_document_size
new_entry = f"{doc_hash}:80000000:{doc_uuid}:{node_type_code}:{doc_size}"
existing_entries.append(new_entry)
print(f"✅ Added new document entry: {doc_uuid} (size={doc_size})")
return True
def _create_root_directory_from_entries(self, entries: List[str]) -> bytes:
"""Create root.docSchema content from list of entries"""
# Always start with version header "3"
lines = ["3"] + entries
# Sort entries by UUID for consistency (skip the version header)
if len(lines) > 1:
entry_lines = lines[1:]
# Sort by UUID (3rd field after splitting by ':')
entry_lines.sort(key=lambda x: x.split(':')[2] if ':' in x else x)
lines = ["3"] + entry_lines
# Create content with newline separator
content = '\n'.join(lines) + '\n'
print(f"🔍 Debug: Updated root directory content:")
print(f" Version header: 3")
print(f" Entry count: {len(entries)}")
print(f" Total lines: {len(lines)}")
print(f" Content length: {len(content.encode('utf-8'))} bytes")
print(f" Preview: {content[:100]}...")
return content.encode('utf-8')
def create_root_directory_listing(self, root_entries: List[Dict]) -> bytes:
"""Create root directory listing with version header '3' (matching /sync/v3/ API version)"""
# Always use "3" as version header (not count) - this matches the /sync/v3/ API version
lines = ["3"]
# Add each entry in the format: hash:80000000:uuid:node_type:size
# Sort by UUID for consistent ordering (like document components)
sorted_entries = sorted(root_entries, key=lambda x: x['uuid'])
for entry in sorted_entries:
line = f"{entry['hash']}:80000000:{entry['uuid']}:{entry['node_type']}:{entry['size']}"
lines.append(line)
# Use the same approach as document uploads - with newline
content = '\n'.join(lines) + '\n'
print(f"🔍 Debug: Root directory content:")
print(f" Version header: 3 (API version, not count)")
print(f" Entry count: {len(root_entries)}")
print(f" Total lines: {len(lines)}")
print(f" Content length: {len(content.encode('utf-8'))} bytes")
print(f" Preview: {content[:100]}...")
return content.encode('utf-8')
def update_root_hash(self, new_root_hash: str) -> bool:
"""Update the root hash in the cloud - send as text body with proper headers like other files"""
try:
# Use the server generation captured at the start of upload sequence
if self._server_generation is None:
print(f"⚠️ Warning: No server generation captured, capturing now...")
if not self._capture_server_generation():
print(f"❌ Failed to get server generation, aborting root hash update")
return False
generation = self._server_generation
print(f"🔍 Using server generation: {generation}")
print(f"🔍 New root hash: {new_root_hash}")
# Create the root data exactly like the real app
root_data = {
"broadcast": True,
"generation": generation,
"hash": new_root_hash
}
# Convert to JSON text with same formatting as real app (pretty-printed with 2-space indent)
root_content = json.dumps(root_data, indent=2).encode('utf-8')
# Set up headers exactly like the real app (case-sensitive and ordered correctly)
headers = {
'Content-Type': 'application/json',
'rm-batch-number': '1',
'rm-filename': 'roothash',
'rm-sync-id': str(uuid.uuid4()),
'User-Agent': 'desktop/3.20.0.922 (macos 15.4)', # ✅ FIXED: Match real app
'Accept-Encoding': 'gzip, deflate',
'Accept-Language': 'en-BE,*',
'Connection': 'Keep-Alive',
}
# Add CRC32C checksum
crc32c_header = self._compute_crc32c_header(root_content)
if crc32c_header:
headers['x-goog-hash'] = crc32c_header
print(f"🔍 Debug: Root hash update headers:")
for key, value in headers.items():
print(f" {key}: {value}")
print(f"🔍 Debug: Root hash content: {root_content.decode('utf-8')}")
url = f"{self.base_url}/sync/v3/root"
response = self.session.put(url, data=root_content, headers=headers)
print(f"🔍 Debug: Root hash response status: {response.status_code}")
print(f"🔍 Debug: Root hash response text: {response.text}")
response.raise_for_status()
print(f"✅ Updated root hash: {new_root_hash}")
return True
except Exception as e:
print(f"❌ Failed to update root hash: {e}")
if hasattr(e, 'response') and e.response is not None:
print(f" Response: {e.response.text}")
return False
def edit_document_metadata(self, document_uuid: str, new_name: str = None, new_parent: str = None) -> bool:
"""Edit an existing document's metadata"""
try:
# Find the document in database
if document_uuid not in self.database['nodes']:
raise ValueError(f"Document {document_uuid} not found in database")
node = self.database['nodes'][document_uuid]
print(f"📝 Editing document: {node['name']}")
# Get current metadata
current_metadata = node['metadata'].copy()
# Update metadata
if new_name:
current_metadata['visibleName'] = new_name
if new_parent is not None:
current_metadata['parent'] = new_parent
current_metadata['lastModified'] = self._generate_timestamp()
# Create new metadata content
metadata_content = json.dumps(current_metadata, indent=4).encode('utf-8')
metadata_hash = self._compute_hash(metadata_content)
# Upload metadata
self.upload_raw_content(metadata_content, metadata_hash)
# Update component hashes
old_metadata_hash = node['component_hashes']['metadata']
node['component_hashes']['metadata'] = metadata_hash
# Get parent node to update its directory listing
parent_uuid = current_metadata.get('parent', '')
if parent_uuid and parent_uuid in self.database['nodes']:
parent_node = self.database['nodes'][parent_uuid]
# Rebuild parent's directory listing
child_objects = []
data_components = []
# Find all children of this parent
                for child_uuid, child_node in self.database['nodes'].items():  # avoid shadowing the uuid module used below
if child_node.get('parent_uuid') == parent_uuid:
if child_node['node_type'] == 'folder':
type_val = '1'
else:
type_val = '3'
child_objects.append({
'hash': child_node['hash'],
                            'uuid': child_uuid,
'type': type_val,
'size': len(str(child_node).encode('utf-8')) # Approximate
})
# Add metadata components for this updated document
comp_hashes = node['component_hashes']
for comp_type, comp_hash in comp_hashes.items():
if comp_hash:
if comp_type == 'rm_files':
for i, rm_hash in enumerate(comp_hash):
data_components.append({
'hash': rm_hash,
'component': f"{document_uuid}/{uuid.uuid4()}.rm",
'size': 14661 # Typical RM file size
})
else:
data_components.append({
'hash': comp_hash,
'component': f"{document_uuid}.{comp_type}",
'size': len(metadata_content) if comp_type == 'metadata' else 2209
})
# Create and upload new directory listing
dir_content, dir_hash = self.create_directory_listing(child_objects, data_components)
self.upload_raw_content(dir_content, dir_hash)
# Update parent node hash
parent_node['hash'] = dir_hash
self.database['hash_registry'][dir_hash] = {
'uuid': parent_uuid,
'type': 'node',
'last_seen': datetime.now().isoformat()
}
# Always update root directory after any upload to trigger sync
# This ensures the generation increments for both root and folder uploads
print("🔄 Updating root directory to trigger server generation increment...")
self.update_root_directory()
# Update database
node['metadata'] = current_metadata
node['last_modified'] = current_metadata['lastModified']
node['sync_status'] = 'updated'
node['last_synced'] = datetime.now().isoformat()
# Update hash registry
self.database['hash_registry'][metadata_hash] = {
'uuid': document_uuid,
'type': 'metadata',
'last_seen': datetime.now().isoformat()
}
self._save_database()
print(f"✅ Successfully updated document metadata")
return True
except Exception as e:
print(f"❌ Failed to edit document metadata: {e}")
return False
def upload_pdf_document(self, pdf_path: str, name: str, parent_uuid: str = "") -> bool:
"""Upload a new PDF document to reMarkable following the correct sequence from app logs"""
try:
# Clear any previous document context
self._clear_document_context()
# FIRST: Capture server generation (like real app does with /sync/v4/root call)
if not self._capture_server_generation():
print(f"❌ Failed to capture server generation, aborting upload")
return False
pdf_file = Path(pdf_path)
if not pdf_file.exists():
raise FileNotFoundError(f"PDF file not found: {pdf_path}")
print(f"📄 Uploading PDF: {name}")
# Generate UUID for new document and set it for consistent rm-filename headers
document_uuid = str(uuid.uuid4())
self._current_document_uuid = document_uuid
print(f"📊 Document UUID: {document_uuid}")
# Read PDF content
with open(pdf_file, 'rb') as f:
pdf_content = f.read()
# EXACT SEQUENCE FROM APP LOGS:
# 1. cf2a3833-4a8f-4004-ab8d-8dc3c5f561bc.metadata
# 2. cf2a3833-4a8f-4004-ab8d-8dc3c5f561bc.pagedata
# 3. cf2a3833-4a8f-4004-ab8d-8dc3c5f561bc.pdf
# 4. cf2a3833-4a8f-4004-ab8d-8dc3c5f561bc.content
# 5. cf2a3833-4a8f-4004-ab8d-8dc3c5f561bc.docSchema
# 6. root.docSchema
# 7. roothash
print("📝 Step 1: Creating and uploading metadata...")
# Create metadata FIRST (as per app logs)
metadata_content, metadata_hash = self.create_metadata_json(name, parent_uuid)
metadata_upload_hash = self.upload_raw_content(
content=metadata_content,
content_type='application/octet-stream',
filename=f"{document_uuid}.metadata"
)
if not metadata_upload_hash:
raise Exception("Failed to upload metadata")
print("📝 Step 2: Creating and uploading pagedata...")
# For PDFs, create minimal pagedata (single newline like real app)
pagedata_content = b'\n' # ✅ FIXED: Real app uses newline, not empty string
pagedata_upload_hash = self.upload_raw_content(
content=pagedata_content,
content_type='application/octet-stream',
filename=f"{document_uuid}.pagedata"
)
if not pagedata_upload_hash:
raise Exception("Failed to upload pagedata")
print("📝 Step 3: Uploading PDF content...")
pdf_upload_hash = self.upload_raw_content(
content=pdf_content,
content_type='application/pdf',
filename=f"{document_uuid}.pdf"
)
if not pdf_upload_hash:
raise Exception("Failed to upload PDF content")
print("📝 Step 4: Creating and uploading content...")
# Create proper PDF content structure based on real app patterns
content_data, content_hash = self.create_pdf_content_json(pdf_content, name)
content_upload_hash = self.upload_raw_content(
content=content_data,
content_type='application/octet-stream',
filename=f"{document_uuid}.content"
)
if not content_upload_hash:
raise Exception("Failed to upload content")
print("📝 Step 5: Creating and uploading document schema...")
# Create document schema in exact format from raw logs (4 components but count is 3)
doc_schema_entries = [
f"{content_hash}:0:{document_uuid}.content:0:{len(content_data)}",
f"{metadata_hash}:0:{document_uuid}.metadata:0:{len(metadata_content)}",
f"{pagedata_upload_hash}:0:{document_uuid}.pagedata:0:{len(pagedata_content)}",
f"{pdf_upload_hash}:0:{document_uuid}.pdf:0:{len(pdf_content)}"
]
# Note: count is 3 even though there are 4 entries (PDF doesn't count)
doc_schema_content = f"3\n" + "\n".join(doc_schema_entries)
doc_schema_bytes = doc_schema_content.encode('utf-8')
doc_schema_hash = self._compute_hash(doc_schema_bytes)
doc_schema_upload_hash = self.upload_raw_content(
content=doc_schema_bytes,
content_type='text/plain; charset=UTF-8',
filename=f"{document_uuid}.docSchema"
)
if not doc_schema_upload_hash:
raise Exception("Failed to upload document schema")
# Create document directory listing
data_components = [
{
'hash': metadata_hash,
'component': f"{document_uuid}.metadata",
'size': len(metadata_content)
},
{
'hash': pagedata_upload_hash,
'component': f"{document_uuid}.pagedata",
'size': len(pagedata_content)
},
{
'hash': pdf_upload_hash,
'component': f"{document_uuid}.pdf",
'size': len(pdf_content)
},
{
'hash': content_hash,
'component': f"{document_uuid}.content",
'size': len(content_data)
}
]
# Add to database
new_node = {
'uuid': document_uuid,
'hash': doc_schema_hash, # Document hash is the docSchema hash
'name': name,
'node_type': 'document',
'parent_uuid': parent_uuid,
'local_path': f"content/{name}",
'extracted_files': [str(pdf_file)],
'component_hashes': {
'content': content_hash,
'metadata': metadata_hash,
'pdf': pdf_upload_hash,
'pagedata': pagedata_upload_hash,
'docSchema': doc_schema_hash,
'rm_files': []
},
'metadata': json.loads(metadata_content.decode('utf-8')),
'last_modified': self._generate_timestamp(),
'version': 1,
'sync_status': 'uploaded',
'last_synced': datetime.now().isoformat(),
'size': len(pdf_content) # Store the actual PDF file size
}
# Update the metadata to include content_data with sizeInBytes for proper root.docSchema sizing
content_data = {
"fileType": "pdf",
"sizeInBytes": str(len(pdf_content)),
"pageCount": 1,
"formatVersion": 1,
"orientation": "portrait"
}
new_node['metadata']['content_data'] = json.dumps(content_data)
# 🚫 REMOVED: Direct database manipulation for final state
# Do NOT add to database permanently - let replica sync handle final state
# 🚫 REMOVED: Hash registry updates
# Let replica sync discover and register all hashes properly
# CRITICAL: Complete the proper upload sequence from real app logs
print("📝 Step 6: Updating root.docSchema with new document...")
# Temporarily add document to database for root.docSchema update
temp_node = {
'uuid': document_uuid,
'hash': doc_schema_hash, # Document hash is the docSchema hash
'name': name,
'node_type': 'document',
'parent_uuid': parent_uuid,
'metadata': json.loads(metadata_content.decode('utf-8')),
'component_hashes': {
'docSchema': doc_schema_hash
},
'size': len(doc_schema_bytes) # Use docSchema size for root.docSchema
}
# Add temporarily for root update
self.database['nodes'][document_uuid] = temp_node
root_update_success = self.update_root_directory()
if not root_update_success:
print("⚠️ Warning: Root directory update failed - document may not appear in real app")
# Remove temporary entry if root update failed
del self.database['nodes'][document_uuid]
# Don't fail the upload completely, but warn user
else:
print("✅ Root directory updated successfully")
# Remove temporary entry - let replica sync handle final database state
del self.database['nodes'][document_uuid]
# Always trigger replica sync after any upload (root or folder)
# This ensures the new document is properly downloaded and cataloged with final state
print("📝 Step 7: Running final replica sync to verify upload...")
try:
from local_replica_v2 import RemarkableReplicaBuilder
replica_builder = RemarkableReplicaBuilder(self.session)
print("🔄 Running replica sync to discover new document...")
replica_builder.build_complete_replica()
# Reload our database to get the freshly synced data
print("🔄 Reloading database with fresh sync data...")
self.database = self._load_database()
# Verify the document was properly synced
if document_uuid in self.database['nodes']:
synced_node = self.database['nodes'][document_uuid]
print(f"✅ Document synced successfully: {synced_node['name']}")
print(f" UUID: {document_uuid}")
print(f" Hash: {synced_node['hash']}")
print(f" Local path: {synced_node.get('local_path', 'Not set')}")
print(f" Extracted files: {synced_node.get('extracted_files', [])}")
else:
print(f"⚠️ Document {document_uuid} not found in synced database - may need more time to propagate")
except Exception as sync_e:
print(f"⚠️ Replica sync failed, but upload may have succeeded: {sync_e}")
# Don't fail the entire upload if sync fails
pass
# 🚫 REMOVED: Final database save - let replica sync handle database updates
# self._save_database()
print(f"✅ Successfully uploaded PDF document: {name}")
print(f"🔄 Document should appear in your device shortly after sync")
return True
except Exception as e:
print(f"❌ Failed to upload PDF document: {e}")
return False
def create_notebook(self, name: str, parent_uuid: str = "", template: str = "Blank") -> bool:
"""Create a new empty notebook"""
try:
# Clear any previous document context
self._clear_document_context()
print(f"📓 Creating notebook: {name}")
# Generate UUIDs and set current document UUID for consistent rm-filename headers
document_uuid = str(uuid.uuid4())
self._current_document_uuid = document_uuid
page_uuid = str(uuid.uuid4())
print(f"📊 Document UUID: {document_uuid}")
# Create empty .rm content for first page
rm_content = b'\x00' * 1000 # Minimal empty page content
rm_hash = self.upload_raw_content(
content=rm_content,
content_type='application/octet-stream',
filename=f"{page_uuid}.rm"
)
# Create content.json
content_data, content_hash = self.create_content_json([page_uuid], template)
self.upload_raw_content(
content=content_data,
content_type='application/octet-stream',
filename=f"{document_uuid}.content"
)
# Create metadata
metadata_content, metadata_hash = self.create_metadata_json(name, parent_uuid)
self.upload_raw_content(
content=metadata_content,
content_type='application/octet-stream',
filename=f"{document_uuid}.metadata"
)
# Create document directory listing
data_components = [
{
'hash': content_hash,
'component': f"{document_uuid}.content",
'size': len(content_data)
},
{
'hash': metadata_hash,
'component': f"{document_uuid}.metadata",
'size': len(metadata_content)
},
{
'hash': rm_hash,
'component': f"{document_uuid}/{page_uuid}.rm",
'size': len(rm_content)
}
]
doc_dir_content, doc_dir_hash = self.create_directory_listing([], data_components)
self.upload_raw_content(doc_dir_content, doc_dir_hash)
# Add to database
new_node = {
'uuid': document_uuid,
'hash': doc_dir_hash,
'name': name,
'node_type': 'document',
'parent_uuid': parent_uuid,
'local_path': f"content/{name}",
'extracted_files': [],
'component_hashes': {
'content': content_hash,
'metadata': metadata_hash,
'pdf': None,
'pagedata': None,
'rm_files': [rm_hash]
},
'metadata': json.loads(metadata_content.decode('utf-8')),
'last_modified': self._generate_timestamp(),
'version': 1,
'sync_status': 'created',
'last_synced': datetime.now().isoformat()
}
# 🚫 REMOVED: Direct database manipulation
# Do NOT add to database directly - let replica sync handle it properly
# self.database['nodes'][document_uuid] = new_node
# 🚫 REMOVED: Hash registry updates
# Let replica sync discover and register all hashes properly
# Hash registry should only be populated from actual cloud downloads
# 🔄 CRITICAL FIX: Instead of manually updating database, trigger fresh replica sync
# This ensures the new notebook is properly downloaded and cataloged
try:
from local_replica_v2 import RemarkableReplicaBuilder
replica_builder = RemarkableReplicaBuilder(self.session)
print("🔄 Running replica sync to discover new notebook...")
replica_builder.build_complete_replica()
# Reload our database to get the freshly synced data
print("🔄 Reloading database with fresh sync data...")
self.database = self._load_database()
# Verify the notebook was properly synced
if document_uuid in self.database['nodes']:
synced_node = self.database['nodes'][document_uuid]
print(f"✅ Notebook synced successfully: {synced_node['name']}")
print(f" UUID: {document_uuid}")
print(f" Hash: {synced_node['hash']}")
print(f" Local path: {synced_node.get('local_path', 'Not set')}")
print(f" Extracted files: {synced_node.get('extracted_files', [])}")
else:
print(f"⚠️ Notebook {document_uuid} not found in synced database - may need more time to propagate")
except Exception as sync_e:
print(f"⚠️ Replica sync failed, but upload may have succeeded: {sync_e}")
# Don't fail the entire upload if sync fails
pass
print(f"✅ Successfully created notebook: {name}")
print(f"🔄 Notebook should appear in your device shortly after sync")
return True
except Exception as e:
print(f"❌ Failed to create notebook: {e}")
return False
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| session | requests.Session | - | positional |
| replica_database_path | str | - | positional |
Parameter Details
session: Authenticated requests.Session used for all reMarkable cloud API calls
replica_database_path: Path to the local replica database JSON file loaded by _load_database()
Return Value
Instantiation returns a RemarkableUploadManager instance; most public methods return bool (success) or Optional[str] (content hash)
Class Interface
Methods
__init__(self, session, replica_database_path)
Purpose: Initialize the manager with an authenticated session and the replica database path
Parameters:
session: Type: requests.Session
replica_database_path: Type: str
Returns: None
_clear_document_context(self)
Purpose: Clear the current document UUID context for new uploads
Returns: None
_load_database(self) -> Dict[str, Any]
Purpose: Load the replica database
Returns: Returns Dict[str, Any]
_save_database(self)
Purpose: Save the updated database
Returns: None
_compute_hash(self, content) -> str
Purpose: Compute SHA256 hash of content
Parameters:
content: Type: bytes
Returns: Returns str
_compute_crc32c_header(self, content) -> str
Purpose: Compute CRC32C checksum and return as x-goog-hash header value
Parameters:
content: Type: bytes
Returns: Returns str
_generate_timestamp(self) -> str
Purpose: Generate reMarkable timestamp
Returns: Returns str
_generate_generation(self) -> int
Purpose: Generate reMarkable generation number
Returns: Returns int
_capture_server_generation(self) -> bool
Purpose: Capture the current server generation for use in final root update
Returns: Returns bool
upload_raw_content(self, content, content_hash, filename, content_type, system_filename) -> Optional[str]
Purpose: Upload raw content and return its hash
Parameters:
content: Type: bytes
content_hash: Type: str
filename: Type: str
content_type: Type: str
system_filename: Type: str
Returns: Returns Optional[str]
upload_system_file(self, content, system_filename, content_type) -> Optional[str]
Purpose: Upload system files like roothash, root.docSchema with fixed filenames
Parameters:
content: Type: bytes
system_filename: Type: str
content_type: Type: str
Returns: Returns Optional[str]
upload_document_file(self, content, filename, content_type) -> Optional[str]
Purpose: Upload document files with UUID.extension pattern
Parameters:
content: Type: bytes
filename: Type: str
content_type: Type: str
Returns: Returns Optional[str]
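Example (illustrative sketch): all three helpers delegate to upload_raw_content, which hashes the bytes, sets the rm-filename and x-goog-hash headers, and PUTs to /sync/v3/files/{hash}. The manager instance and payloads below are assumed, not taken from the source.
# hedged sketch - payloads and names are illustrative
raw_hash = manager.upload_raw_content(b"hello world", content_type="text/plain; charset=UTF-8")
sys_hash = manager.upload_system_file(b"3\n", system_filename="root.docSchema")
doc_hash = manager.upload_document_file(pdf_bytes, filename="document.pdf", content_type="application/pdf")
# each call returns the SHA256 hex digest of the uploaded bytes, or None on failure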
create_metadata_json(self, name, parent_uuid, document_type) -> Tuple[bytes, str]
Purpose: Create metadata JSON for a document
Parameters:
name: Type: str
parent_uuid: Type: str
document_type: Type: str
Returns: Returns Tuple[bytes, str]
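Example (illustrative sketch): returns the pretty-printed metadata JSON as bytes together with its SHA256 hash; the document name below is assumed.
metadata_bytes, metadata_hash = manager.create_metadata_json("My Document", parent_uuid="")
# metadata_bytes carries createdTime/lastModified timestamps, parent, type, and visibleName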
create_pdf_content_json(self, pdf_content, document_name) -> Tuple[bytes, str]
Purpose: Create content JSON for a PDF document based on real app patterns
Parameters:
pdf_content: Type: bytes
document_name: Type: str
Returns: Returns Tuple[bytes, str]
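Example (illustrative sketch): pdf_bytes stands in for the raw PDF file contents; note that pageCount and originalPageCount stay at their default of 1 rather than being derived from the PDF itself.
content_bytes, content_hash = manager.create_pdf_content_json(pdf_bytes, "Quarterly Report")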
create_content_json(self, pages, template) -> Tuple[bytes, str]
Purpose: Create content JSON for a notebook with pages
Parameters:
pages: Type: List[str]
template: Type: str
Returns: Returns Tuple[bytes, str]
create_doc_schema(self, document_uuid, metadata_hash, pagedata_hash, pdf_hash, content_hash, metadata_size, pagedata_size, pdf_size, content_size) -> Tuple[bytes, str]
Purpose: Create document schema content in the exact format expected by reMarkable
Parameters:
document_uuid: Type: str
metadata_hash: Type: str
pagedata_hash: Type: str
pdf_hash: Type: str
content_hash: Type: str
metadata_size: Type: int
pagedata_size: Type: int
pdf_size: Type: int
content_size: Type: int
Returns: Returns Tuple[bytes, str]
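Example (illustrative output): one header line with the component count, then one hash:0:uuid.extension:0:size line per component; hashes and sizes below are placeholders.
4
<content_sha256>:0:<document_uuid>.content:0:2209
<metadata_sha256>:0:<document_uuid>.metadata:0:354
<pagedata_sha256>:0:<document_uuid>.pagedata:0:1
<pdf_sha256>:0:<document_uuid>.pdf:0:52341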
create_directory_listing(self, child_objects, data_components) -> Tuple[bytes, str]
Purpose: Create directory listing content
Parameters:
child_objects: Type: List[Dict]
data_components: Type: List[Dict]
Returns: Returns Tuple[bytes, str]
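Example (illustrative output): the first line is the total entry count; child objects use the 80000000 flag while data components use 0. Hashes and sizes are placeholders.
3
<doc_hash>:80000000:<doc_uuid>:3:2209
<content_hash>:0:<doc_uuid>.content:0:2209
<metadata_hash>:0:<doc_uuid>.metadata:0:354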
update_root_directory(self) -> bool
Purpose: Update the root directory listing by adding the new document to existing entries
Returns: Returns bool
_get_current_root_entries(self) -> Optional[List[str]]
Purpose: Get current root.docSchema entries from server to preserve existing data
Returns: Returns Optional[List[str]]
_add_new_document_to_root_entries(self, existing_entries) -> bool
Purpose: Add the current document being uploaded to the root entries list
Parameters:
existing_entries: Type: List[str]
Returns: Returns bool
_create_root_directory_from_entries(self, entries) -> bytes
Purpose: Create root.docSchema content from list of entries
Parameters:
entries: Type: List[str]
Returns: Returns bytes
create_root_directory_listing(self, root_entries) -> bytes
Purpose: Create root directory listing with version header '3' (matching /sync/v3/ API version)
Parameters:
root_entries: Type: List[Dict]
Returns: Returns bytes
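Example (illustrative output): version header "3" followed by one line per top-level node, sorted by UUID; hashes, UUIDs, and sizes are placeholders.
3
<folder_hash>:80000000:<folder_uuid>:1:12345
<pdf_doc_hash>:80000000:<pdf_doc_uuid>:4:52341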
update_root_hash(self, new_root_hash) -> bool
Purpose: Update the root hash in the cloud - send as text body with proper headers like other files
Parameters:
new_root_hash: Type: str
Returns: Returns bool
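Example (illustrative payload): the JSON body PUT to /sync/v3/root, using the generation captured earlier from /sync/v4/root; values are placeholders.
{
  "broadcast": true,
  "generation": 1716899123456789,
  "hash": "<new_root_docschema_sha256>"
}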
edit_document_metadata(self, document_uuid, new_name, new_parent) -> bool
Purpose: Edit an existing document's metadata
Parameters:
document_uuid: Type: str
new_name: Type: str
new_parent: Type: str
Returns: Returns bool
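Example (illustrative sketch): rename and move an existing document; the UUIDs below are placeholders.
manager.edit_document_metadata("<document-uuid>", new_name="Renamed report", new_parent="<folder-uuid>")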
upload_pdf_document(self, pdf_path, name, parent_uuid) -> bool
Purpose: Upload a new PDF document to reMarkable following the correct sequence from app logs
Parameters:
pdf_path: Type: str
name: Type: str
parent_uuid: Type: str
Returns: Returns bool
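Example (illustrative sketch): end-to-end upload, assuming an already-authenticated requests.Session and a local replica database; the file names and auth handling below are assumptions, not from the source.
import requests

session = requests.Session()
session.headers["Authorization"] = "Bearer <user-token>"  # hypothetical auth setup
manager = RemarkableUploadManager(session, "replica_database.json")
if manager.upload_pdf_document("report.pdf", name="Report", parent_uuid=""):
    print("Upload sequence completed; the document should sync to the device shortly")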
create_notebook(self, name, parent_uuid, template) -> bool
Purpose: Create a new empty notebook
Parameters:
name: Type: str
parent_uuid: Type: str
template: Type: str
Returns: Returns bool
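Example (illustrative sketch): create a one-page notebook in the root folder; the name is a placeholder.
created = manager.create_notebook("Meeting Notes", parent_uuid="", template="Blank")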
Required Imports
import os
import json
import hashlib
import base64
import zlib
import time
import uuid
import re
import requests
from pathlib import Path
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple
import crc32c  # optional; the code falls back to zlib.crc32 when HAS_CRC32C is False
Usage Example
# Example usage (paths and token handling are illustrative):
# session = requests.Session()  # must already carry valid reMarkable cloud authentication
# manager = RemarkableUploadManager(session, "replica_database.json")
# manager.upload_pdf_document("report.pdf", name="Report", parent_uuid="")
Similar Components
AI-powered semantic similarity - components with related functionality:
- class RemarkableUploadManager_v1 (98.9% similar)
- class RemarkablePDFUploader_v1 (77.5% similar)
- class RemarkableCloudManager (75.4% similar)
- class RemarkablePDFUploader (70.9% similar)
- class RemarkableUploadTests (69.9% similar)