Code Extractor

class RemarkableReplicaSync_v1

Maturity: 26

Standalone replica synchronization using proven local_replica_v2 approach

File: /tf/active/vicechatdev/e-ink-llm/cloudtest/sync_replica.py
Lines: 59 - 824
Complexity: moderate

Purpose

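Standalone synchronization of a local replica of a reMarkable cloud account, following the approach of local_replica_v2. The class authenticates with a stored or newly issued token, discovers nodes from the root index, saves downloaded content under hash-based paths, and rebuilds the folder hierarchy on disk.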
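
The discovery step of this synchronization reads a root index whose first line is a schema version and whose remaining lines are colon-separated entries; the source takes the hash from field 0, the UUID from field 2, the type from field 3, and the size from field 4. The following is a minimal sketch of that parsing under those assumptions. `SchemaEntry` and `parse_doc_schema` are hypothetical names that do not appear in the original, and the sample text is invented for illustration rather than real cloud data.

```python
from typing import List, NamedTuple, Tuple


class SchemaEntry(NamedTuple):
    """One colon-separated index entry (hypothetical helper type)."""
    node_hash: str
    node_uuid: str
    node_type: str
    node_size: str


def parse_doc_schema(text: str) -> Tuple[str, List[SchemaEntry]]:
    """Split an index into its version line and its well-formed entries."""
    lines = text.strip().split('\n')
    version = lines[0]
    entries = []
    for line in lines[1:]:
        parts = line.split(':')
        # Same field positions as the source: 0=hash, 2=uuid, 3=type, 4=size.
        if len(parts) >= 5:
            entries.append(SchemaEntry(parts[0], parts[2], parts[3], parts[4]))
    return version, entries


# Invented sample; the values are placeholders, not real hashes or UUIDs.
sample = (
    "3\n"
    "abc123:80000000:uuid-1:1:42\n"
    "malformed line\n"
    "def456:80000000:uuid-2:4:1024\n"
)
version, entries = parse_doc_schema(sample)
print(version, [entry.node_uuid for entry in entries])
```

Lines with fewer than five fields are skipped, which mirrors the `len(parts) >= 5` guard in `sync_complete_replica`.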

Source Code

class RemarkableReplicaSync:
    """Standalone replica synchronization using proven local_replica_v2 approach"""
    
    def __init__(self, workspace_dir: Optional[str] = None):
        self.workspace_dir = Path(workspace_dir) if workspace_dir else Path(__file__).parent
        self.replica_dir = self.workspace_dir / "remarkable_replica_v2"
        self.content_dir = self.replica_dir / "content"
        
        # Create directories
        for directory in [self.replica_dir, self.content_dir]:
            directory.mkdir(parents=True, exist_ok=True)

        # Database state used by _load_database, _save_database and sync_complete_replica.
        # NOTE: both file names are assumptions; the original never defines
        # database_path, summary_path or database.
        self.database_path = self.replica_dir / "replica_database.json"
        self.summary_path = self.replica_dir / "replica_summary.txt"
        self.database = self._load_database()
        
        # Setup logging
        self.log_file = self.replica_dir / "build.log"
        self.setup_logging()
        
        # Initialize authentication
        self.session = self._authenticate()
        if not self.session:
            raise RuntimeError("Failed to authenticate with reMarkable")
        
        # State matching local_replica_v2.py
        self.nodes: Dict[str, RemarkableNode] = {}
        self.all_hashes: Set[str] = set()
        self.failed_downloads: Set[str] = set()
        
        # Statistics
        self.stats = {
            'total_nodes': 0,
            'folders': 0,
            'documents': 0,
            'pdfs_extracted': 0,
            'rm_files_extracted': 0,
            'rm_pdfs_converted': 0,
            'nodes_added': 0
        }
        
    def setup_logging(self):
        """Setup logging to file"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(self.log_file, mode='w'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
    
    def _authenticate(self) -> Optional[requests.Session]:
        """Authenticate with the reMarkable cloud service using token-based approach"""
        token_file = self.workspace_dir / '.remarkable_token'
        
        if token_file.exists():
            print("Using existing reMarkable token...")
            try:
                with open(token_file, 'r') as f:
                    token_data = json.load(f)
                
                session = requests.Session()
                session.headers.update({
                    'Authorization': f'Bearer {token_data["access_token"]}',
                    'User-Agent': 'remarkable-replica-sync/1.0'
                })
                
                # Quick test - try to get document root
                test_url = f'{token_data["service_manager_url"]}/document-storage/json/2/docs'
                response = session.get(test_url)
                
                if response.status_code == 200:
                    print("✅ Authentication successful")
                    return session
                else:
                    print(f"❌ Token test failed with status {response.status_code}")

            except Exception as e:
                print(f"❌ Token authentication failed: {e}")
        
        # Need new token
        print("No valid token found. Getting new token...")
        return self._get_new_token()
    
    def _get_new_token(self) -> Optional[requests.Session]:
        """Get a new authentication token"""
        device_token = '9c4e7c2b-c6c7-4831-8b2a-3f5a2e8f9c3d'
        
        try:
            # Step 1: Register device
            register_url = 'https://webapp-production-dot-remarkable-production.appspot.com/token/json/2/device/new'
            register_data = {
                'code': device_token,
                'deviceDesc': 'desktop-linux',
                'deviceID': hashlib.sha256(f"replica-sync-{int(time.time())}".encode()).hexdigest()[:8]
            }
            
            response = requests.post(register_url, json=register_data)
            if response.status_code != 200:
                print(f"❌ Device registration failed: {response.status_code}")
                return None
                
            device_bearer = response.text.strip('"')
            
            # Step 2: Get user token
            user_url = 'https://webapp-production-dot-remarkable-production.appspot.com/token/json/2/user/new'
            user_response = requests.post(
                user_url,
                headers={'Authorization': f'Bearer {device_bearer}'}
            )
            
            if user_response.status_code != 200:
                print(f"❌ User token failed: {user_response.status_code}")
                return None
                
            user_token = user_response.text.strip('"')
            
            # Step 3: Get service discovery
            discovery_url = 'https://service-manager-production-dot-remarkable-production.appspot.com/service/json/1/document-storage?environment=production&group=auth0%7C5a68dc51cb30df3877a1d7c4&apiVer=2'
            discovery_response = requests.get(
                discovery_url,
                headers={'Authorization': f'Bearer {user_token}'}
            )
            
            if discovery_response.status_code != 200:
                print(f"❌ Service discovery failed: {discovery_response.status_code}")
                return None
                
            service_info = discovery_response.json()
            service_url = service_info.get('Host')
            
            if not service_url:
                print("❌ No service URL in discovery response")
                return None
            
            # Save token info
            token_data = {
                'access_token': user_token,
                'service_manager_url': service_url,
                'created_at': datetime.now().isoformat()
            }
            
            token_file = self.workspace_dir / '.remarkable_token'
            with open(token_file, 'w') as f:
                json.dump(token_data, f, indent=2)
            
            # Create session
            session = requests.Session()
            session.headers.update({
                'Authorization': f'Bearer {user_token}',
                'User-Agent': 'remarkable-replica-sync/1.0'
            })
            
            print("✅ New authentication token obtained and saved")
            return session
            
        except Exception as e:
            print(f"❌ Authentication failed: {e}")
            return None
    
    def sync_replica(self) -> bool:
        """
        Perform replica synchronization using the proven 3-step process:
        1. Discovery - Get all nodes from cloud
        2. Hierarchy - Build proper folder structure
        3. Extraction - Download content to correct locations
        """
        try:
            self.logger.info("🚀 Starting reMarkable replica sync")

            # Phase 1: Discovery
            if not self._discover_all_nodes():
                self.logger.error("❌ Discovery phase failed")
                return False

            # Phase 2: Build hierarchy
            if not self._build_folder_hierarchy():
                self.logger.error("❌ Hierarchy phase failed")
                return False

            # Phase 3: Extract content
            if not self._extract_content():
                self.logger.error("❌ Content extraction phase failed")
                return False

            # Generate summary
            self._generate_summary()

            self.logger.info("✅ Replica sync completed successfully")
            return True

        except Exception as e:
            self.logger.error(f"❌ Sync failed: {e}")
            return False
    
    def _load_database(self) -> Dict[str, Any]:
        """Load or create replica database"""
        if self.database_path.exists():
            try:
                with open(self.database_path, 'r') as f:
                    db = json.load(f)
                print(f"📂 Loaded existing database with {len(db.get('nodes', {}))} nodes")
                return db
            except Exception as e:
                print(f"⚠️ Database corrupted, creating new: {e}")
        
        # Create new database
        db = {
            'nodes': {},
            'hash_registry': {},
            'metadata': {
                'last_sync': None,
                'sync_count': 0,
                'created': datetime.now().isoformat()
            }
        }
        print("📂 Created new replica database")
        return db
    
    def _save_database(self):
        """Save database to disk"""
        try:
            # Update metadata first so the values written to disk are current
            self.database['metadata']['last_sync'] = datetime.now().isoformat()
            self.database['metadata']['sync_count'] += 1

            with open(self.database_path, 'w') as f:
                json.dump(self.database, f, indent=2, default=str)

        except Exception as e:
            print(f"❌ Failed to save database: {e}")
    
    def _save_file_content(self, content_hash: str, content: bytes, filename: str) -> bool:
        """Save file content to local content directory"""
        try:
            # Create file path using hash (first 2 chars as subdirectory)
            subdir = content_hash[:2]
            file_dir = self.content_dir / subdir
            file_dir.mkdir(exist_ok=True)
            
            file_path = file_dir / content_hash
            
            # Only save if file doesn't exist (avoid re-downloading)
            if not file_path.exists():
                with open(file_path, 'wb') as f:
                    f.write(content)
                print(f"   💾 Saved {filename} ({len(content)} bytes)")
            
            # Register in hash registry
            if content_hash not in self.database.get('hash_registry', {}):
                if 'hash_registry' not in self.database:
                    self.database['hash_registry'] = {}
                
                self.database['hash_registry'][content_hash] = {
                    'filename': filename,
                    'size': len(content),
                    'type': self._get_file_type(filename),
                    'downloaded': datetime.now().isoformat()
                }
            
            return True
            
        except Exception as e:
            print(f"❌ Failed to save {filename}: {e}")
            return False
    
    def _get_file_type(self, filename: str) -> str:
        """Determine file type from filename"""
        if filename.endswith('.pdf'):
            return 'pdf'
        elif filename.endswith('.metadata'):
            return 'metadata'
        elif filename.endswith('.content'):
            return 'content'
        elif filename.endswith('.pagedata'):
            return 'pagedata'
        elif filename.endswith('.rm'):
            return 'notebook_page'
        elif filename.endswith('.docSchema'):
            return 'docschema'
        else:
            return 'unknown'

    def _compute_hash(self, content: bytes) -> str:
        """Compute SHA256 hash of content"""
        return hashlib.sha256(content).hexdigest()
    
    def sync_complete_replica(self) -> bool:
        """Perform complete replica synchronization"""
        try:
            print("\n🚀 STARTING COMPLETE REPLICA SYNC")
            print("=" * 50)

            # Step 1: Get current root state
            print("📋 Step 1: Getting root state from server...")
            root_response = self.session.get("https://eu.tectonic.remarkable.com/sync/v4/root")
            root_response.raise_for_status()
            root_data = root_response.json()
            
            current_root_hash = root_data['hash']
            current_generation = root_data['generation']
            
            print(f"🌱 Root hash: {current_root_hash}")
            print(f"🔢 Generation: {current_generation}")

            # Step 2: Fetch root.docSchema
            print("📋 Step 2: Fetching root.docSchema...")
            root_content_response = self.session.get(f"https://eu.tectonic.remarkable.com/sync/v3/files/{current_root_hash}")
            root_content_response.raise_for_status()
            root_content = root_content_response.text
            
            print(f"📄 Root.docSchema size: {len(root_content)} bytes")

            # Step 3: Parse and discover all nodes
            print("📋 Step 3: Discovering all nodes...")
            discovered_nodes = {}

            lines = root_content.strip().split('\n')
            # str.split always returns at least one element, so test the first line
            if not lines[0]:
                print("❌ Empty root.docSchema")
                return False

            version = lines[0]
            print(f"📊 Schema version: {version}")
            
            # Process each entry in root.docSchema
            for line_num, line in enumerate(lines[1:], 1):
                if ':' in line:
                    parts = line.split(':')
                    if len(parts) >= 5:
                        node_hash = parts[0]
                        node_uuid = parts[2]
                        node_type = parts[3]
                        node_size = parts[4]
                        
                        # Fetch node details
                        node_info = self._fetch_node_details(node_uuid, node_hash, node_type, node_size)
                        if node_info:
                            discovered_nodes[node_uuid] = node_info
                            
                        # Progress indicator
                        if line_num % 5 == 0:
                            print(f"   📊 Processed {line_num}/{len(lines)-1} entries...")

            print(f"✅ Discovered {len(discovered_nodes)} nodes")

            # Step 4: Update database
            print("📋 Step 4: Updating database...")
            
            # Count changes
            new_nodes = 0
            updated_nodes = 0
            
            for uuid, node_info in discovered_nodes.items():
                if uuid not in self.database['nodes']:
                    new_nodes += 1
                else:
                    # Check if updated
                    existing_node = self.database['nodes'][uuid]
                    if existing_node.get('hash') != node_info.get('hash'):
                        updated_nodes += 1
                
                self.database['nodes'][uuid] = node_info
            
            # Remove nodes no longer in cloud
            cloud_uuids = set(discovered_nodes.keys())
            local_uuids = set(self.database['nodes'].keys())
            removed_uuids = local_uuids - cloud_uuids
            
            for uuid in removed_uuids:
                del self.database['nodes'][uuid]
            
            print("📊 Database changes:")
            print(f"   🆕 New nodes: {new_nodes}")
            print(f"   🔄 Updated nodes: {updated_nodes}")
            print(f"   🗑️ Removed nodes: {len(removed_uuids)}")

            # Step 5: Save database and summary
            print("📋 Step 5: Saving database and summary...")
            self._save_database()
            self._save_summary()

            # Step 6: Create content index
            print("📋 Step 6: Creating content index...")
            self._create_content_index()

            # Step 7: Create folder structure with files
            print("📋 Step 7: Creating folder structure...")
            self._create_folder_structure()

            print("\n🎉 REPLICA SYNC COMPLETED!")
            print(f"📊 Total nodes: {len(self.database['nodes'])}")
            print(f"📁 Database: {self.database_path}")
            print(f"📄 Summary: {self.summary_path}")
            print(f"💾 Content files: {self.content_dir}")
            print(f"📂 Folder structure: {self.replica_dir / 'documents'}")
            
            return True
            
        except Exception as e:
            print(f"❌ Replica sync failed: {e}")
            return False
    
    def _fetch_node_details(self, node_uuid: str, node_hash: str, node_type: str, node_size: str) -> Optional[Dict[str, Any]]:
        """Fetch detailed information about a node"""
        try:
            # Fetch node content (docSchema or metadata)
            node_response = self.session.get(f"https://eu.tectonic.remarkable.com/sync/v3/files/{node_hash}")
            node_response.raise_for_status()
            node_content = node_response.text
            node_content_bytes = node_response.content
            
            # Save the node content locally
            if node_type in ['1', '2']:  # Folder
                self._save_file_content(node_hash, node_content_bytes, f"{node_uuid}.metadata")
            else:  # Document
                self._save_file_content(node_hash, node_content_bytes, f"{node_uuid}.docSchema")
            
            # Determine node type and parse
            if node_type in ['1', '2']:  # Folder
                return self._parse_folder_node(node_uuid, node_hash, node_content)
            elif node_type in ['3', '4']:  # Document
                return self._parse_document_node(node_uuid, node_hash, node_content, node_type)
            else:
                print(f"⚠️ Unknown node type {node_type} for {node_uuid[:8]}...")
                return None
                
        except Exception as e:
            print(f"❌ Failed to fetch node {node_uuid[:8]}...: {e}")
            return None
    
    def _parse_folder_node(self, node_uuid: str, node_hash: str, folder_content: str) -> Optional[Dict[str, Any]]:
        """Parse folder node content"""
        try:
            # For folders, the content is the metadata JSON
            metadata = json.loads(folder_content)
            
            return {
                'uuid': node_uuid,
                'hash': node_hash,
                'name': metadata.get('visibleName', 'Unknown Folder'),
                'node_type': 'folder',
                'metadata': metadata,
                'last_modified': metadata.get('lastModified', '0'),
                'parent_uuid': metadata.get('parent', ''),
                'sync_status': 'synced',
                'last_synced': datetime.now().isoformat()
            }
            
        except Exception as e:
            print(f"❌ Failed to parse folder {node_uuid[:8]}...: {e}")
            return None
    
    def _parse_document_node(self, node_uuid: str, node_hash: str, doc_content: str, node_type: str) -> Optional[Dict[str, Any]]:
        """Parse document node content (docSchema)"""
        try:
            # Parse docSchema to get components
            lines = doc_content.strip().split('\n')
            if len(lines) < 2:
                print(f"⚠️ Invalid docSchema for {node_uuid[:8]}...")
                return None
            
            version = lines[0]
            component_hashes = {}
            metadata = None
            
            # Extract component hashes and download components
            for line in lines[1:]:
                if ':' in line:
                    parts = line.split(':')
                    if len(parts) >= 3:
                        comp_hash = parts[0]
                        comp_name = parts[2]
                        
                        # Download the component
                        try:
                            comp_response = self.session.get(f"https://eu.tectonic.remarkable.com/sync/v3/files/{comp_hash}")
                            comp_response.raise_for_status()
                            
                            # Save component content locally
                            self._save_file_content(comp_hash, comp_response.content, comp_name)
                            
                            # Store component hash
                            if comp_name.endswith('.metadata'):
                                component_hashes['metadata'] = comp_hash
                                # Parse metadata
                                try:
                                    metadata = json.loads(comp_response.text)
                                except json.JSONDecodeError:
                                    print(f"⚠️ Failed to parse metadata for {node_uuid[:8]}...")
                            elif comp_name.endswith('.content'):
                                component_hashes['content'] = comp_hash
                            elif comp_name.endswith('.pdf'):
                                component_hashes['pdf'] = comp_hash
                            elif comp_name.endswith('.pagedata'):
                                component_hashes['pagedata'] = comp_hash
                            elif comp_name.endswith('.rm'):
                                if 'rm_files' not in component_hashes:
                                    component_hashes['rm_files'] = []
                                component_hashes['rm_files'].append(comp_hash)
                                
                        except Exception as e:
                            print(f"⚠️ Failed to download component {comp_name}: {e}")
                        
                        component_hashes['docSchema'] = node_hash
            
            # Determine document name
            doc_name = "Unknown Document"
            if metadata:
                doc_name = metadata.get('visibleName', doc_name)
            
            return {
                'uuid': node_uuid,
                'hash': node_hash,
                'name': doc_name,
                'node_type': 'document',
                'metadata': metadata or {},
                'component_hashes': component_hashes,
                'last_modified': metadata.get('lastModified', '0') if metadata else '0',
                'parent_uuid': metadata.get('parent', '') if metadata else '',
                'version': int(version) if version.isdigit() else 1,
                'sync_status': 'synced',
                'last_synced': datetime.now().isoformat()
            }
            
        except Exception as e:
            print(f"❌ Failed to parse document {node_uuid[:8]}...: {e}")
            return None
    
    def _fetch_metadata(self, metadata_hash: str) -> Optional[Dict[str, Any]]:
        """Fetch and parse document metadata"""
        try:
            metadata_response = self.session.get(f"https://eu.tectonic.remarkable.com/sync/v3/files/{metadata_hash}")
            metadata_response.raise_for_status()
            return json.loads(metadata_response.text)
        except Exception as e:
            print(f"⚠️ Failed to fetch metadata {metadata_hash[:8]}...: {e}")
            return None
    
    def _save_summary(self):
        """Save human-readable summary"""
        try:
            # The summary contains emoji, so write it as UTF-8 regardless of locale
            with open(self.summary_path, 'w', encoding='utf-8') as f:
                f.write("reMarkable Replica Summary\n")
                f.write("=" * 50 + "\n\n")
                
                f.write(f"Last sync: {self.database['metadata'].get('last_sync', 'Never')}\n")
                f.write(f"Total syncs: {self.database['metadata'].get('sync_count', 0)}\n")
                f.write(f"Total nodes: {len(self.database['nodes'])}\n\n")
                
                # Count by type
                folders = [n for n in self.database['nodes'].values() if n.get('node_type') == 'folder']
                documents = [n for n in self.database['nodes'].values() if n.get('node_type') == 'document']
                
                f.write(f"📂 Folders: {len(folders)}\n")
                f.write(f"📄 Documents: {len(documents)}\n\n")
                
                # List folders
                if folders:
                    f.write("Folders:\n")
                    f.write("-" * 20 + "\n")
                    for folder in sorted(folders, key=lambda x: x.get('name', '')):
                        f.write(f"  📂 {folder['name']} ({folder['uuid'][:8]}...)\n")
                    f.write("\n")
                
                # List documents
                if documents:
                    f.write("Documents:\n")
                    f.write("-" * 20 + "\n")
                    for doc in sorted(documents, key=lambda x: x.get('name', '')):
                        parent_info = ""
                        if doc.get('parent_uuid'):
                            parent_name = "Unknown Folder"
                            for folder in folders:
                                if folder['uuid'] == doc['parent_uuid']:
                                    parent_name = folder['name']
                                    break
                            parent_info = f" [in {parent_name}]"
                        
                        f.write(f"  📄 {doc['name']} ({doc['uuid'][:8]}...){parent_info}\n")

        except Exception as e:
            print(f"⚠️ Failed to save summary: {e}")
    
    def _create_content_index(self):
        """Create an index of all downloaded content files"""
        try:
            index_path = self.replica_dir / "content_index.txt"
            
            with open(index_path, 'w') as f:
                f.write("reMarkable Content Index\n")
                f.write("=" * 50 + "\n\n")
                
                f.write(f"Generated: {datetime.now().isoformat()}\n")
                f.write(f"Total files: {len(self.database.get('hash_registry', {}))}\n\n")
                
                # Group by file type
                by_type = {}
                for hash_val, info in self.database.get('hash_registry', {}).items():
                    file_type = info.get('type', 'unknown')
                    if file_type not in by_type:
                        by_type[file_type] = []
                    by_type[file_type].append((hash_val, info))
                
                for file_type, files in sorted(by_type.items()):
                    f.write(f"{file_type.upper()} Files ({len(files)}):\n")
                    f.write("-" * 30 + "\n")
                    
                    for hash_val, info in sorted(files, key=lambda x: x[1].get('filename', '')):
                        filename = info.get('filename', 'unknown')
                        size = info.get('size', 0)
                        subdir = hash_val[:2]
                        f.write(f"  {filename} ({size} bytes)\n")
                        f.write(f"    Hash: {hash_val}\n")
                        f.write(f"    Path: content/{subdir}/{hash_val}\n\n")
                    
                    f.write("\n")
            
            print(f"📋 Content index saved: {index_path}")

        except Exception as e:
            print(f"⚠️ Failed to create content index: {e}")
    
    def get_content_file_path(self, content_hash: str) -> Path:
        """Get the local path for a content file"""
        subdir = content_hash[:2]
        return self.content_dir / subdir / content_hash
    
    def _create_folder_structure(self):
        """Create readable folder structure with documents in their proper folders"""
        try:
            # Create documents directory
            documents_dir = self.replica_dir / "documents"
            documents_dir.mkdir(exist_ok=True)
            
            print(f"📁 Creating folder structure in: {documents_dir}")
            
            # Build folder hierarchy
            folders = self.get_folders()
            documents = [n for n in self.database['nodes'].values() if n.get('node_type') == 'document']
            
            # Create folder directories
            folder_paths = {}
            
            # Process root folders first
            for folder in folders:
                if not folder.get('parent_uuid'):
                    folder_path = documents_dir / self._sanitize_filename(folder['name'])
                    folder_path.mkdir(exist_ok=True)
                    folder_paths[folder['uuid']] = folder_path
                    print(f"   📂 Created root folder: {folder['name']}")
            
            # Process nested folders
            remaining_folders = [f for f in folders if f.get('parent_uuid')]
            max_iterations = 10  # Prevent infinite loops
            
            while remaining_folders and max_iterations > 0:
                processed_this_round = []
                
                for folder in remaining_folders:
                    parent_uuid = folder.get('parent_uuid')
                    if parent_uuid in folder_paths:
                        # Parent folder exists, create this folder
                        parent_path = folder_paths[parent_uuid]
                        folder_path = parent_path / self._sanitize_filename(folder['name'])
                        folder_path.mkdir(exist_ok=True)
                        folder_paths[folder['uuid']] = folder_path
                        processed_this_round.append(folder)
                        print(f"   📂 Created nested folder: {folder['name']}")
                
                # Remove processed folders
                for folder in processed_this_round:
                    remaining_folders.remove(folder)
                
                max_iterations -= 1
            
            # Extract documents to their folders
            for doc in documents:
                doc_name = self._sanitize_filename(doc['name'])
                parent_uuid = doc.get('parent_uuid')
                
                # Determine target directory
                if parent_uuid and parent_uuid in folder_paths:
                    target_dir = folder_paths[parent_uuid]
                else:
                    target_dir = documents_dir
                
                # Extract PDF if available
                pdf_hash = doc.get('component_hashes', {}).get('pdf')
                if pdf_hash:
                    pdf_path = target_dir / f"{doc_name}.pdf"
                    source_path = self.get_content_file_path(pdf_hash)
                    
                    if source_path.exists():
                        try:
                            # Copy PDF to folder structure
                            import shutil
                            shutil.copy2(source_path, pdf_path)
                            print(f"   📄 Extracted PDF: {doc_name}.pdf")
                        except Exception as e:
                            print(f"   ❌ Failed to copy PDF {doc_name}: {e}")
                    else:
                        print(f"   ⚠️ PDF source not found: {pdf_hash[:16]}...")
                
                # For notebooks (with .rm files), create a note that it's a notebook
                rm_files = doc.get('component_hashes', {}).get('rm_files', [])
                if rm_files and not pdf_hash:
                    notebook_info_path = target_dir / f"{doc_name}_notebook_info.txt"
                    try:
                        with open(notebook_info_path, 'w') as f:
                            f.write(f"reMarkable Notebook: {doc['name']}\n")
                            f.write(f"UUID: {doc['uuid']}\n")
                            # The value is the lastModified timestamp, so label it accordingly
                            f.write(f"Last modified: {doc.get('metadata', {}).get('lastModified', 'Unknown')}\n")
                            f.write(f"Pages: {len(rm_files)}\n\n")
                            f.write("This is a reMarkable notebook with handwritten content.\n")
                            f.write("The original .rm files are stored in the content directory.\n")
                        print(f"   📓 Created notebook info: {doc_name}_notebook_info.txt")
                    except Exception as e:
                        print(f"   ❌ Failed to create notebook info: {e}")

            print("✅ Folder structure created successfully")

        except Exception as e:
            print(f"❌ Failed to create folder structure: {e}")
    
    def _sanitize_filename(self, filename: str) -> str:
        """Sanitize filename for filesystem use"""
        # Remove or replace invalid characters
        import re
        sanitized = re.sub(r'[<>:"/\\|?*]', '_', filename)
        sanitized = sanitized.strip('. ')
        
        # Ensure it's not empty
        if not sanitized:
            sanitized = "unnamed"
        
        # Limit length
        if len(sanitized) > 200:
            sanitized = sanitized[:200]
        
        return sanitized
    
    def get_node_by_uuid(self, uuid: str) -> Optional[Dict[str, Any]]:
        """Get a specific node by UUID"""
        return self.database['nodes'].get(uuid)
    
    def get_documents_in_folder(self, folder_uuid: str) -> List[Dict[str, Any]]:
        """Get all documents in a specific folder"""
        return [
            node for node in self.database['nodes'].values()
            if node.get('node_type') == 'document' and node.get('parent_uuid') == folder_uuid
        ]
    
    def get_folders(self) -> List[Dict[str, Any]]:
        """Get all folders"""
        return [
            node for node in self.database['nodes'].values()
            if node.get('node_type') == 'folder'
        ]
    
    def get_root_documents(self) -> List[Dict[str, Any]]:
        """Get all documents in root (no parent)"""
        return [
            node for node in self.database['nodes'].values()
            if node.get('node_type') == 'document' and not node.get('parent_uuid')
        ]

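Parameters

Name Type Default Kind
workspace_dir str None positional or keyword

Parameter Details

workspace_dir: Optional workspace directory. When omitted, the directory containing the module file is used.

Return Value

Instantiating the class returns a RemarkableReplicaSync instance. The constructor raises RuntimeError if authentication with reMarkable fails.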

Class Interface

Methods

__init__(self, workspace_dir)

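Purpose: Create the replica and content directories, set up logging, authenticate with reMarkable, and initialize node state and statistics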

Parameters:

  • workspace_dir: Type: str

Returns: None

setup_logging(self)

Purpose: Setup logging to file

Returns: None

_authenticate(self) -> Optional[requests.Session]

Purpose: Authenticate with the reMarkable cloud service using token-based approach

Returns: Optional[requests.Session]

_get_new_token(self) -> Optional[requests.Session]

Purpose: Get a new authentication token

Returns: Optional[requests.Session]

sync_replica(self) -> bool

Purpose: Perform replica synchronization using the proven 3-step process:

  1. Discovery - Get all nodes from cloud
  2. Hierarchy - Build proper folder structure
  3. Extraction - Download content to correct locations

Returns: bool

_load_database(self) -> Dict[str, Any]

Purpose: Load or create replica database

Returns: Dict[str, Any]

_save_database(self)

Purpose: Save database to disk

Returns: None

_save_file_content(self, content_hash, content, filename) -> bool

Purpose: Save file content to local content directory

Parameters:

  • content_hash: Type: str
  • content: Type: bytes
  • filename: Type: str

Returns: bool

_get_file_type(self, filename) -> str

Purpose: Determine file type from filename

Parameters:

  • filename: Type: str

Returns: str

_compute_hash(self, content) -> str

Purpose: Compute SHA256 hash of content

Parameters:

  • content: Type: bytes

Returns: str
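
The body of _compute_hash is not shown in this section, so the following is only a minimal sketch of how a SHA256 hash of byte content can be computed with the standard library. The standalone function name compute_hash and the choice of a hex-encoded digest are assumptions for illustration; the actual method may differ.

```python
import hashlib


def compute_hash(content: bytes) -> str:
    """Return the hex-encoded SHA-256 digest of the given bytes (assumed output format)."""
    return hashlib.sha256(content).hexdigest()


digest = compute_hash(b"example content")
print(len(digest))  # a SHA-256 hex digest is always 64 characters long
```

A hex digest is a convenient choice when the hash doubles as a file name in a content directory, since it contains only filesystem-safe characters.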

sync_complete_replica(self) -> bool

Purpose: Perform complete replica synchronization

Returns: bool

_fetch_node_details(self, node_uuid, node_hash, node_type, node_size) -> Optional[Dict[str, Any]]

Purpose: Fetch detailed information about a node

Parameters:

  • node_uuid: Type: str
  • node_hash: Type: str
  • node_type: Type: str
  • node_size: Type: str

Returns: Optional[Dict[str, Any]]

_parse_folder_node(self, node_uuid, node_hash, folder_content) -> Dict[str, Any]

Purpose: Parse folder node content

Parameters:

  • node_uuid: Type: str
  • node_hash: Type: str
  • folder_content: Type: str

Returns: Dict[str, Any]

_parse_document_node(self, node_uuid, node_hash, doc_content, node_type) -> Dict[str, Any]

Purpose: Parse document node content (docSchema)

Parameters:

  • node_uuid: Type: str
  • node_hash: Type: str
  • doc_content: Type: str
  • node_type: Type: str

Returns: Dict[str, Any]

_fetch_metadata(self, metadata_hash) -> Optional[Dict[str, Any]]

Purpose: Fetch and parse document metadata

Parameters:

  • metadata_hash: Type: str

Returns: Optional[Dict[str, Any]]

_save_summary(self)

Purpose: Save human-readable summary

Returns: None

_create_content_index(self)

Purpose: Create an index of all downloaded content files

Returns: None

get_content_file_path(self, content_hash) -> Path

Purpose: Get the local path for a content file

Parameters:

  • content_hash: Type: str

Returns: Path

_create_folder_structure(self)

Purpose: Create readable folder structure with documents in their proper folders

Returns: None
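
The PDF extraction step inside _create_folder_structure copies a stored content file into the readable folder tree with shutil.copy2. The sketch below isolates that step as a standalone function. The name extract_pdf, its boolean return value, and the temporary directory layout are hypothetical and exist only to make the example runnable without a reMarkable account.

```python
import shutil
import tempfile
from pathlib import Path


def extract_pdf(source_path: Path, target_dir: Path, doc_name: str) -> bool:
    """Copy a stored PDF into target_dir as '<doc_name>.pdf'; return True on success."""
    if not source_path.exists():
        # Mirrors the "PDF source not found" branch in the listing.
        return False
    target_dir.mkdir(parents=True, exist_ok=True)
    try:
        # copy2 also attempts to preserve file metadata such as timestamps.
        shutil.copy2(source_path, target_dir / f"{doc_name}.pdf")
    except OSError:
        return False
    return True


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    stored = root / "content" / "abc123"  # hypothetical hash-named content file
    stored.parent.mkdir()
    stored.write_bytes(b"%PDF-1.4 demo")
    ok = extract_pdf(stored, root / "documents" / "Work", "Report")
    print(ok, (root / "documents" / "Work" / "Report.pdf").exists())
```

Returning a boolean instead of printing keeps the step easy to test; the original method reports progress with print statements instead.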

_sanitize_filename(self, filename) -> str

Purpose: Sanitize filename for filesystem use

Parameters:

  • filename: Type: str

Returns: str
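
To illustrate the sanitization rules, here is a standalone version of the same logic shown in the source listing: invalid characters become underscores, leading and trailing dots and spaces are stripped, empty results fall back to "unnamed", and the name is capped at 200 characters. The free function name sanitize_filename and the max_length parameter are assumptions added for the example; the method in the class hardcodes the limit.

```python
import re


def sanitize_filename(filename: str, max_length: int = 200) -> str:
    """Replace characters that are invalid on common filesystems and cap the length."""
    sanitized = re.sub(r'[<>:"/\\|?*]', '_', filename)
    sanitized = sanitized.strip('. ')
    if not sanitized:
        # Names made up entirely of dots or spaces would otherwise be empty.
        sanitized = "unnamed"
    return sanitized[:max_length]


print(sanitize_filename('Meeting: Q3/Q4 "plan"?'))  # Meeting_ Q3_Q4 _plan__
print(sanitize_filename("..."))                     # unnamed
```

Note that two different document names can sanitize to the same string, so a caller writing files into one folder may want to check for collisions.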

get_node_by_uuid(self, uuid) -> Optional[Dict[str, Any]]

Purpose: Get a specific node by UUID

Parameters:

  • uuid: Type: str

Returns: Optional[Dict[str, Any]]

get_documents_in_folder(self, folder_uuid) -> List[Dict[str, Any]]

Purpose: Get all documents in a specific folder

Parameters:

  • folder_uuid: Type: str

Returns: List[Dict[str, Any]]

get_folders(self) -> List[Dict[str, Any]]

Purpose: Get all folders

Returns: List[Dict[str, Any]]

get_root_documents(self) -> List[Dict[str, Any]]

Purpose: Get all documents in root (no parent)

Returns: List[Dict[str, Any]]
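
The four query helpers above all filter the same 'nodes' mapping by node_type and parent_uuid. The sketch below reproduces two of those filters against a small in-memory database so the behavior can be seen without running a sync. The sample data and the free function names are hypothetical; the field names (node_type, parent_uuid, name) are taken from the source listing.

```python
from typing import Any, Dict, List

# Hypothetical sample data; the real database is produced by the sync process.
database: Dict[str, Any] = {
    "nodes": {
        "f1": {"uuid": "f1", "node_type": "folder", "name": "Work", "parent_uuid": ""},
        "d1": {"uuid": "d1", "node_type": "document", "name": "Notes", "parent_uuid": "f1"},
        "d2": {"uuid": "d2", "node_type": "document", "name": "Inbox", "parent_uuid": ""},
    }
}


def documents_in_folder(db: Dict[str, Any], folder_uuid: str) -> List[Dict[str, Any]]:
    """Return documents whose parent_uuid matches folder_uuid."""
    return [
        node for node in db["nodes"].values()
        if node.get("node_type") == "document" and node.get("parent_uuid") == folder_uuid
    ]


def root_documents(db: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return documents with an empty or missing parent_uuid."""
    return [
        node for node in db["nodes"].values()
        if node.get("node_type") == "document" and not node.get("parent_uuid")
    ]


print([d["name"] for d in documents_in_folder(database, "f1")])  # ['Notes']
print([d["name"] for d in root_documents(database)])             # ['Inbox']
```

Each helper scans every node, which is simple and adequate for a personal library; a per-folder index would only matter for very large collections.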

Required Imports

import os
import sys
import json
import time
import hashlib

Usage Example

# Example usage (the workspace path is a placeholder):
# sync = RemarkableReplicaSync(workspace_dir="/path/to/workspace")
# success = sync.sync_replica()

Similar Components

AI-powered semantic similarity - components with related functionality:

  • class RemarkableReplicaSync 71.1% similar

    A class that synchronizes reMarkable cloud documents to a local replica directory, downloading and organizing folders and documents in a hierarchical structure.

    From: /tf/active/vicechatdev/e-ink-llm/cloudtest/sync_replica_new.py
  • class RemarkableLocalReplica 64.2% similar

    Builds and maintains a complete local replica of reMarkable cloud

    From: /tf/active/vicechatdev/e-ink-llm/cloudtest/local_replica.py
  • class RemarkableReplicaBuilder 63.8% similar

    Step-by-step replica builder

    From: /tf/active/vicechatdev/e-ink-llm/cloudtest/local_replica_v2.py
  • function main_v81 56.5% similar

    A test function that authenticates with the Remarkable cloud service and builds a complete local replica of the user's Remarkable data.

    From: /tf/active/vicechatdev/e-ink-llm/cloudtest/local_replica.py
  • function main_v60 55.8% similar

    Main entry point function that orchestrates a standalone synchronization process for reMarkable Replica, handling initialization, execution, and error reporting.

    From: /tf/active/vicechatdev/e-ink-llm/cloudtest/sync_replica_new.py