class RemarkableReplicaSync_v1
Standalone replica synchronization using the proven local_replica_v2 approach
File: /tf/active/vicechatdev/e-ink-llm/cloudtest/sync_replica.py
Lines: 59-824
Complexity: moderate
Purpose
Standalone replica synchronization using the proven local_replica_v2 approach.
Source Code
class RemarkableReplicaSync:
"""Standalone replica synchronization using proven local_replica_v2 approach"""
def __init__(self, workspace_dir: str = None):
self.workspace_dir = Path(workspace_dir) if workspace_dir else Path(__file__).parent
self.replica_dir = self.workspace_dir / "remarkable_replica_v2"
self.content_dir = self.replica_dir / "content"
# Create directories
for directory in [self.replica_dir, self.content_dir]:
directory.mkdir(parents=True, exist_ok=True)
# Setup logging
self.log_file = self.replica_dir / "build.log"
self.setup_logging()
# Initialize authentication
self.session = self._authenticate()
if not self.session:
raise RuntimeError("Failed to authenticate with reMarkable")
# State matching local_replica_v2.py (RemarkableNode is defined elsewhere in sync_replica.py)
self.nodes: Dict[str, RemarkableNode] = {}
self.all_hashes: Set[str] = set()
self.failed_downloads: Set[str] = set()
# Replica database state used by sync_complete_replica. The file names below are
# assumed: the excerpt references self.database_path and self.summary_path without
# ever initializing them.
self.database_path = self.replica_dir / "replica_database.json"
self.summary_path = self.replica_dir / "replica_summary.txt"
self.database = self._load_database()
# Statistics
self.stats = {
'total_nodes': 0,
'folders': 0,
'documents': 0,
'pdfs_extracted': 0,
'rm_files_extracted': 0,
'rm_pdfs_converted': 0,
'nodes_added': 0
}
def setup_logging(self):
"""Setup logging to file"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(self.log_file, mode='w'),
logging.StreamHandler()
]
)
self.logger = logging.getLogger(__name__)
def _authenticate(self) -> Optional[requests.Session]:
"""Authenticate with the reMarkable cloud service using token-based approach"""
token_file = self.workspace_dir / '.remarkable_token'
if token_file.exists():
print("ļæ½ Using existing reMarkable token...")
try:
with open(token_file, 'r') as f:
token_data = json.load(f)
session = requests.Session()
session.headers.update({
'Authorization': f'Bearer {token_data["access_token"]}',
'User-Agent': 'remarkable-replica-sync/1.0'
})
# Quick test - try to get document root
test_url = f'{token_data["service_manager_url"]}/document-storage/json/2/docs'
response = session.get(test_url)
if response.status_code == 200:
print("ā
Authentication successful")
return session
else:
print(f"ā Token test failed with status {response.status_code}")
except Exception as e:
print(f"ā Token authentication failed: {e}")
# Need new token
print("š No valid token found. Getting new token...")
return self._get_new_token()
def _get_new_token(self) -> Optional[requests.Session]:
"""Get a new authentication token"""
# Hardcoded one-time pairing code; in practice such codes are obtained from my.remarkable.com
device_token = '9c4e7c2b-c6c7-4831-8b2a-3f5a2e8f9c3d'
try:
# Step 1: Register device
register_url = 'https://webapp-production-dot-remarkable-production.appspot.com/token/json/2/device/new'
register_data = {
'code': device_token,
'deviceDesc': 'desktop-linux',
'deviceID': hashlib.sha256(f"replica-sync-{int(time.time())}".encode()).hexdigest()[:8]
}
response = requests.post(register_url, json=register_data)
if response.status_code != 200:
print(f"ā Device registration failed: {response.status_code}")
return None
device_bearer = response.text.strip('"')
# Step 2: Get user token
user_url = 'https://webapp-production-dot-remarkable-production.appspot.com/token/json/2/user/new'
user_response = requests.post(
user_url,
headers={'Authorization': f'Bearer {device_bearer}'}
)
if user_response.status_code != 200:
print(f"ā User token failed: {user_response.status_code}")
return None
user_token = user_response.text.strip('"')
# Step 3: Get service discovery
discovery_url = 'https://service-manager-production-dot-remarkable-production.appspot.com/service/json/1/document-storage?environment=production&group=auth0%7C5a68dc51cb30df3877a1d7c4&apiVer=2'
discovery_response = requests.get(
discovery_url,
headers={'Authorization': f'Bearer {user_token}'}
)
if discovery_response.status_code != 200:
print(f"ā Service discovery failed: {discovery_response.status_code}")
return None
service_info = discovery_response.json()
service_url = service_info.get('Host')
if not service_url:
print("ā No service URL in discovery response")
return None
# Save token info
token_data = {
'access_token': user_token,
'service_manager_url': service_url,
'created_at': datetime.now().isoformat()
}
token_file = self.workspace_dir / '.remarkable_token'
with open(token_file, 'w') as f:
json.dump(token_data, f, indent=2)
# Create session
session = requests.Session()
session.headers.update({
'Authorization': f'Bearer {user_token}',
'User-Agent': 'remarkable-replica-sync/1.0'
})
print("ā
New authentication token obtained and saved")
return session
except Exception as e:
print(f"ā Authentication failed: {e}")
return None
return None
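# The saved .remarkable_token file therefore has this shape (it mirrors the
# token_data dict written above):
#   {
#     "access_token": "<user bearer token>",
#     "service_manager_url": "<document-storage host from discovery>",
#     "created_at": "<ISO timestamp>"
#   }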
def sync_replica(self) -> bool:
"""
Perform replica synchronization using the proven 3-step process:
1. Discovery - Get all nodes from cloud
2. Hierarchy - Build proper folder structure
3. Extraction - Download content to correct locations
"""
try:
self.logger.info("š Starting reMarkable replica sync")
# Phase 1: Discovery
if not self._discover_all_nodes():
self.logger.error("ā Discovery phase failed")
return False
# Phase 2: Build hierarchy
if not self._build_folder_hierarchy():
self.logger.error("ā Hierarchy phase failed")
return False
# Phase 3: Extract content
if not self._extract_content():
self.logger.error("ā Content extraction phase failed")
return False
# Generate summary
self._generate_summary()
self.logger.info("ā
Replica sync completed successfully")
return True
except Exception as e:
self.logger.error(f"ā Sync failed: {e}")
return False
def _load_database(self) -> Dict[str, Any]:
"""Load or create replica database"""
if self.database_path.exists():
try:
with open(self.database_path, 'r') as f:
db = json.load(f)
print(f"š Loaded existing database with {len(db.get('nodes', {}))} nodes")
return db
except Exception as e:
print(f"ā ļø Database corrupted, creating new: {e}")
# Create new database
db = {
'nodes': {},
'hash_registry': {},
'metadata': {
'last_sync': None,
'sync_count': 0,
'created': datetime.now().isoformat()
}
}
print("š Created new replica database")
return db
def _save_database(self):
"""Save database to disk"""
try:
# Update metadata before writing, so the changes are actually persisted
self.database['metadata']['last_sync'] = datetime.now().isoformat()
self.database['metadata']['sync_count'] += 1
with open(self.database_path, 'w') as f:
json.dump(self.database, f, indent=2, default=str)
except Exception as e:
print(f"Failed to save database: {e}")
def _save_file_content(self, content_hash: str, content: bytes, filename: str) -> bool:
"""Save file content to local content directory"""
try:
# Create file path using hash (first 2 chars as subdirectory)
subdir = content_hash[:2]
file_dir = self.content_dir / subdir
file_dir.mkdir(exist_ok=True)
file_path = file_dir / content_hash
# Only save if file doesn't exist (avoid re-downloading)
if not file_path.exists():
with open(file_path, 'wb') as f:
f.write(content)
print(f" š¾ Saved {filename} ({len(content)} bytes)")
# Register in hash registry
if content_hash not in self.database.get('hash_registry', {}):
if 'hash_registry' not in self.database:
self.database['hash_registry'] = {}
self.database['hash_registry'][content_hash] = {
'filename': filename,
'size': len(content),
'type': self._get_file_type(filename),
'downloaded': datetime.now().isoformat()
}
return True
except Exception as e:
print(f"ā Failed to save {filename}: {e}")
return False
def _get_file_type(self, filename: str) -> str:
"""Determine file type from filename"""
if filename.endswith('.pdf'):
return 'pdf'
elif filename.endswith('.metadata'):
return 'metadata'
elif filename.endswith('.content'):
return 'content'
elif filename.endswith('.pagedata'):
return 'pagedata'
elif filename.endswith('.rm'):
return 'notebook_page'
elif filename.endswith('.docSchema'):
return 'docschema'
else:
return 'unknown'
def _compute_hash(self, content: bytes) -> str:
"""Compute SHA256 hash of content"""
return hashlib.sha256(content).hexdigest()
def sync_complete_replica(self) -> bool:
"""Perform complete replica synchronization"""
try:
print("\nš STARTING COMPLETE REPLICA SYNC")
print("=" * 50)
# Step 1: Get current root state
print("š Step 1: Getting root state from server...")
root_response = self.session.get("https://eu.tectonic.remarkable.com/sync/v4/root")
root_response.raise_for_status()
root_data = root_response.json()
current_root_hash = root_data['hash']
current_generation = root_data['generation']
print(f"š± Root hash: {current_root_hash}")
print(f"š¢ Generation: {current_generation}")
# Step 2: Fetch root.docSchema
print("š Step 2: Fetching root.docSchema...")
root_content_response = self.session.get(f"https://eu.tectonic.remarkable.com/sync/v3/files/{current_root_hash}")
root_content_response.raise_for_status()
root_content = root_content_response.text
print(f"š Root.docSchema size: {len(root_content)} bytes")
# Step 3: Parse and discover all nodes
print("š Step 3: Discovering all nodes...")
discovered_nodes = {}
lines = root_content.strip().split('\n')
if len(lines) < 1:
print("ā Empty root.docSchema")
return False
version = lines[0]
print(f"š Schema version: {version}")
# Process each entry in root.docSchema
for line_num, line in enumerate(lines[1:], 1):
if ':' in line:
parts = line.split(':')
if len(parts) >= 5:
node_hash = parts[0]
node_uuid = parts[2]
node_type = parts[3]
node_size = parts[4]
# Fetch node details
node_info = self._fetch_node_details(node_uuid, node_hash, node_type, node_size)
if node_info:
discovered_nodes[node_uuid] = node_info
# Progress indicator
if line_num % 5 == 0:
print(f" š Processed {line_num}/{len(lines)-1} entries...")
print(f"ā
Discovered {len(discovered_nodes)} nodes")
# Step 4: Update database
print("š Step 4: Updating database...")
# Count changes
new_nodes = 0
updated_nodes = 0
for uuid, node_info in discovered_nodes.items():
if uuid not in self.database['nodes']:
new_nodes += 1
else:
# Check if updated
existing_node = self.database['nodes'][uuid]
if existing_node.get('hash') != node_info.get('hash'):
updated_nodes += 1
self.database['nodes'][uuid] = node_info
# Remove nodes no longer in cloud
cloud_uuids = set(discovered_nodes.keys())
local_uuids = set(self.database['nodes'].keys())
removed_uuids = local_uuids - cloud_uuids
for uuid in removed_uuids:
del self.database['nodes'][uuid]
print(f"š Database changes:")
print(f" š New nodes: {new_nodes}")
print(f" š Updated nodes: {updated_nodes}")
print(f" šļø Removed nodes: {len(removed_uuids)}")
# Step 5: Save database and summary
print("š Step 5: Saving database and summary...")
self._save_database()
self._save_summary()
# Step 6: Create content index
print("š Step 6: Creating content index...")
self._create_content_index()
# Step 7: Create folder structure with files
print("š Step 7: Creating folder structure...")
self._create_folder_structure()
print(f"\nš REPLICA SYNC COMPLETED!")
print(f"š Total nodes: {len(self.database['nodes'])}")
print(f"š Database: {self.database_path}")
print(f"š Summary: {self.summary_path}")
print(f"š¾ Content files: {self.content_dir}")
print(f"š Folder structure: {self.replica_dir / 'documents'}")
return True
except Exception as e:
print(f"ā Replica sync failed: {e}")
return False
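# On a successful run, the replica directory ends up with a layout like this
# (the database/summary names follow the assumed attributes set in __init__):
#
#   remarkable_replica_v2/
#       build.log                     # sync log (see setup_logging)
#       replica_database.json         # node database (self.database_path)
#       replica_summary.txt           # human-readable summary (self.summary_path)
#       content_index.txt             # index of downloaded blobs
#       content/<hh>/<hash>           # content-addressed blobs, hh = first 2 hash chars
#       documents/<Folder>/<Doc>.pdf  # readable folder structure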
def _fetch_node_details(self, node_uuid: str, node_hash: str, node_type: str, node_size: str) -> Optional[Dict[str, Any]]:
"""Fetch detailed information about a node"""
try:
# Fetch node content (docSchema or metadata)
node_response = self.session.get(f"https://eu.tectonic.remarkable.com/sync/v3/files/{node_hash}")
node_response.raise_for_status()
node_content = node_response.text
node_content_bytes = node_response.content
# Save the node content locally
if node_type in ['1', '2']: # Folder
self._save_file_content(node_hash, node_content_bytes, f"{node_uuid}.metadata")
else: # Document
self._save_file_content(node_hash, node_content_bytes, f"{node_uuid}.docSchema")
# Determine node type and parse
if node_type in ['1', '2']: # Folder
return self._parse_folder_node(node_uuid, node_hash, node_content)
elif node_type in ['3', '4']: # Document
return self._parse_document_node(node_uuid, node_hash, node_content, node_type)
else:
print(f"ā ļø Unknown node type {node_type} for {node_uuid[:8]}...")
return None
except Exception as e:
print(f"ā Failed to fetch node {node_uuid[:8]}...: {e}")
return None
def _parse_folder_node(self, node_uuid: str, node_hash: str, folder_content: str) -> Optional[Dict[str, Any]]:
"""Parse folder node content"""
try:
# For folders, the content is the metadata JSON
metadata = json.loads(folder_content)
return {
'uuid': node_uuid,
'hash': node_hash,
'name': metadata.get('visibleName', 'Unknown Folder'),
'node_type': 'folder',
'metadata': metadata,
'last_modified': metadata.get('lastModified', '0'),
'parent_uuid': metadata.get('parent', ''),
'sync_status': 'synced',
'last_synced': datetime.now().isoformat()
}
except Exception as e:
print(f"ā Failed to parse folder {node_uuid[:8]}...: {e}")
return None
def _parse_document_node(self, node_uuid: str, node_hash: str, doc_content: str, node_type: str) -> Optional[Dict[str, Any]]:
"""Parse document node content (docSchema)"""
try:
# Parse docSchema to get components
lines = doc_content.strip().split('\n')
if len(lines) < 2:
print(f"ā ļø Invalid docSchema for {node_uuid[:8]}...")
return None
version = lines[0]
component_hashes = {}
metadata = None
# Extract component hashes and download components
for line in lines[1:]:
if ':' in line:
parts = line.split(':')
if len(parts) >= 3:
comp_hash = parts[0]
comp_name = parts[2]
# Download the component
try:
comp_response = self.session.get(f"https://eu.tectonic.remarkable.com/sync/v3/files/{comp_hash}")
comp_response.raise_for_status()
# Save component content locally
self._save_file_content(comp_hash, comp_response.content, comp_name)
# Store component hash
if comp_name.endswith('.metadata'):
component_hashes['metadata'] = comp_hash
# Parse metadata
try:
metadata = json.loads(comp_response.text)
except json.JSONDecodeError:
print(f"Failed to parse metadata for {node_uuid[:8]}...")
elif comp_name.endswith('.content'):
component_hashes['content'] = comp_hash
elif comp_name.endswith('.pdf'):
component_hashes['pdf'] = comp_hash
elif comp_name.endswith('.pagedata'):
component_hashes['pagedata'] = comp_hash
elif comp_name.endswith('.rm'):
if 'rm_files' not in component_hashes:
component_hashes['rm_files'] = []
component_hashes['rm_files'].append(comp_hash)
except Exception as e:
print(f"ā ļø Failed to download component {comp_name}: {e}")
component_hashes['docSchema'] = node_hash
# Determine document name
doc_name = "Unknown Document"
if metadata:
doc_name = metadata.get('visibleName', doc_name)
return {
'uuid': node_uuid,
'hash': node_hash,
'name': doc_name,
'node_type': 'document',
'metadata': metadata or {},
'component_hashes': component_hashes,
'last_modified': metadata.get('lastModified', '0') if metadata else '0',
'parent_uuid': metadata.get('parent', '') if metadata else '',
'version': int(version) if version.isdigit() else 1,
'sync_status': 'synced',
'last_synced': datetime.now().isoformat()
}
except Exception as e:
print(f"ā Failed to parse document {node_uuid[:8]}...: {e}")
return None
def _fetch_metadata(self, metadata_hash: str) -> Optional[Dict[str, Any]]:
"""Fetch and parse document metadata"""
try:
metadata_response = self.session.get(f"https://eu.tectonic.remarkable.com/sync/v3/files/{metadata_hash}")
metadata_response.raise_for_status()
return json.loads(metadata_response.text)
except Exception as e:
print(f"ā ļø Failed to fetch metadata {metadata_hash[:8]}...: {e}")
return None
def _save_summary(self):
"""Save human-readable summary"""
try:
with open(self.summary_path, 'w') as f:
f.write("reMarkable Replica Summary\n")
f.write("=" * 50 + "\n\n")
f.write(f"Last sync: {self.database['metadata'].get('last_sync', 'Never')}\n")
f.write(f"Total syncs: {self.database['metadata'].get('sync_count', 0)}\n")
f.write(f"Total nodes: {len(self.database['nodes'])}\n\n")
# Count by type
folders = [n for n in self.database['nodes'].values() if n.get('node_type') == 'folder']
documents = [n for n in self.database['nodes'].values() if n.get('node_type') == 'document']
f.write(f"š Folders: {len(folders)}\n")
f.write(f"š Documents: {len(documents)}\n\n")
# List folders
if folders:
f.write("Folders:\n")
f.write("-" * 20 + "\n")
for folder in sorted(folders, key=lambda x: x.get('name', '')):
f.write(f" š {folder['name']} ({folder['uuid'][:8]}...)\n")
f.write("\n")
# List documents
if documents:
f.write("Documents:\n")
f.write("-" * 20 + "\n")
for doc in sorted(documents, key=lambda x: x.get('name', '')):
parent_info = ""
if doc.get('parent_uuid'):
parent_name = "Unknown Folder"
for folder in folders:
if folder['uuid'] == doc['parent_uuid']:
parent_name = folder['name']
break
parent_info = f" [in {parent_name}]"
f.write(f" š {doc['name']} ({doc['uuid'][:8]}...){parent_info}\n")
except Exception as e:
print(f"ā ļø Failed to save summary: {e}")
def _create_content_index(self):
"""Create an index of all downloaded content files"""
try:
index_path = self.replica_dir / "content_index.txt"
with open(index_path, 'w') as f:
f.write("reMarkable Content Index\n")
f.write("=" * 50 + "\n\n")
f.write(f"Generated: {datetime.now().isoformat()}\n")
f.write(f"Total files: {len(self.database.get('hash_registry', {}))}\n\n")
# Group by file type
by_type = {}
for hash_val, info in self.database.get('hash_registry', {}).items():
file_type = info.get('type', 'unknown')
if file_type not in by_type:
by_type[file_type] = []
by_type[file_type].append((hash_val, info))
for file_type, files in sorted(by_type.items()):
f.write(f"{file_type.upper()} Files ({len(files)}):\n")
f.write("-" * 30 + "\n")
for hash_val, info in sorted(files, key=lambda x: x[1].get('filename', '')):
filename = info.get('filename', 'unknown')
size = info.get('size', 0)
subdir = hash_val[:2]
f.write(f" {filename} ({size} bytes)\n")
f.write(f" Hash: {hash_val}\n")
f.write(f" Path: content/{subdir}/{hash_val}\n\n")
f.write("\n")
print(f"š Content index saved: {index_path}")
except Exception as e:
print(f"ā ļø Failed to create content index: {e}")
def get_content_file_path(self, content_hash: str) -> Path:
"""Get the local path for a content file"""
subdir = content_hash[:2]
return self.content_dir / subdir / content_hash
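# Example of the content-addressed mapping above (hash value invented for illustration):
#   get_content_file_path("3fa9c2...") -> <replica_dir>/content/3f/3fa9c2...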
def _create_folder_structure(self):
"""Create readable folder structure with documents in their proper folders"""
try:
# Create documents directory
documents_dir = self.replica_dir / "documents"
documents_dir.mkdir(exist_ok=True)
print(f"š Creating folder structure in: {documents_dir}")
# Build folder hierarchy
folders = self.get_folders()
documents = [n for n in self.database['nodes'].values() if n.get('node_type') == 'document']
# Create folder directories
folder_paths = {}
# Process root folders first
for folder in folders:
if not folder.get('parent_uuid'):
folder_path = documents_dir / self._sanitize_filename(folder['name'])
folder_path.mkdir(exist_ok=True)
folder_paths[folder['uuid']] = folder_path
print(f" š Created root folder: {folder['name']}")
# Process nested folders
remaining_folders = [f for f in folders if f.get('parent_uuid')]
max_iterations = 10 # Prevent infinite loops
while remaining_folders and max_iterations > 0:
processed_this_round = []
for folder in remaining_folders:
parent_uuid = folder.get('parent_uuid')
if parent_uuid in folder_paths:
# Parent folder exists, create this folder
parent_path = folder_paths[parent_uuid]
folder_path = parent_path / self._sanitize_filename(folder['name'])
folder_path.mkdir(exist_ok=True)
folder_paths[folder['uuid']] = folder_path
processed_this_round.append(folder)
print(f" š Created nested folder: {folder['name']}")
# Remove processed folders
for folder in processed_this_round:
remaining_folders.remove(folder)
max_iterations -= 1
# Extract documents to their folders
for doc in documents:
doc_name = self._sanitize_filename(doc['name'])
parent_uuid = doc.get('parent_uuid')
# Determine target directory
if parent_uuid and parent_uuid in folder_paths:
target_dir = folder_paths[parent_uuid]
else:
target_dir = documents_dir
# Extract PDF if available
pdf_hash = doc.get('component_hashes', {}).get('pdf')
if pdf_hash:
pdf_path = target_dir / f"{doc_name}.pdf"
source_path = self.get_content_file_path(pdf_hash)
if source_path.exists():
try:
# Copy PDF to folder structure (shutil is imported at module level)
shutil.copy2(source_path, pdf_path)
print(f"  Extracted PDF: {doc_name}.pdf")
except Exception as e:
print(f"  Failed to copy PDF {doc_name}: {e}")
else:
print(f"  PDF source not found: {pdf_hash[:16]}...")
# For notebooks (with .rm files), create a note that it's a notebook
rm_files = doc.get('component_hashes', {}).get('rm_files', [])
if rm_files and not pdf_hash:
notebook_info_path = target_dir / f"{doc_name}_notebook_info.txt"
try:
with open(notebook_info_path, 'w') as f:
f.write(f"reMarkable Notebook: {doc['name']}\n")
f.write(f"UUID: {doc['uuid']}\n")
f.write(f"Created: {doc.get('metadata', {}).get('lastModified', 'Unknown')}\n")
f.write(f"Pages: {len(rm_files)}\n\n")
f.write("This is a reMarkable notebook with handwritten content.\n")
f.write("The original .rm files are stored in the content directory.\n")
print(f" š Created notebook info: {doc_name}_notebook_info.txt")
except Exception as e:
print(f" ā Failed to create notebook info: {e}")
print(f"ā
Folder structure created successfully")
except Exception as e:
print(f"ā Failed to create folder structure: {e}")
def _sanitize_filename(self, filename: str) -> str:
"""Sanitize filename for filesystem use"""
# Remove or replace invalid characters (re is imported at module level)
sanitized = re.sub(r'[<>:"/\\|?*]', '_', filename)
sanitized = sanitized.strip('. ')
# Ensure it's not empty
if not sanitized:
sanitized = "unnamed"
# Limit length
if len(sanitized) > 200:
sanitized = sanitized[:200]
return sanitized
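# Examples of the sanitization rules above:
#   _sanitize_filename('Q3: Report / Draft?') -> 'Q3_ Report _ Draft_'
#   _sanitize_filename('...')                 -> 'unnamed'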
def get_node_by_uuid(self, uuid: str) -> Optional[Dict[str, Any]]:
"""Get a specific node by UUID"""
return self.database['nodes'].get(uuid)
def get_documents_in_folder(self, folder_uuid: str) -> List[Dict[str, Any]]:
"""Get all documents in a specific folder"""
return [
node for node in self.database['nodes'].values()
if node.get('node_type') == 'document' and node.get('parent_uuid') == folder_uuid
]
def get_folders(self) -> List[Dict[str, Any]]:
"""Get all folders"""
return [
node for node in self.database['nodes'].values()
if node.get('node_type') == 'folder'
]
def get_root_documents(self) -> List[Dict[str, Any]]:
"""Get all documents in root (no parent)"""
return [
node for node in self.database['nodes'].values()
if node.get('node_type') == 'document' and not node.get('parent_uuid')
]
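Schema Format
The discovery code in sync_complete_replica implies a line-oriented docSchema format: a version line followed by colon-separated entries. Below is a minimal parsing sketch with field names inferred from how the parser indexes the parts; the sample values, and the meaning of the second field, are assumptions.
# Inferred entry layout: <hash>:<flags?>:<uuid>:<type>:<size>
sample = "9f2c1ab4:80000000:b7e3d2c1-0000-0000-0000-000000000000:4:2048"
parts = sample.split(':')
entry = {
    'hash': parts[0],  # blob hash, fetched via /sync/v3/files/<hash>
    'uuid': parts[2],  # node UUID (a component filename inside a document docSchema)
    'type': parts[3],  # '1'/'2' = folder, '3'/'4' = document (per _fetch_node_details)
    'size': parts[4],
}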
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| workspace_dir | str | None | keyword |
Parameter Details
workspace_dir: Optional path to the working directory; defaults to the directory containing the script. The replica is built under <workspace_dir>/remarkable_replica_v2.
Return Value
The constructor returns a RemarkableReplicaSync instance; it raises RuntimeError if authentication with the reMarkable cloud fails.
Class Interface
Methods
__init__(self, workspace_dir)
Purpose: Internal method: init
Parameters:
workspace_dir: Type: str
Returns: None
setup_logging(self)
Purpose: Setup logging to file
Returns: None
_authenticate(self) -> Optional[requests.Session]
Purpose: Authenticate with the reMarkable cloud service using token-based approach
Returns: Returns Optional[requests.Session]
_get_new_token(self) -> Optional[requests.Session]
Purpose: Get a new authentication token
Returns: Returns Optional[requests.Session]
sync_replica(self) -> bool
Purpose: Perform replica synchronization using the proven 3-step process: 1. Discovery - Get all nodes from cloud 2. Hierarchy - Build proper folder structure 3. Extraction - Download content to correct locations
Returns: Returns bool
_load_database(self) -> Dict[str, Any]
Purpose: Load or create replica database
Returns: Returns Dict[str, Any]
_save_database(self)
Purpose: Save database to disk
Returns: None
_save_file_content(self, content_hash, content, filename) -> bool
Purpose: Save file content to local content directory
Parameters:
content_hash: Type: str
content: Type: bytes
filename: Type: str
Returns: Returns bool
_get_file_type(self, filename) -> str
Purpose: Determine file type from filename
Parameters:
filename: Type: str
Returns: Returns str
_compute_hash(self, content) -> str
Purpose: Compute SHA256 hash of content
Parameters:
content: Type: bytes
Returns: Returns str
sync_complete_replica(self) -> bool
Purpose: Perform complete replica synchronization
Returns: Returns bool
_fetch_node_details(self, node_uuid, node_hash, node_type, node_size) -> Optional[Dict[str, Any]]
Purpose: Fetch detailed information about a node
Parameters:
node_uuid: Type: str
node_hash: Type: str
node_type: Type: str
node_size: Type: str
Returns: Returns Optional[Dict[str, Any]]
_parse_folder_node(self, node_uuid, node_hash, folder_content) -> Optional[Dict[str, Any]]
Purpose: Parse folder node content
Parameters:
node_uuid: Type: str
node_hash: Type: str
folder_content: Type: str
Returns: Returns Optional[Dict[str, Any]] (None on parse failure)
_parse_document_node(self, node_uuid, node_hash, doc_content, node_type) -> Optional[Dict[str, Any]]
Purpose: Parse document node content (docSchema)
Parameters:
node_uuid: Type: str
node_hash: Type: str
doc_content: Type: str
node_type: Type: str
Returns: Returns Optional[Dict[str, Any]] (None on parse failure)
_fetch_metadata(self, metadata_hash) -> Optional[Dict[str, Any]]
Purpose: Fetch and parse document metadata
Parameters:
metadata_hash: Type: str
Returns: Returns Optional[Dict[str, Any]]
_save_summary(self)
Purpose: Save human-readable summary
Returns: None
_create_content_index(self)
Purpose: Create an index of all downloaded content files
Returns: None
get_content_file_path(self, content_hash) -> Path
Purpose: Get the local path for a content file
Parameters:
content_hash: Type: str
Returns: Returns Path
_create_folder_structure(self)
Purpose: Create readable folder structure with documents in their proper folders
Returns: None
_sanitize_filename(self, filename) -> str
Purpose: Sanitize filename for filesystem use
Parameters:
filename: Type: str
Returns: Returns str
get_node_by_uuid(self, uuid) -> Optional[Dict[str, Any]]
Purpose: Get a specific node by UUID
Parameters:
uuid: Type: str
Returns: Returns Optional[Dict[str, Any]]
get_documents_in_folder(self, folder_uuid) -> List[Dict[str, Any]]
Purpose: Get all documents in a specific folder
Parameters:
folder_uuid: Type: str
Returns: Returns List[Dict[str, Any]]
get_folders(self) -> List[Dict[str, Any]]
Purpose: Get all folders
Returns: Returns List[Dict[str, Any]]
get_root_documents(self) -> List[Dict[str, Any]]
Purpose: Get all documents in root (no parent)
Returns: Returns List[Dict[str, Any]]
Required Imports
import hashlib
import json
import logging
import os
import re
import shutil
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Set
import requests
Usage Example
# Example usage:
sync = RemarkableReplicaSync()
success = sync.sync_complete_replica()
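A fuller workflow sketch (the workspace path is hypothetical; assumes a valid .remarkable_token exists or pairing succeeds, so construction does not raise):
sync = RemarkableReplicaSync(workspace_dir="/tmp/rm-workspace")
if sync.sync_complete_replica():
    # Walk the synced hierarchy using the public query helpers
    for folder in sync.get_folders():
        docs = sync.get_documents_in_folder(folder['uuid'])
        print(f"{folder['name']}: {len(docs)} document(s)")
    # Resolve the local blob path for each root-level PDF
    for doc in sync.get_root_documents():
        pdf_hash = doc.get('component_hashes', {}).get('pdf')
        if pdf_hash:
            print(doc['name'], '->', sync.get_content_file_path(pdf_hash))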
Similar Components
AI-powered semantic similarity - components with related functionality:
- class RemarkableReplicaSync (71.1% similar)
- class RemarkableLocalReplica (64.2% similar)
- class RemarkableReplicaBuilder (63.8% similar)
- function main_v81 (56.5% similar)
- function main_v60 (55.8% similar)