class RemarkableReplicaBuilder
Step-by-step replica builder
File: /tf/active/vicechatdev/e-ink-llm/cloudtest/local_replica_v2.py
Lines: 52 - 884
Complexity: moderate
Purpose
Step-by-step builder that mirrors a reMarkable cloud account into a local directory tree using a three-phase process (discovery, hierarchy, extraction), with incremental sync against a previously saved replica database.
Source Code
class RemarkableReplicaBuilder:
"""Step-by-step replica builder"""
def __init__(self, session: requests.Session, replica_dir: str = "remarkable_replica_v2"):
self.session = session
self.base_url = "https://eu.tectonic.remarkable.com"
# Setup directories
self.replica_dir = Path(replica_dir).resolve()
self.content_dir = self.replica_dir / "content"
self.raw_dir = self.replica_dir / "raw_components"
for directory in [self.replica_dir, self.content_dir, self.raw_dir]:
directory.mkdir(parents=True, exist_ok=True)
# Setup logging
self.log_file = self.replica_dir / "build.log"
self.setup_logging()
# State
self.nodes: Dict[str, RemarkableNode] = {}
self.all_hashes: Set[str] = set()
self.failed_downloads: Set[str] = set()
self.existing_database: Optional[Dict[str, Any]] = None
# Load existing database if it exists
self._load_existing_database()
# Statistics
self.stats = {
'total_nodes': 0,
'folders': 0,
'documents': 0,
'trash_items': 0,
'pdfs_extracted': 0,
'rm_files_extracted': 0,
'rm_pdfs_converted': 0,
'total_files': 0,
'nodes_updated': 0,
'nodes_added': 0,
'nodes_unchanged': 0
}
def setup_logging(self):
"""Setup logging"""
self.logger = logging.getLogger('ReplicaBuilder')
self.logger.setLevel(logging.DEBUG)
self.logger.handlers.clear()
# File handler
file_handler = logging.FileHandler(self.log_file, mode='w', encoding='utf-8')
file_handler.setLevel(logging.DEBUG)
file_formatter = logging.Formatter(
'%(asctime)s | %(levelname)-8s | %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
file_handler.setFormatter(file_formatter)
# Console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_formatter = logging.Formatter('%(message)s')
console_handler.setFormatter(console_formatter)
self.logger.addHandler(file_handler)
self.logger.addHandler(console_handler)
self.logger.info(f"🏗️ REMARKABLE REPLICA BUILDER (STEP-BY-STEP)")
self.logger.info(f"📁 Replica directory: {self.replica_dir}")
def _load_existing_database(self):
"""Load existing database if it exists"""
database_file = self.replica_dir / "replica_database.json"
if database_file.exists():
try:
with open(database_file, 'r', encoding='utf-8') as f:
self.existing_database = json.load(f)
existing_count = len(self.existing_database.get('nodes', {}))
last_sync = self.existing_database.get('replica_info', {}).get('last_sync', 'unknown')
self.logger.info(f"📂 Found existing database with {existing_count} nodes")
self.logger.info(f"📅 Last sync: {last_sync}")
except Exception as e:
self.logger.warning(f"⚠️ Failed to load existing database: {e}")
self.existing_database = None
else:
self.logger.info(f"📂 No existing database found - full sync will be performed")
def _should_update_node(self, node_hash: str, node_uuid: str) -> bool:
"""Check if a node needs to be updated based on existing database"""
if not self.existing_database:
return True
existing_nodes = self.existing_database.get('nodes', {})
hash_registry = self.existing_database.get('hash_registry', {})
# Check if this hash is already known
if node_hash in hash_registry:
existing_uuid = hash_registry[node_hash].get('uuid')
if existing_uuid == node_uuid:
# Same node, same hash - no update needed
return False
# Check if node exists but with different hash (updated)
if node_uuid in existing_nodes:
existing_hash = existing_nodes[node_uuid].get('hash')
if existing_hash != node_hash:
# Node exists but hash changed - update needed
return True
else:
# Same hash - no update needed
return False
# New node - update needed
return True
def fetch_hash_content(self, hash_ref: str) -> Optional[Dict[str, Any]]:
"""Fetch content from reMarkable cloud by hash"""
if hash_ref in self.failed_downloads:
return None
try:
url = f"{self.base_url}/sync/v3/files/{hash_ref}"
self.logger.debug(f"FETCHING: {hash_ref[:16]}...")
response = self.session.get(url)
response.raise_for_status()
content = response.content
self.logger.debug(f" → {len(content)} bytes")
return {
'hash': hash_ref,
'content': content,
'size': len(content)
}
except Exception as e:
self.logger.error(f"Failed to fetch {hash_ref[:16]}...: {e}")
self.failed_downloads.add(hash_ref)
return None
def get_root_hash(self) -> Optional[str]:
"""Get the root hash"""
response = None  # so the except handler can reference it even if the request itself raises
try:
url = f"{self.base_url}/sync/v4/root"
self.logger.debug(f"Getting root hash from: {url}")
response = self.session.get(url, timeout=30)
if response.status_code == 200:
data = response.json()
root_hash = data.get('hash')
self.logger.info(f"🌱 Root hash: {root_hash}")
return root_hash
else:
self.logger.error(f"Root request failed with status {response.status_code}")
return None
except Exception as e:
self.logger.error(f"Failed to get root hash: {e}")
self.logger.debug(f"Response text: {getattr(response, 'text', 'No response')}")
return None
def parse_directory_listing(self, content: bytes) -> Dict[str, Any]:
"""Parse directory listing"""
try:
text_content = content.decode('utf-8')
except UnicodeDecodeError:
return {'child_objects': [], 'data_components': []}
result = {
'child_objects': [],
'data_components': []
}
lines = text_content.split('\n')
if lines and lines[0].strip().isdigit():
lines = lines[1:] # Skip count line
import re
entry_pattern = r'^([a-f0-9]{64}):([0-9a-fA-F]+):([a-f0-9-/]+(?:\.[^:]+)?):(\d+):(\d+)$'
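# Illustrative example of a listing entry (values are hypothetical, not from a real account):
#   <64-hex-hash>:0:9f2c1a3e-....metadata:0:1523
# i.e. object hash, hex flags, uuid component (bare uuid for a child object,
# "uuid.ext" or "uuid/page.rm" for a data component), numeric type field, size in bytes.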
for line in lines:
line = line.strip()
if not line:
continue
match = re.match(entry_pattern, line, re.IGNORECASE)
if match:
hash_val, flags, uuid_component, type_val, size_val = match.groups()
entry_info = {
'hash': hash_val,
'flags': flags,
'uuid_component': uuid_component,
'type': type_val,
'size': int(size_val)
}
if '.' in uuid_component:
# Data component (.content, .metadata, .pdf, .rm, etc.)
component_type = uuid_component.split('.')[-1]
if '/' in component_type: # Handle .rm files like "uuid/filename.rm"
component_type = component_type.split('/')[-1]
entry_info['component_type'] = component_type
result['data_components'].append(entry_info)
else:
# Child object (pure UUID)
result['child_objects'].append(entry_info)
return result
def extract_metadata(self, metadata_hash: str) -> Optional[Dict[str, Any]]:
"""Extract metadata from hash"""
content_info = self.fetch_hash_content(metadata_hash)
if not content_info:
return None
try:
text_content = content_info['content'].decode('utf-8')
return json.loads(text_content)
except (UnicodeDecodeError, json.JSONDecodeError) as e:
self.logger.debug(f"Failed to parse metadata {metadata_hash[:16]}...: {e}")
return None
# ================================================================
# STEP 1: DISCOVERY PHASE
# ================================================================
def discover_all_nodes(self, root_hash: str) -> bool:
"""Step 1: Discover all nodes and collect metadata"""
self.logger.info(f"📋 STEP 1: DISCOVERY PHASE")
self.logger.info(f"🔍 Discovering all nodes from root...")
discovered_hashes = set()
hashes_to_process = [root_hash]
while hashes_to_process:
current_hash = hashes_to_process.pop(0)
if current_hash in discovered_hashes:
continue
discovered_hashes.add(current_hash)
self.logger.debug(f" Processing: {current_hash[:16]}...")
# Fetch and parse content
content_info = self.fetch_hash_content(current_hash)
if not content_info:
continue
parsed = self.parse_directory_listing(content_info['content'])
# Extract metadata if available
metadata = {}
metadata_hash = None
node_name = f"unknown_{current_hash[:8]}"
node_type = "folder"
parent_uuid = None
for component in parsed['data_components']:
if component['component_type'] == 'metadata':
metadata_hash = component['hash']
extracted_metadata = self.extract_metadata(metadata_hash)
if extracted_metadata:
metadata = extracted_metadata
node_name = metadata.get('visibleName', node_name)
if metadata.get('type') == 'DocumentType':
node_type = "document"
elif metadata.get('type') == 'CollectionType':
node_type = "folder"
parent_uuid = metadata.get('parent', '') or None
break
# Determine node UUID
node_uuid = None
for component in parsed['child_objects']:
node_uuid = component['uuid_component']
break
if not node_uuid and parsed['data_components']:
component_name = parsed['data_components'][0]['uuid_component']
if '.' in component_name:
node_uuid = component_name.split('.')[0]
if not node_uuid:
node_uuid = current_hash[:32] # Fallback
# Check if node needs updating (incremental sync)
if self._should_update_node(current_hash, node_uuid):
# Create node
node = RemarkableNode(
uuid=node_uuid,
hash=current_hash,
name=node_name,
node_type=node_type,
parent_uuid=parent_uuid,
metadata=metadata
)
# Extract component hashes
for component in parsed['data_components']:
comp_type = component['component_type']
comp_hash = component['hash']
if comp_type == 'content':
node.content_hash = comp_hash
elif comp_type == 'metadata':
node.metadata_hash = comp_hash
elif comp_type == 'pdf':
node.pdf_hash = comp_hash
elif comp_type == 'pagedata':
node.pagedata_hash = comp_hash
elif comp_type == 'rm' or comp_type.endswith('.rm'):
node.rm_hashes.append(comp_hash)
# Store node
self.nodes[node_uuid] = node
self.stats['nodes_added'] += 1
self.logger.debug(f" → NEW/UPDATED {node_type}: {node_name} (parent: {parent_uuid or 'ROOT'})")
else:
# Node unchanged - load from existing database
if self.existing_database and node_uuid in self.existing_database.get('nodes', {}):
existing_node_data = self.existing_database['nodes'][node_uuid]
node = RemarkableNode(
uuid=existing_node_data['uuid'],
hash=existing_node_data['hash'],
name=existing_node_data['name'],
node_type=existing_node_data['node_type'],
parent_uuid=existing_node_data['parent_uuid'],
metadata=existing_node_data['metadata']
)
# Restore component hashes
comp_hashes = existing_node_data.get('component_hashes', {})
node.content_hash = comp_hashes.get('content')
node.metadata_hash = comp_hashes.get('metadata')
node.pdf_hash = comp_hashes.get('pdf')
node.pagedata_hash = comp_hashes.get('pagedata')
node.rm_hashes = comp_hashes.get('rm_files', [])
# Restore paths and files
node.local_path = existing_node_data.get('local_path', '')
node.extracted_files = existing_node_data.get('extracted_files', [])
self.nodes[node_uuid] = node
self.stats['nodes_unchanged'] += 1
self.logger.debug(f" → UNCHANGED {node_type}: {node_name}")
self.stats['total_nodes'] += 1
if node_type == "folder":
self.stats['folders'] += 1
else:
self.stats['documents'] += 1
# Track trash items
if parent_uuid == 'trash':
self.stats['trash_items'] += 1
# Add child hashes to process
for child_obj in parsed['child_objects']:
if child_obj['hash'] not in discovered_hashes:
hashes_to_process.append(child_obj['hash'])
self.logger.info(f"✅ Discovery complete: {len(self.nodes)} nodes found")
self.logger.info(f" 📂 Folders: {self.stats['folders']}")
self.logger.info(f" 📄 Documents: {self.stats['documents']}")
self.logger.info(f" 🗑️ Trash items: {self.stats['trash_items']}")
self.logger.info(f" 🆕 New/Updated: {self.stats['nodes_added']}")
self.logger.info(f" ✅ Unchanged: {self.stats['nodes_unchanged']}")
return True
# ================================================================
# STEP 2: HIERARCHY PHASE
# ================================================================
def build_folder_structure(self) -> bool:
"""Step 2: Build correct folder structure based on parent UUIDs"""
self.logger.info(f"\n📁 STEP 2: HIERARCHY PHASE")
self.logger.info(f"🏗️ Building folder structure...")
# Create special trash folder
trash_folder = self.content_dir / "trash"
trash_folder.mkdir(parents=True, exist_ok=True)
self.logger.info(f"🗑️ Created trash folder: {trash_folder}")
# Find root nodes (nodes with no parent or empty parent)
root_nodes = []
trash_nodes = []
for uuid, node in self.nodes.items():
if node.parent_uuid == 'trash':
trash_nodes.append(node)
elif not node.parent_uuid:
root_nodes.append(node)
self.logger.info(f"📍 Found {len(root_nodes)} root nodes")
self.logger.info(f"🗑️ Found {len(trash_nodes)} trash nodes")
# Build paths recursively from root
for root_node in root_nodes:
self._build_node_paths(root_node, str(self.content_dir))
# Build paths for trash nodes
for trash_node in trash_nodes:
self._build_node_paths(trash_node, str(trash_folder))
# Create all folder directories
for uuid, node in self.nodes.items():
if node.node_type == "folder" and node.local_path:
Path(node.local_path).mkdir(parents=True, exist_ok=True)
self.logger.debug(f"📁 Created: {node.local_path}")
self.logger.info(f"✅ Folder structure built")
return True
def _build_node_paths(self, node: RemarkableNode, parent_path: str):
"""Recursively build paths for node and its children"""
# Sanitize name for filesystem
safe_name = "".join(c for c in node.name if c.isalnum() or c in (' ', '-', '_', '.')).rstrip()
if not safe_name:
safe_name = f"unnamed_{node.uuid[:8]}"
# Set local path
node.local_path = str(Path(parent_path) / safe_name)
# Log with special indication for trash items
if node.parent_uuid == 'trash':
self.logger.debug(f" 🗑️ Trash Path: {node.name} → {node.local_path}")
else:
self.logger.debug(f" Path: {node.name} → {node.local_path}")
# Process children - both normal UUID children and trash children
for child_uuid, child_node in self.nodes.items():
if child_node.parent_uuid == node.uuid:
self._build_node_paths(child_node, node.local_path)
# ================================================================
# STEP 3: EXTRACTION PHASE
# ================================================================
def extract_all_files(self) -> bool:
"""Step 3: Extract PDFs and .rm files to correct locations"""
self.logger.info(f"\n📎 STEP 3: EXTRACTION PHASE")
self.logger.info(f"⬇️ Extracting files to correct locations...")
nodes_to_process = []
for uuid, node in self.nodes.items():
if node.node_type == "document":
# Only process if node is new/updated (has no extracted files from database)
if not node.extracted_files or len(node.extracted_files) == 0:
nodes_to_process.append(node)
if nodes_to_process:
self.logger.info(f"🔄 Processing {len(nodes_to_process)} new/updated documents...")
for node in nodes_to_process:
self._extract_node_files(node)
else:
self.logger.info(f"✅ No new documents to process - all files up to date")
self.logger.info(f"✅ File extraction complete")
self.logger.info(f" 📄 PDFs extracted: {self.stats['pdfs_extracted']}")
self.logger.info(f" 🖊️ RM files extracted: {self.stats['rm_files_extracted']}")
self.logger.info(f" 📄 RM→PDF conversions: {self.stats['rm_pdfs_converted']}")
return True
def _extract_node_files(self, node: RemarkableNode):
"""Extract files for a document node"""
if not node.local_path:
self.logger.warning(f"No local path for {node.name}")
return
# Ensure parent directory exists
parent_dir = Path(node.local_path).parent
parent_dir.mkdir(parents=True, exist_ok=True)
# Extract PDF if available - this goes directly to the folder structure
if node.pdf_hash:
pdf_path = Path(node.local_path).with_suffix('.pdf')
if self._extract_pdf(node.pdf_hash, pdf_path):
node.extracted_files.append(str(pdf_path))
self.stats['pdfs_extracted'] += 1
self.logger.debug(f" 📄 PDF: {pdf_path}")
# Extract .rm files if available - these get converted to PDF
if node.rm_hashes:
# Create temporary notebook subdirectory for processing
notebook_dir = parent_dir / f"{Path(node.local_path).stem}_temp_notebook"
notebook_dir.mkdir(exist_ok=True)
# Extract .rm files to temporary directory
for i, rm_hash in enumerate(node.rm_hashes):
rm_path = notebook_dir / f"page_{i+1}.rm"
if self._extract_rm_file(rm_hash, rm_path):
self.stats['rm_files_extracted'] += 1
self.logger.debug(f" 🖊️ RM: {rm_path}")
# Convert .rm files to PDF (this places the PDF in the correct location)
self._convert_notebook_to_pdf(node, notebook_dir)
# Clean up temporary notebook directory after conversion
import shutil
shutil.rmtree(notebook_dir, ignore_errors=True)
# Store metadata components in node for database (don't extract to filesystem)
if node.content_hash:
content_info = self.fetch_hash_content(node.content_hash)
if content_info:
try:
node.metadata['content_data'] = content_info['content'].decode('utf-8')
except UnicodeDecodeError:
node.metadata['content_data'] = f"<binary data: {len(content_info['content'])} bytes>"
if node.pagedata_hash:
pagedata_info = self.fetch_hash_content(node.pagedata_hash)
if pagedata_info:
try:
node.metadata['pagedata_data'] = pagedata_info['content'].decode('utf-8')
except UnicodeDecodeError:
node.metadata['pagedata_data'] = f"<binary data: {len(pagedata_info['content'])} bytes>"
def _extract_pdf(self, pdf_hash: str, target_path: Path) -> bool:
"""Extract PDF file"""
content_info = self.fetch_hash_content(pdf_hash)
if not content_info:
return False
try:
with open(target_path, 'wb') as f:
f.write(content_info['content'])
return True
except Exception as e:
self.logger.error(f"Failed to write PDF {target_path}: {e}")
return False
def _extract_rm_file(self, rm_hash: str, target_path: Path) -> bool:
"""Extract .rm file"""
content_info = self.fetch_hash_content(rm_hash)
if not content_info:
return False
try:
with open(target_path, 'wb') as f:
f.write(content_info['content'])
return True
except Exception as e:
self.logger.error(f"Failed to write RM file {target_path}: {e}")
return False
def _extract_component(self, comp_hash: str, target_path: Path) -> bool:
"""Extract other component"""
content_info = self.fetch_hash_content(comp_hash)
if not content_info:
return False
try:
with open(target_path, 'wb') as f:
f.write(content_info['content'])
return True
except Exception as e:
self.logger.error(f"Failed to write component {target_path}: {e}")
return False
def _convert_notebook_to_pdf(self, node: RemarkableNode, notebook_dir: Path):
"""Convert reMarkable notebook files to PDF using rmc and concatenate pages"""
try:
import subprocess
# Find all .rm files in the notebook directory
rm_files = sorted(notebook_dir.glob("page_*.rm"), key=lambda x: int(x.stem.split('_')[1]))
if not rm_files:
self.logger.debug(f" ⚠️ No .rm files found for {node.name}")
return
# Final PDF should be placed at the same level as notebook folder, named after the node
parent_dir = notebook_dir.parent
final_pdf_path = parent_dir / f"{node.name}.pdf"
if len(rm_files) == 1:
# Single page - convert directly
try:
result = subprocess.run([
"rmc", str(rm_files[0]), "-o", str(final_pdf_path)
], capture_output=True, text=True, timeout=60)
if result.returncode == 0 and final_pdf_path.exists() and final_pdf_path.stat().st_size > 0:
node.extracted_files.append(str(final_pdf_path))
self.logger.debug(f" 📄 Converted single page to PDF: {final_pdf_path}")
self.stats['rm_pdfs_converted'] += 1
else:
self.logger.debug(f" ⚠️ rmc conversion failed: {result.stderr}")
except (subprocess.TimeoutExpired, Exception) as e:
self.logger.debug(f" ⚠️ rmc conversion error: {e}")
else:
# Multiple pages - convert each to temporary PDF and concatenate
temp_pdfs = []
conversion_success = True
for i, rm_file in enumerate(rm_files):
temp_pdf = notebook_dir / f"temp_page_{i+1}.pdf"
try:
result = subprocess.run([
"rmc", str(rm_file), "-o", str(temp_pdf)
], capture_output=True, text=True, timeout=60)
if result.returncode == 0 and temp_pdf.exists() and temp_pdf.stat().st_size > 0:
temp_pdfs.append(temp_pdf)
self.logger.debug(f" 📄 Converted page {i+1}")
else:
self.logger.debug(f" ⚠️ rmc conversion failed for page {i+1}: {result.stderr}")
conversion_success = False
break
except (subprocess.TimeoutExpired, Exception) as e:
self.logger.debug(f" ⚠️ rmc conversion error for page {i+1}: {e}")
conversion_success = False
break
# Concatenate PDFs if all conversions succeeded
if conversion_success and temp_pdfs:
try:
# Use PyPDF2 to concatenate PDFs
import PyPDF2
pdf_writer = PyPDF2.PdfWriter()
for temp_pdf in temp_pdfs:
with open(temp_pdf, 'rb') as pdf_file:
pdf_reader = PyPDF2.PdfReader(pdf_file)
for page in pdf_reader.pages:
pdf_writer.add_page(page)
# Write the concatenated PDF
with open(final_pdf_path, 'wb') as output_file:
pdf_writer.write(output_file)
if final_pdf_path.exists() and final_pdf_path.stat().st_size > 0:
node.extracted_files.append(str(final_pdf_path))
self.logger.debug(f" 📄 Concatenated {len(temp_pdfs)} pages to PDF: {final_pdf_path}")
self.stats['rm_pdfs_converted'] += 1
except ImportError:
# Fallback: use system tools to concatenate if PyPDF2 not available
self.logger.debug(f" ⚠️ PyPDF2 not available, using first page only")
if temp_pdfs:
import shutil
shutil.copy2(temp_pdfs[0], final_pdf_path)
node.extracted_files.append(str(final_pdf_path))
self.stats['rm_pdfs_converted'] += 1
except Exception as e:
self.logger.debug(f" ⚠️ PDF concatenation failed: {e}")
finally:
# Clean up temporary files
for temp_pdf in temp_pdfs:
temp_pdf.unlink(missing_ok=True)
except Exception as e:
self.logger.debug(f" ⚠️ PDF conversion error for {node.name}: {e}")
# ================================================================
# MAIN BUILD PROCESS
# ================================================================
def build_complete_replica(self) -> bool:
"""Build complete replica using 3-step process"""
self.logger.info(f"🚀 STARTING 3-STEP REPLICA BUILD")
# Get root hash
root_hash = self.get_root_hash()
if not root_hash:
self.logger.error("❌ Failed to get root hash")
return False
# Step 1: Discovery
if not self.discover_all_nodes(root_hash):
self.logger.error("❌ Discovery phase failed")
return False
# Step 2: Hierarchy
if not self.build_folder_structure():
self.logger.error("❌ Hierarchy phase failed")
return False
# Step 3: Extraction
if not self.extract_all_files():
self.logger.error("❌ Extraction phase failed")
return False
# Save database
self._save_database()
# Final report
self.logger.info(f"\n🎉 REPLICA BUILD COMPLETED!")
self.logger.info(f"📊 FINAL STATISTICS:")
self.logger.info(f" 📁 Total nodes: {self.stats['total_nodes']}")
self.logger.info(f" 📂 Folders: {self.stats['folders']}")
self.logger.info(f" 📄 Documents: {self.stats['documents']}")
self.logger.info(f" �️ Trash items: {self.stats['trash_items']}")
self.logger.info(f" �📄 PDFs extracted: {self.stats['pdfs_extracted']}")
self.logger.info(f" 🖊️ RM files extracted: {self.stats['rm_files_extracted']}")
self.logger.info(f" 📄 RM→PDF conversions: {self.stats['rm_pdfs_converted']}")
self.logger.info(f" ❌ Failed downloads: {len(self.failed_downloads)}")
self.logger.info(f"\n📁 Replica location: {self.replica_dir}")
self.logger.info(f"🗑️ Trash location: {self.replica_dir}/content/trash")
return True
def _save_database(self):
"""Save the comprehensive replica database"""
database = {
'replica_info': {
'created': datetime.now().isoformat(),
'last_sync': datetime.now().isoformat(),
'replica_dir': str(self.replica_dir),
'total_nodes': len(self.nodes),
'statistics': self.stats,
'version': "2.0"
},
'nodes': {},
'hash_registry': {}, # For tracking file changes
'failed_downloads': list(self.failed_downloads)
}
# Create detailed node entries
for uuid, node in self.nodes.items():
node_data = {
'uuid': node.uuid,
'hash': node.hash,
'name': node.name,
'node_type': node.node_type,
'parent_uuid': node.parent_uuid,
'local_path': node.local_path,
'extracted_files': node.extracted_files,
# Component hashes for sync tracking
'component_hashes': {
'content': node.content_hash,
'metadata': node.metadata_hash,
'pdf': node.pdf_hash,
'pagedata': node.pagedata_hash,
'rm_files': node.rm_hashes
},
# Full metadata including content and pagedata
'metadata': node.metadata,
# Timestamps
'last_modified': node.metadata.get('lastModified', ''),
'version': node.metadata.get('version', 0),
# Sync status
'sync_status': 'current',
'last_synced': datetime.now().isoformat()
}
database['nodes'][uuid] = node_data
# Add to hash registry for quick lookup
database['hash_registry'][node.hash] = {
'uuid': uuid,
'type': 'node',
'last_seen': datetime.now().isoformat()
}
# Add component hashes to registry
for comp_type, comp_hash in node_data['component_hashes'].items():
if comp_hash:
if isinstance(comp_hash, list):
for i, h in enumerate(comp_hash):
database['hash_registry'][h] = {
'uuid': uuid,
'type': f'{comp_type}_{i}',
'last_seen': datetime.now().isoformat()
}
else:
database['hash_registry'][comp_hash] = {
'uuid': uuid,
'type': comp_type,
'last_seen': datetime.now().isoformat()
}
database_file = self.replica_dir / "replica_database.json"
with open(database_file, 'w', encoding='utf-8') as f:
json.dump(database, f, indent=2, ensure_ascii=False)
self.logger.info(f"💾 Database saved: {database_file}")
# Also create a human-readable summary
summary_file = self.replica_dir / "replica_summary.txt"
with open(summary_file, 'w', encoding='utf-8') as f:
f.write(f"reMarkable Replica Summary\n")
f.write(f"=" * 50 + "\n")
f.write(f"Created: {database['replica_info']['created']}\n")
f.write(f"Location: {database['replica_info']['replica_dir']}\n")
f.write(f"Total Nodes: {database['replica_info']['total_nodes']}\n")
f.write(f"Statistics: {database['replica_info']['statistics']}\n\n")
f.write(f"Folder Structure:\n")
f.write(f"-" * 20 + "\n")
# Write folder structure
def write_node_tree(uuid, indent=0):
if uuid not in self.nodes:
return
node = self.nodes[uuid]
prefix = " " * indent
icon = "📁" if node.node_type == "folder" else "📄"
f.write(f"{prefix}{icon} {node.name}\n")
# Find children
for child_uuid, child_node in self.nodes.items():
if child_node.parent_uuid == uuid:
write_node_tree(child_uuid, indent + 1)
# Write root nodes
for uuid, node in self.nodes.items():
if not node.parent_uuid:
write_node_tree(uuid)
self.logger.info(f"📄 Summary saved: {summary_file}")
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| session | requests.Session | required | positional |
| replica_dir | str | "remarkable_replica_v2" | positional |
Parameter Details
session: Authenticated requests.Session used for all reMarkable cloud requests
replica_dir: Directory where the local replica (content, raw components, log, database) is built
Return Value
Instantiation returns a RemarkableReplicaBuilder instance; build_complete_replica() returns bool indicating success
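A minimal construction sketch, assuming an already-authenticated requests.Session (obtaining and refreshing the reMarkable cloud bearer token is outside this class; the header value below is a placeholder):

import requests

session = requests.Session()
# Placeholder token: replace with a valid reMarkable cloud bearer token
session.headers.update({"Authorization": "Bearer <token>"})

builder = RemarkableReplicaBuilder(session, replica_dir="remarkable_replica_v2")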
Class Interface
Methods
__init__(self, session, replica_dir)
Purpose: Initialize the builder: set up directories, logging, incremental-sync state, and statistics
Parameters:
session: Type: requests.Session
replica_dir: Type: str
Returns: None
setup_logging(self)
Purpose: Setup logging
Returns: None
_load_existing_database(self)
Purpose: Load existing database if it exists
Returns: None
_should_update_node(self, node_hash, node_uuid) -> bool
Purpose: Check if a node needs to be updated based on existing database
Parameters:
node_hash: Type: str
node_uuid: Type: str
Returns: Returns bool
fetch_hash_content(self, hash_ref) -> Optional[Dict[str, Any]]
Purpose: Fetch content from reMarkable cloud by hash
Parameters:
hash_ref: Type: str
Returns: Returns Optional[Dict[str, Any]]
get_root_hash(self) -> Optional[str]
Purpose: Get the root hash
Returns: Returns Optional[str]
parse_directory_listing(self, content) -> Dict[str, Any]
Purpose: Parse directory listing
Parameters:
content: Type: bytes
Returns: Returns Dict[str, Any]
extract_metadata(self, metadata_hash) -> Optional[Dict[str, Any]]
Purpose: Extract metadata from hash
Parameters:
metadata_hash: Type: str
Returns: Returns Optional[Dict[str, Any]]
discover_all_nodes(self, root_hash) -> bool
Purpose: Step 1: Discover all nodes and collect metadata
Parameters:
root_hash: Type: str
Returns: Returns bool
build_folder_structure(self) -> bool
Purpose: Step 2: Build correct folder structure based on parent UUIDs
Returns: Returns bool
_build_node_paths(self, node, parent_path)
Purpose: Recursively build paths for node and its children
Parameters:
node: Type: RemarkableNode
parent_path: Type: str
Returns: None
extract_all_files(self) -> bool
Purpose: Step 3: Extract PDFs and .rm files to correct locations
Returns: Returns bool
_extract_node_files(self, node)
Purpose: Extract files for a document node
Parameters:
node: Type: RemarkableNode
Returns: None
_extract_pdf(self, pdf_hash, target_path) -> bool
Purpose: Extract PDF file
Parameters:
pdf_hash: Type: str
target_path: Type: Path
Returns: Returns bool
_extract_rm_file(self, rm_hash, target_path) -> bool
Purpose: Extract .rm file
Parameters:
rm_hash: Type: str
target_path: Type: Path
Returns: Returns bool
_extract_component(self, comp_hash, target_path) -> bool
Purpose: Extract other component
Parameters:
comp_hash: Type: str
target_path: Type: Path
Returns: Returns bool
_convert_notebook_to_pdf(self, node, notebook_dir)
Purpose: Convert reMarkable notebook files to PDF using rmc and concatenate pages
Parameters:
node: Type: RemarkableNode
notebook_dir: Type: Path
Returns: None
build_complete_replica(self) -> bool
Purpose: Build complete replica using 3-step process
Returns: Returns bool
_save_database(self)
Purpose: Save the comprehensive replica database
Returns: None
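The three phases can also be driven individually; the sketch below simply mirrors what build_complete_replica() does internally (it assumes a builder constructed as in the example above):

root_hash = builder.get_root_hash()           # GET /sync/v4/root
if root_hash:
    builder.discover_all_nodes(root_hash)     # Step 1: walk the hash tree, collect metadata
    builder.build_folder_structure()          # Step 2: recreate the folder hierarchy (plus trash)
    builder.extract_all_files()               # Step 3: download PDFs and convert .rm notebooks
    builder._save_database()                  # persist replica_database.json for incremental syncs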
Required Imports
import os
import json
import logging
import re
import requests
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional, Set
# plus the RemarkableNode dataclass defined in the same module
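Notebook-to-PDF conversion additionally relies on tooling that the source imports lazily: the external rmc command-line tool, PyPDF2 for concatenating pages, and subprocess/shutil from the standard library. A hedged pre-flight check:

import importlib.util
import shutil

# The builder degrades gracefully without these (it logs and skips conversion,
# or keeps only the first page), but checking up front avoids surprises.
if shutil.which("rmc") is None:
    print("rmc not found on PATH: .rm notebooks will not be converted to PDF")
if importlib.util.find_spec("PyPDF2") is None:
    print("PyPDF2 not installed: multi-page notebooks fall back to the first page only")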
Usage Example
# Example usage (assumes an authenticated requests.Session):
# builder = RemarkableReplicaBuilder(session, replica_dir="remarkable_replica_v2")
# success = builder.build_complete_replica()
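Because the builder loads replica_database.json on construction, re-running the build against the same replica_dir performs an incremental sync in which unchanged hashes are skipped. A small inspection sketch (file name and keys as written by _save_database):

import json
from pathlib import Path

db = json.loads((Path("remarkable_replica_v2") / "replica_database.json").read_text(encoding="utf-8"))
stats = db["replica_info"]["statistics"]
print("last sync:", db["replica_info"]["last_sync"])
print("nodes:", len(db["nodes"]))
print("new/updated:", stats["nodes_added"], "unchanged:", stats["nodes_unchanged"])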
Similar Components
AI-powered semantic similarity - components with related functionality:
- class RemarkableLocalReplica (64.4% similar)
- class RemarkableReplicaSync_v1 (63.8% similar)
- class RemarkableReplicaSync (57.9% similar)
- function main_v61 (51.8% similar)
- function test_complete_replica_build (50.7% similar)