๐Ÿ” Code Extractor

class RemarkableDiscovery

Maturity: 26

Handles hierarchical discovery of reMarkable cloud content

File:
/tf/active/vicechatdev/e-ink-llm/cloudtest/discovery.py
Lines:
63 - 755
Complexity:
moderate

Purpose

Handles hierarchical discovery of reMarkable cloud content

Source Code

class RemarkableDiscovery:
    """Handles hierarchical discovery of reMarkable cloud content"""
    
    def __init__(self, session: requests.Session, output_dir: str = None):
        self.session = session
        self.base_url = "https://eu.tectonic.remarkable.com"
        
        # Setup output directory
        if output_dir:
            self.output_dir = Path(output_dir)
        else:
            self.output_dir = Path.cwd() / "remarkable_discovery"
        
        self.output_dir.mkdir(parents=True, exist_ok=True)
        
        # Setup detailed logging
        self.log_file = self.output_dir / "discovery_detailed.log"
        self.setup_logging()
        
        # Discovery state
        self.nodes: Dict[str, RemarkableNode] = {}
        self.root_nodes: List[RemarkableNode] = []
        self.failed_hashes: Set[str] = set()
        
        # Statistics
        self.stats = {
            'total_nodes': 0,
            'folders': 0,
            'documents': 0,
            'successful_downloads': 0,
            'failed_downloads': 0,
            'bytes_downloaded': 0
        }
    
    def setup_logging(self):
        """Setup detailed logging to file and console"""
        # Create a custom logger
        self.logger = logging.getLogger('RemarkableDiscovery')
        self.logger.setLevel(logging.DEBUG)
        
        # Clear any existing handlers
        self.logger.handlers.clear()
        
        # File handler for detailed logs
        file_handler = logging.FileHandler(self.log_file, mode='w', encoding='utf-8')
        file_handler.setLevel(logging.DEBUG)
        file_formatter = logging.Formatter(
            '%(asctime)s | %(levelname)-8s | %(message)s',
            datefmt='%Y-%m-%d %H:%M:%S'
        )
        file_handler.setFormatter(file_formatter)
        
        # Console handler for important info
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)
        console_formatter = logging.Formatter('%(message)s')
        console_handler.setFormatter(console_formatter)
        
        self.logger.addHandler(file_handler)
        self.logger.addHandler(console_handler)
        
        self.logger.info(f"๐Ÿ” DETAILED DISCOVERY LOG STARTED")
        self.logger.info(f"๐Ÿ“ Output directory: {self.output_dir}")
        self.logger.info(f"๐Ÿ“ Log file: {self.log_file}")
    
    def log_node_details(self, node: RemarkableNode, content_info: Dict[str, Any], depth: int):
        """Log comprehensive details about a discovered node"""
        indent = "  " * depth
        
        # Basic node info
        self.logger.debug(f"{indent}NODE DISCOVERY DETAILS:")
        self.logger.debug(f"{indent}  Hash: {node.hash}")
        self.logger.debug(f"{indent}  Name: {node.name}")
        self.logger.debug(f"{indent}  Type: {node.node_type.value}")
        self.logger.debug(f"{indent}  Parent: {node.parent or 'ROOT'}")
        self.logger.debug(f"{indent}  Depth: {depth}")
        self.logger.debug(f"{indent}  Size: {node.size} bytes")
        
        # Content info details
        if content_info:
            self.logger.debug(f"{indent}  RAW CONTENT INFO:")
            self.logger.debug(f"{indent}    Type: {type(content_info)}")
            self.logger.debug(f"{indent}    Content: {json.dumps(content_info, indent=6, default=str)}")
        
        # Component details for documents
        if node.node_type != NodeType.FOLDER:
            if node.content_hash:
                self.logger.debug(f"{indent}    Content Hash: {node.content_hash}")
            if node.metadata_hash:
                self.logger.debug(f"{indent}    Metadata Hash: {node.metadata_hash}")
            if node.pagedata_hash:
                self.logger.debug(f"{indent}    Pagedata Hash: {node.pagedata_hash}")
            if node.pdf_hash:
                self.logger.debug(f"{indent}    PDF Hash: {node.pdf_hash}")
        
        self.logger.debug(f"{indent}  {'='*50}")
    
    def log_raw_content(self, hash_ref: str, raw_content: Any, content_type: str):
        """Log raw content before processing"""
        self.logger.debug(f"RAW CONTENT for {hash_ref[:16]}...:")
        self.logger.debug(f"  Type: {content_type}")
        self.logger.debug(f"  Size: {len(str(raw_content)) if raw_content else 'None'}")
        
        if content_type == "ZIP":
            self.logger.debug(f"  ZIP Content Details:")
            if hasattr(raw_content, 'namelist'):
                for name in raw_content.namelist():
                    self.logger.debug(f"    File: {name}")
        elif content_type == "TEXT":
            self.logger.debug(f"  TEXT Content (first 500 chars):")
            self.logger.debug(f"    {str(raw_content)[:500]}")
        elif content_type == "JSON":
            self.logger.debug(f"  JSON Content:")
            self.logger.debug(f"    {json.dumps(raw_content, indent=4, default=str)}")
        
        self.logger.debug(f"  {'='*60}")
    
    def get_root_hash(self) -> Optional[str]:
        """Get the current root hash from sync/v4/root endpoint"""
        url = f"{self.base_url}/sync/v4/root"
        
        try:
            print(f"๐Ÿ“ก Getting root hash from: {url}")
            response = self.session.get(url, timeout=30)
            
            if response.status_code == 200:
                data = response.json()
                root_hash = data.get('hash')
                generation = data.get('generation', 'unknown')
                
                print(f"โœ… Root hash obtained: {root_hash}")
                print(f"   Generation: {generation}")
                
                # Save root response for reference
                root_file = self.output_dir / "root_response.json"
                root_file.write_text(json.dumps(data, indent=2))
                
                return root_hash
            else:
                print(f"โŒ Root hash request failed: {response.status_code}")
                return None
                
        except Exception as e:
            print(f"โŒ Error getting root hash: {e}")
            return None
    
    def fetch_hash_content(self, hash_ref: str) -> Optional[Dict[str, Any]]:
        """Fetch and analyze content for a given hash"""
        url = f"{self.base_url}/sync/v3/files/{hash_ref}"
        
        try:
            self.logger.debug(f"FETCHING CONTENT: {hash_ref}")
            self.logger.debug(f"  URL: {url}")
            
            response = self.session.get(url, timeout=30)
            
            if response.status_code != 200:
                error_msg = f"โŒ Failed to fetch {hash_ref[:16]}...: HTTP {response.status_code}"
                print(error_msg)
                self.logger.error(error_msg)
                self.failed_hashes.add(hash_ref)
                self.stats['failed_downloads'] += 1
                return None
            
            content = response.content
            self.stats['successful_downloads'] += 1
            self.stats['bytes_downloaded'] += len(content)
            
            self.logger.debug(f"  Response size: {len(content)} bytes")
            self.logger.debug(f"  Content-Type: {response.headers.get('content-type', 'unknown')}")
            
            # Save raw content
            raw_file = self.output_dir / f"raw_{hash_ref[:16]}.bin"
            raw_file.write_bytes(content)
            
            # Analyze content
            content_info = {
                'hash': hash_ref,
                'size': len(content),
                'content': content,
                'is_directory': False,
                'hash_references': [],
                'metadata': None
            }
            
            # Content type detection and logging
            if len(content) == 0:
                self.log_raw_content(hash_ref, content, "EMPTY")
                
            elif content.startswith(b'PK'):
                # ZIP archive - likely contains file data
                self.log_raw_content(hash_ref, "ZIP archive", "ZIP")
                content_info.update(self.parse_zip_content(content))
                
            elif content.startswith(b'%PDF'):
                # PDF document
                self.log_raw_content(hash_ref, "PDF document", "PDF")
                
            else:
                try:
                    # Try to parse as text/directory listing
                    text_content = content.decode('utf-8')
                    self.log_raw_content(hash_ref, text_content, "TEXT")
                    content_info.update(self.parse_directory_listing(text_content))
                except UnicodeDecodeError:
                    # Binary content - no parsing
                    self.log_raw_content(hash_ref, content, "BINARY")
            
            self.logger.debug(f"  Parsed content_info: {json.dumps({k: v for k, v in content_info.items() if k != 'content'}, indent=4, default=str)}")
            
            return content_info
            
        except Exception as e:
            print(f"โŒ Error fetching {hash_ref[:16]}...: {e}")
            self.failed_hashes.add(hash_ref)
            self.stats['failed_downloads'] += 1
            return None
    
    def parse_zip_content(self, content: bytes) -> Dict[str, Any]:
        """Parse ZIP archive content"""
        import io
        
        result = {
            'is_directory': False,
            'hash_references': []
        }
        
        try:
            with zipfile.ZipFile(io.BytesIO(content), 'r') as zip_file:
                # Check if it contains multiple files (directory-like)
                files = zip_file.namelist()
                if len(files) > 1:
                    result['is_directory'] = True
                
                # Look for metadata files
                for filename in files:
                    if filename.endswith('.metadata'):
                        try:
                            metadata_content = zip_file.read(filename)
                            metadata = json.loads(metadata_content.decode('utf-8'))
                            result['metadata'] = metadata
                        except:
                            pass
                            
        except Exception as e:
            print(f"โš ๏ธ Error parsing ZIP: {e}")
        
        return result
    
    def parse_directory_listing(self, text_content: str) -> Dict[str, Any]:
        """Parse text content as directory listing with proper hierarchy handling"""
        result = {
            'is_directory': False,
            'hash_references': [],
            'child_objects': [],  # New objects to discover recursively
            'data_components': []  # Data components of current object
        }
        
        lines = text_content.split('\n')
        
        # Skip first line if it's just a number (count)
        if lines and lines[0].strip().isdigit():
            lines = lines[1:]
        
        import re
        # Pattern: hash:flags:uuid:type:size or hash:flags:uuid.component:type:size
        entry_pattern = r'^([a-f0-9]{64}):([0-9a-fA-F]+):([a-f0-9-]+(?:\.[^:]+)?):(\d+):(\d+)$'
        
        for line in lines:
            line = line.strip()
            if not line:
                continue
            
            match = re.match(entry_pattern, line, re.IGNORECASE)
            if match:
                hash_val, flags, uuid_component, type_val, size_val = match.groups()
                
                entry_info = {
                    'hash': hash_val,
                    'flags': flags,
                    'uuid_component': uuid_component,
                    'type': type_val,
                    'size': int(size_val),
                    'line': line
                }
                
                # Determine if this is a child object or data component
                if '.' in uuid_component:
                    # Data component (has extension like .content, .metadata, .rm, .pdf, .pagedata)
                    component_type = uuid_component.split('.')[-1]
                    entry_info['component_type'] = component_type
                    result['data_components'].append(entry_info)
                    
                    self.logger.debug(f"    ๐Ÿ“„ Data component: {component_type} ({size_val} bytes)")
                else:
                    # Child object (pure UUID, can be discovered recursively)
                    result['child_objects'].append(entry_info)
                    result['hash_references'].append({
                        'hash': hash_val,
                        'type': 'child_object',
                        'line': line,
                        'uuid': uuid_component,
                        'flags': flags,
                        'size': int(size_val)
                    })
                    
                    self.logger.debug(f"    ๐Ÿ”— Child object: {uuid_component} ({size_val} bytes)")
            else:
                self.logger.debug(f"    โš ๏ธ Unrecognized line format: {line}")
        
        if result['child_objects'] or result['data_components']:
            result['is_directory'] = True
            
        self.logger.debug(f"  Parsed directory: {len(result['child_objects'])} children, {len(result['data_components'])} components")
        
        return result
        
        return result
    
    def fetch_metadata_component(self, hash_ref: str) -> Optional[Dict[str, Any]]:
        """Fetch and parse a metadata component"""
        try:
            content_info = self.fetch_hash_content(hash_ref)
            if not content_info:
                return None
            
            content = content_info.get('content', b'')
            if isinstance(content, bytes):
                try:
                    text_content = content.decode('utf-8')
                    return json.loads(text_content)
                except (UnicodeDecodeError, json.JSONDecodeError) as e:
                    self.logger.debug(f"Failed to parse metadata from {hash_ref[:16]}...: {e}")
                    return None
            
            return None
        except Exception as e:
            self.logger.debug(f"Error fetching metadata component {hash_ref[:16]}...: {e}")
            return None
    
    def parse_metadata_to_node(self, hash_ref: str, content_info: Dict[str, Any]) -> Optional[RemarkableNode]:
        """Parse content info into a RemarkableNode with proper metadata handling"""
        
        # Initialize node with default values
        node_type = NodeType.DOCUMENT
        name = f"document_{hash_ref[:8]}"
        parent_uuid = None
        created_time = None
        last_modified = None
        visible_name = None
        
        # Check if this has data components (indicating it's an object with metadata)
        if content_info.get('data_components'):
            # Look for metadata component
            metadata_component = None
            for component in content_info.get('data_components', []):
                if component.get('component_type') == 'metadata':
                    self.logger.debug(f"  ๐Ÿ” Fetching metadata component: {component['hash'][:16]}...")
                    metadata_component = self.fetch_metadata_component(component['hash'])
                    break
            
            if metadata_component:
                visible_name = metadata_component.get('visibleName')
                parent_uuid = metadata_component.get('parent', '')
                created_time = metadata_component.get('createdTime')
                last_modified = metadata_component.get('lastModified')
                
                # Determine type from metadata
                if metadata_component.get('type') == 'CollectionType':
                    node_type = NodeType.FOLDER
                    name = visible_name or f"folder_{hash_ref[:8]}"
                else:
                    node_type = NodeType.DOCUMENT
                    name = visible_name or f"document_{hash_ref[:8]}"
                    
                self.logger.debug(f"  ๐Ÿ“‹ Parsed metadata: name='{name}', parent='{parent_uuid}', type='{metadata_component.get('type')}'")
            else:
                # No metadata found, use default naming
                self.logger.debug(f"  โš ๏ธ No metadata found for components in {hash_ref[:16]}...")
                if content_info.get('child_objects'):
                    node_type = NodeType.FOLDER
                    name = f"folder_{hash_ref[:8]}"
                
        elif content_info.get('is_directory') and content_info.get('child_objects'):
            # Directory with child objects but no data components
            node_type = NodeType.FOLDER
            name = f"folder_{hash_ref[:8]}"
        
        # Create node
        node = RemarkableNode(
            hash=hash_ref,
            name=name,
            parent=parent_uuid,  # Use UUID from metadata
            node_type=node_type,
            size=content_info.get('size', 0),
            created_time=created_time,
            last_modified=last_modified
        )
        
        # Store additional component information
        if content_info.get('data_components'):
            for component in content_info['data_components']:
                comp_type = component.get('component_type')
                if comp_type == 'content':
                    node.content_hash = component['hash']
                elif comp_type == 'metadata':
                    node.metadata_hash = component['hash']
                elif comp_type == 'pagedata':
                    node.pagedata_hash = component['hash']
                elif comp_type == 'pdf':
                    node.pdf_hash = component['hash']
        
        return node
    
    def discover_node_recursive(self, hash_ref: str, depth: int = 0, parent_path: str = "") -> Optional[RemarkableNode]:
        """Recursively discover a node and all its children"""
        
        # Check if already processed
        if hash_ref in self.nodes:
            return self.nodes[hash_ref]
        
        if hash_ref in self.failed_hashes:
            return None
        
        indent = "  " * depth
        print(f"{indent}๐Ÿ” Discovering node: {hash_ref[:16]}... (depth {depth})")
        self.logger.info(f"{indent}๐Ÿ” DISCOVERING NODE: {hash_ref} (depth {depth})")
        
        # Fetch content
        content_info = self.fetch_hash_content(hash_ref)
        if not content_info:
            self.logger.error(f"{indent}  Failed to fetch content for {hash_ref}")
            return None
        
        # Parse into node
        node = self.parse_metadata_to_node(hash_ref, content_info)
        if not node:
            print(f"{indent}  โš ๏ธ Could not parse into node")
            self.logger.warning(f"{indent}  Could not parse {hash_ref} into node")
            return None
        
        # Set additional properties
        node.depth = depth
        node.local_path = os.path.join(parent_path, node.name) if node.name != "<directory>" else parent_path
        
        # Log comprehensive node details
        self.log_node_details(node, content_info, depth)
        
        # Store node
        self.nodes[hash_ref] = node
        self.stats['total_nodes'] += 1
        
        if node.node_type == NodeType.FOLDER:
            self.stats['folders'] += 1
        else:
            self.stats['documents'] += 1
        
        print(f"{indent}  โœ… {node.node_type.value}: {node.name}")
        self.logger.info(f"{indent}  โœ… {node.node_type.value}: {node.name} | Size: {node.size} bytes | Parent: {node.parent or 'ROOT'}")
        
        # Process child objects only (not data components)
        if content_info.get('is_directory') and content_info.get('child_objects'):
            child_objects = content_info['child_objects']
            print(f"{indent}    ๐Ÿ“ Directory with {len(child_objects)} child objects")
            self.logger.info(f"{indent}    ๐Ÿ“ Directory with {len(child_objects)} child objects:")
            
            for i, child_info in enumerate(child_objects, 1):
                child_hash = child_info['hash']
                child_uuid = child_info['uuid_component']
                self.logger.debug(f"{indent}      Child {i}/{len(child_objects)}: {child_uuid} -> {child_hash}")
                try:
                    child_node = self.discover_node_recursive(
                        child_hash,
                        depth + 1,
                        node.local_path or ""
                    )
                    if child_node:
                        node.children.append(child_node)
                        self.logger.debug(f"{indent}      โœ… Child {i} processed successfully: {child_node.name}")
                    else:
                        self.logger.warning(f"{indent}      โš ๏ธ Child {i} returned None")
                except Exception as e:
                    error_msg = f"{indent}    โŒ Error processing child {child_hash[:16]}...: {e}"
                    print(error_msg)
                    self.logger.error(error_msg)
                    continue
        
        # Log data components for reference
        if content_info.get('data_components'):
            data_components = content_info['data_components']
            self.logger.info(f"{indent}    ๐Ÿ“„ Data components: {len(data_components)}")
            for component in data_components:
                comp_type = component.get('component_type', 'unknown')
                comp_size = component.get('size', 0)
                self.logger.debug(f"{indent}      ๐Ÿ’พ {comp_type}: {comp_size} bytes")
        
        # Save node data
        self.save_node_data(node, content_info)
        
        return node
    
    def save_node_data(self, node: RemarkableNode, content_info: Dict[str, Any]):
        """Save detailed node data to JSON file"""
        try:
            node_file = self.output_dir / f"node_{node.hash[:16]}.json"
            
            # Prepare JSON-safe data
            safe_content_info = dict(content_info)
            if 'content' in safe_content_info:
                if isinstance(safe_content_info['content'], bytes):
                    safe_content_info['content'] = safe_content_info['content'].hex()
                    safe_content_info['content_encoding'] = 'hex'
            
            node_data = {
                'hash': node.hash,
                'name': node.name,
                'type': node.node_type.value,
                'parent': node.parent,
                'depth': node.depth,
                'local_path': node.local_path,
                'size': node.size,
                'created_time': node.created_time,
                'last_modified': node.last_modified,
                'source': node.source,
                'children_count': len(node.children),
                'timestamp': datetime.now().isoformat(),
                'raw_content_info': safe_content_info
            }
            
            node_file.write_text(json.dumps(node_data, indent=2))
            
        except Exception as e:
            print(f"โš ๏ธ Error saving node data for {node.hash[:16]}...: {e}")
    
    def build_hierarchy_from_parents(self) -> Dict[str, List[RemarkableNode]]:
        """Build proper hierarchy using parent UUIDs from metadata"""
        hierarchy = {}
        
        self.logger.info("๐Ÿ—๏ธ Building hierarchy from parent UUIDs...")
        
        # Group nodes by parent UUID
        nodes_by_parent = {}
        root_nodes = []
        
        for node in self.nodes.values():
            parent_uuid = node.parent
            if not parent_uuid or parent_uuid == "":
                root_nodes.append(node)
                self.logger.debug(f"  ๐Ÿ“ Root node: {node.name}")
            else:
                if parent_uuid not in nodes_by_parent:
                    nodes_by_parent[parent_uuid] = []
                nodes_by_parent[parent_uuid].append(node)
                self.logger.debug(f"  ๐Ÿ“„ Child of {parent_uuid}: {node.name}")
        
        # Build hierarchy
        hierarchy['root'] = root_nodes
        hierarchy['children'] = nodes_by_parent
        
        self.logger.info(f"  ๐Ÿ“Š Hierarchy built: {len(root_nodes)} root nodes, {len(nodes_by_parent)} parent groups")
        
        return hierarchy
    
    def discover_all(self) -> bool:
        """Complete discovery process from root"""
        print("๐Ÿš€ Starting complete reMarkable cloud discovery...")
        self.logger.info("๐Ÿš€ STARTING COMPLETE REMARKABLE CLOUD DISCOVERY")
        self.logger.info(f"๐Ÿ“ Output directory: {self.output_dir}")
        self.logger.info(f"๐Ÿ“ Log file: {self.log_file}")
        
        # Get root hash
        root_hash = self.get_root_hash()
        if not root_hash:
            print("โŒ Failed to get root hash")
            self.logger.error("โŒ Failed to get root hash")
            return False
        
        self.logger.info(f"๐Ÿ” Starting discovery from root hash: {root_hash}")
        
        # Start recursive discovery
        try:
            root_node = self.discover_node_recursive(root_hash, depth=0, parent_path="")
            
            if root_node:
                self.root_nodes.append(root_node)
                
                # Build proper hierarchy using parent UUIDs
                hierarchy = self.build_hierarchy_from_parents()
                
                print(f"\nโœ… DISCOVERY COMPLETE!")
                print(f"๐Ÿ“Š Statistics:")
                print(f"  โ€ข Total nodes: {self.stats['total_nodes']}")
                print(f"  โ€ข Folders: {self.stats['folders']}")
                print(f"  โ€ข Documents: {self.stats['documents']}")
                print(f"  โ€ข Successful downloads: {self.stats['successful_downloads']}")
                print(f"  โ€ข Failed downloads: {self.stats['failed_downloads']}")
                print(f"  โ€ข Total bytes: {self.stats['bytes_downloaded']:,}")
                
                # Show proper hierarchy
                print(f"\n๐Ÿ—๏ธ PROPER HIERARCHY:")
                self.print_proper_hierarchy(hierarchy)
                
                # Log final statistics
                self.logger.info("๐ŸŽ‰ DISCOVERY COMPLETED SUCCESSFULLY!")
                self.logger.info(f"๐Ÿ“Š FINAL STATISTICS:")
                self.logger.info(f"  โ€ข Total nodes discovered: {self.stats['total_nodes']}")
                self.logger.info(f"  โ€ข Folder nodes: {self.stats['folders']}")
                self.logger.info(f"  โ€ข Document nodes: {self.stats['documents']}")
                self.logger.info(f"  โ€ข Successful downloads: {self.stats['successful_downloads']}")
                self.logger.info(f"  โ€ข Failed downloads: {self.stats['failed_downloads']}")
                self.logger.info(f"  โ€ข Total bytes downloaded: {self.stats['bytes_downloaded']:,}")
                
                # Show tree structure (old flat version)
                print(f"\n๐ŸŒณ ORIGINAL DISCOVERY TREE:")
                self.print_tree()
                
                # Save discovery summary
                self.save_discovery_summary()
                
                self.logger.info(f"๐Ÿ’พ Discovery data saved to: {self.output_dir}")
                self.logger.info(f"๐Ÿ“ Detailed log saved to: {self.log_file}")
                
                return True
            else:
                print("โŒ Failed to discover from root")
                self.logger.error("โŒ Failed to discover from root")
                return False
                
        except Exception as e:
            print(f"โŒ Discovery error: {e}")
            return False
    
    def print_proper_hierarchy(self, hierarchy: Dict[str, Any]):
        """Print the proper hierarchy built from parent UUIDs"""
        
        def print_nodes(nodes: List[RemarkableNode], prefix: str = "", parent_name: str = "ROOT"):
            for i, node in enumerate(nodes):
                is_last = i == len(nodes) - 1
                type_icon = "๐Ÿ“" if node.node_type == NodeType.FOLDER else "๐Ÿ“„"
                size_info = f" ({node.size:,} bytes)"
                
                print(f"{prefix}{'โ””โ”€โ”€ ' if is_last else 'โ”œโ”€โ”€ '}{type_icon} {node.name}{size_info}")
                
                # Find children of this node using its hash as parent
                node_children = hierarchy['children'].get(node.hash, [])
                if node_children:
                    child_prefix = prefix + ("    " if is_last else "โ”‚   ")
                    print_nodes(node_children, child_prefix, node.name)
        
        # Print root nodes
        root_nodes = hierarchy.get('root', [])
        print(f"๐Ÿ“ Root Level ({len(root_nodes)} items)")
        print_nodes(root_nodes)
        
        # Show parent groups summary
        parent_groups = hierarchy.get('children', {})
        if parent_groups:
            print(f"\n๐Ÿ“Š Parent Groups:")
            for parent_uuid, children in parent_groups.items():
                print(f"  ๐Ÿ‘จโ€๐Ÿ‘ฉโ€๐Ÿ‘งโ€๐Ÿ‘ฆ {parent_uuid}: {len(children)} children")

    def print_tree(self):
        """Print the discovered tree structure"""
        print(f"\n๐ŸŒณ DISCOVERED TREE STRUCTURE:")
        
        def show_tree(node: RemarkableNode, prefix: str = ""):
            type_icon = "๐Ÿ“" if node.node_type == NodeType.FOLDER else "๐Ÿ“„"
            size_info = f" ({node.size:,} bytes)" if node.size > 0 else ""
            print(f"{prefix}{type_icon} {node.name}{size_info}")
            
            for i, child in enumerate(node.children):
                is_last = i == len(node.children) - 1
                child_prefix = prefix + ("โ””โ”€โ”€ " if is_last else "โ”œโ”€โ”€ ")
                show_tree(child, child_prefix)
        
        for root in self.root_nodes:
            show_tree(root)
    
    def save_discovery_summary(self):
        """Save complete discovery summary"""
        summary = {
            'timestamp': datetime.now().isoformat(),
            'stats': self.stats,
            'root_nodes': len(self.root_nodes),
            'total_nodes': len(self.nodes),
            'failed_hashes': list(self.failed_hashes),
            'output_directory': str(self.output_dir)
        }
        
        summary_file = self.output_dir / "discovery_summary.json"
        summary_file.write_text(json.dumps(summary, indent=2))
        
        print(f"\n๐Ÿ’พ Discovery summary saved to: {summary_file}")

Parameters

Name Type Default Kind
bases - -

Parameter Details

bases: Parameter of type

Return Value

Returns unspecified type

Class Interface

Methods

__init__(self, session, output_dir)

Purpose: Internal method: init

Parameters:

  • session: Type: requests.Session
  • output_dir: Type: str

Returns: None

setup_logging(self)

Purpose: Setup detailed logging to file and console

Returns: None

log_node_details(self, node, content_info, depth)

Purpose: Log comprehensive details about a discovered node

Parameters:

  • node: Type: RemarkableNode
  • content_info: Type: Dict[str, Any]
  • depth: Type: int

Returns: None

log_raw_content(self, hash_ref, raw_content, content_type)

Purpose: Log raw content before processing

Parameters:

  • hash_ref: Type: str
  • raw_content: Type: Any
  • content_type: Type: str

Returns: None

get_root_hash(self) -> Optional[str]

Purpose: Get the current root hash from sync/v4/root endpoint

Returns: Returns Optional[str]

fetch_hash_content(self, hash_ref) -> Optional[Dict[str, Any]]

Purpose: Fetch and analyze content for a given hash

Parameters:

  • hash_ref: Type: str

Returns: Returns Optional[Dict[str, Any]]

parse_zip_content(self, content) -> Dict[str, Any]

Purpose: Parse ZIP archive content

Parameters:

  • content: Type: bytes

Returns: Returns Dict[str, Any]

parse_directory_listing(self, text_content) -> Dict[str, Any]

Purpose: Parse text content as directory listing with proper hierarchy handling

Parameters:

  • text_content: Type: str

Returns: Returns Dict[str, Any]

fetch_metadata_component(self, hash_ref) -> Optional[Dict[str, Any]]

Purpose: Fetch and parse a metadata component

Parameters:

  • hash_ref: Type: str

Returns: Returns Optional[Dict[str, Any]]

parse_metadata_to_node(self, hash_ref, content_info) -> Optional[RemarkableNode]

Purpose: Parse content info into a RemarkableNode with proper metadata handling

Parameters:

  • hash_ref: Type: str
  • content_info: Type: Dict[str, Any]

Returns: Returns Optional[RemarkableNode]

discover_node_recursive(self, hash_ref, depth, parent_path) -> Optional[RemarkableNode]

Purpose: Recursively discover a node and all its children

Parameters:

  • hash_ref: Type: str
  • depth: Type: int
  • parent_path: Type: str

Returns: Returns Optional[RemarkableNode]

save_node_data(self, node, content_info)

Purpose: Save detailed node data to JSON file

Parameters:

  • node: Type: RemarkableNode
  • content_info: Type: Dict[str, Any]

Returns: None

build_hierarchy_from_parents(self) -> Dict[str, List[RemarkableNode]]

Purpose: Build proper hierarchy using parent UUIDs from metadata

Returns: Returns Dict[str, List[RemarkableNode]]

discover_all(self) -> bool

Purpose: Complete discovery process from root

Returns: Returns bool

print_proper_hierarchy(self, hierarchy)

Purpose: Print the proper hierarchy built from parent UUIDs

Parameters:

  • hierarchy: Type: Dict[str, Any]

Returns: None

print_tree(self)

Purpose: Print the discovered tree structure

Returns: None

save_discovery_summary(self)

Purpose: Save complete discovery summary

Returns: None

Required Imports

import os
import json
import zipfile
import requests
import logging

Usage Example

# Example usage:
# result = RemarkableDiscovery(bases)

Similar Components

AI-powered semantic similarity - components with related functionality:

  • function test_discovery 69.9% similar

    Tests the hierarchical discovery functionality of a RemarkableDiscovery instance by discovering and cataloging all nodes (folders and documents) from a reMarkable device session.

    From: /tf/active/vicechatdev/e-ink-llm/cloudtest/test_suite.py
  • class RemarkableNode 66.6% similar

    A dataclass representing a node (file or folder) in the reMarkable cloud storage system, containing metadata, hierarchy information, and component hashes for documents.

    From: /tf/active/vicechatdev/e-ink-llm/cloudtest/discovery.py
  • class RemarkableCloudManager 64.8% similar

    Unified manager for reMarkable Cloud operations that uses REST API as primary method with rmcl library as fallback, handling authentication, file operations, and folder management.

    From: /tf/active/vicechatdev/e-ink-llm/remarkable_cloud.py
  • class RemarkableReplicaSync 64.6% similar

    A class that synchronizes reMarkable cloud documents to a local replica directory, downloading and organizing folders and documents in a hierarchical structure.

    From: /tf/active/vicechatdev/e-ink-llm/cloudtest/sync_replica_new.py
  • class RemarkableNode_v1 63.2% similar

    A dataclass representing a node (folder or document) in the reMarkable cloud storage system, storing metadata, hashes, and local file paths.

    From: /tf/active/vicechatdev/e-ink-llm/cloudtest/local_replica_v2.py
โ† Back to Browse