๐Ÿ” Code Extractor

class RemarkableUploadTests

Maturity: 25

Test suite for reMarkable upload functionality

File:
/tf/active/vicechatdev/e-ink-llm/cloudtest/test_uploads.py
Lines:
107 - 1272
Complexity:
moderate

Purpose

Test suite for reMarkable upload functionality

Source Code

class RemarkableUploadTests:
    """Test suite for reMarkable upload functionality"""
    
    def __init__(self, enable_raw_logging: bool = True):
        """Authenticate with reMarkable and build the upload manager.

        Args:
            enable_raw_logging: Stored as ``self.raw_logging``; within this
                constructor it only selects which status banner is printed.

        Raises:
            RuntimeError: If ``RemarkableAuth.get_authenticated_session()``
                returns a falsy session.
        """
        self.base_dir = Path(__file__).parent
        self.test_results = []
        self.raw_logging = enable_raw_logging

        print(
            "๐Ÿ” Raw HTTP request logging ENABLED"
            if self.raw_logging
            else "๐Ÿ” Raw HTTP request logging DISABLED"
        )

        # Authentication comes first; the upload manager is only imported and
        # built once a session is available.
        from auth import RemarkableAuth

        self.session = RemarkableAuth().get_authenticated_session()
        if not self.session:
            raise RuntimeError("Failed to authenticate with reMarkable")

        from upload_manager import RemarkableUploadManager

        replica_db = self.base_dir / "remarkable_replica_v2" / "replica_database.json"
        self.uploader = RemarkableUploadManager(self.session, replica_db)

        print("๐Ÿงช reMarkable Upload Test Suite Initialized")
    
    def save_raw_logs(self) -> Path:
        """Save captured raw HTTP logs to file for comparison with real app logs"""
        global raw_logs
        
        if not raw_logs:
            print("๐Ÿ“ No raw logs to save")
            return None
        
        logs_dir = self.base_dir / "test_results" / "raw_logs"
        logs_dir.mkdir(parents=True, exist_ok=True)
        
        timestamp = int(time.time())
        log_file = logs_dir / f"raw_requests_{timestamp}.json"
        
        # Save detailed logs
        with open(log_file, 'w') as f:
            json.dump(raw_logs, f, indent=2, default=str)
        
        # Also create a simplified log similar to real app logs
        simple_log_file = logs_dir / f"simple_requests_{timestamp}.txt"
        with open(simple_log_file, 'w') as f:
            f.write("=== reMarkable Upload Test - Raw Request Log ===\n\n")
            for i, log in enumerate(raw_logs, 1):
                f.write(f"Request #{i}: {log['method']} {log['url']}\n")
                f.write(f"Status: {log['response_status']}\n")
                
                # Key headers
                if 'rm-filename' in log['headers']:
                    f.write(f"rm-filename: {log['headers']['rm-filename']}\n")
                if 'x-goog-hash' in log['headers']:
                    f.write(f"x-goog-hash: {log['headers']['x-goog-hash']}\n")
                if 'Content-Type' in log['headers']:
                    f.write(f"Content-Type: {log['headers']['Content-Type']}\n")
                
                # Body information
                f.write(f"Body size: {log['body_size']} bytes\n")
                
                # Include full text body if available
                if log.get('body_type') == 'text' and log.get('body_text'):
                    f.write(f"Body content:\n{log['body_text']}\n")
                elif log.get('body_type') == 'binary':
                    f.write(f"Body: {log['body_text']}\n")
                elif log.get('json_body'):
                    f.write(f"JSON Body: {json.dumps(log['json_body'], indent=2)}\n")
                
                # Response information
                if log.get('response_text'):
                    f.write(f"Response: {log['response_text']}\n")
                
                f.write("-" * 50 + "\n\n")
        
        print(f"๐Ÿ“ Raw logs saved:")
        print(f"   Detailed: {log_file}")
        print(f"   Simple: {simple_log_file}")
        print(f"   Total requests: {len(raw_logs)}")
        
        return log_file

    def log_test(self, test_name: str, success: bool, details: str = ""):
        """Log test result"""
        status = "โœ… PASS" if success else "โŒ FAIL"
        print(f"{status} {test_name}")
        if details:
            print(f"   {details}")
        
        self.test_results.append({
            'test': test_name,
            'success': success,
            'details': details,
            'timestamp': time.time()
        })
    
    def test_edit_document_name(self) -> bool:
        """Test 1: Edit existing document name"""
        try:
            print("\n๐Ÿ”ง Test 1: Edit Document Name")
            
            # Find a document to edit
            database = self.uploader.database
            document_uuid = None
            original_name = None
            
            for uuid, node in database['nodes'].items():
                if node['node_type'] == 'document' and 'pdf' not in node['name'].lower():
                    document_uuid = uuid
                    original_name = node['name']
                    break
            
            if not document_uuid:
                self.log_test("Edit Document Name", False, "No suitable document found")
                return False
            
            # Generate new name
            new_name = f"TEST_RENAMED_{int(time.time())}"
            print(f"Renaming '{original_name}' to '{new_name}'")
            
            # Perform edit
            success = self.uploader.edit_document_metadata(document_uuid, new_name=new_name)
            
            if success:
                # Verify in database
                updated_node = self.uploader.database['nodes'][document_uuid]
                if updated_node['metadata']['visibleName'] == new_name:
                    self.log_test("Edit Document Name", True, f"Renamed to: {new_name}")
                    
                    # Rename back to original
                    self.uploader.edit_document_metadata(document_uuid, new_name=original_name)
                    return True
                else:
                    self.log_test("Edit Document Name", False, "Database not updated")
                    return False
            else:
                self.log_test("Edit Document Name", False, "Upload failed")
                return False
                
        except Exception as e:
            self.log_test("Edit Document Name", False, f"Exception: {e}")
            return False
    
    def test_create_test_pdf(self) -> Path:
        """Create a test PDF for upload testing"""
        try:
            from reportlab.pdfgen import canvas
            from reportlab.lib.pagesizes import letter
            
            test_pdf_path = self.base_dir / "test_uploads" / "test_document.pdf"
            test_pdf_path.parent.mkdir(exist_ok=True)
            
            # Create simple PDF
            c = canvas.Canvas(str(test_pdf_path), pagesize=letter)
            c.drawString(100, 750, f"reMarkable Upload Test Document")
            c.drawString(100, 720, f"Generated: {time.strftime('%Y-%m-%d %H:%M:%S')}")
            c.drawString(100, 690, f"Test UUID: {uuid.uuid4()}")
            c.drawString(100, 660, "This is a test document for upload functionality.")
            c.showPage()
            c.save()
            
            return test_pdf_path
            
        except ImportError:
            # Create simple text-based PDF content
            test_pdf_path = self.base_dir / "test_uploads" / "test_document.txt"
            test_pdf_path.parent.mkdir(exist_ok=True)
            
            with open(test_pdf_path, 'w') as f:
                f.write(f"reMarkable Upload Test Document\n")
                f.write(f"Generated: {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
                f.write(f"Test UUID: {uuid.uuid4()}\n")
                f.write("This is a test document for upload functionality.\n")
            
            print("โš ๏ธ  Using text file instead of PDF (reportlab not available)")
            return test_pdf_path
    
    def test_upload_new_pdf(self, parent_uuid: str = "") -> bool:
        """Test: Upload new PDF document to specified folder (or root if no parent_uuid)"""
        try:
            target_location = "specified folder" if parent_uuid else "root folder"
            folder_info = f" (UUID: {parent_uuid})" if parent_uuid else ""
            print(f"\n๐Ÿ“„ Test: Upload New PDF Document to {target_location}{folder_info}")
            
            # Create test PDF
            test_pdf = self.test_create_test_pdf()
            test_name = f"UploadTest_{int(time.time())}"
            
            print(f"Uploading: {test_pdf} as '{test_name}' to {target_location}")
            
            # Get initial document count from current database (no pre-sync needed)
            initial_count = len([n for n in self.uploader.database['nodes'].values() 
                               if n['node_type'] == 'document'])
            print(f"๐Ÿ“Š Initial document count: {initial_count}")
            
            # Upload document with parent parameter - this should follow the complete sequence:
            # 1-5: Document components + docSchema
            # 6: root.docSchema update  
            # 7: roothash update
            # 8: Replica sync to verify
            success = self.uploader.upload_pdf_document(str(test_pdf), test_name, parent_uuid)
            
            if success:
                print(f"โœ… Upload completed - document should be visible in {target_location}")
                
                # The upload_manager already runs replica sync, so no need to duplicate it
                # Just verify the result
                new_count = len([n for n in self.uploader.database['nodes'].values() 
                               if n['node_type'] == 'document'])
                print(f"๐Ÿ“Š New document count: {new_count}")
                
                # Find the new document
                new_doc = None
                for node in self.uploader.database['nodes'].values():
                    if node.get('name') == test_name:
                        new_doc = node
                        break
                
                if new_doc:
                    # Verify parent location if specified
                    actual_parent = new_doc.get('metadata', {}).get('parent', '')
                    expected_parent = parent_uuid if parent_uuid else ''
                    
                    if actual_parent == expected_parent:
                        location_status = f"โœ… in correct location ({target_location})"
                    else:
                        location_status = f"โŒ wrong location (expected: {expected_parent}, actual: {actual_parent})"
                    
                    self.log_test("Upload New PDF", True, 
                                f"Document created: {test_name} (UUID: {new_doc['uuid'][:8]}...) {location_status}")
                    return True
                elif new_count > initial_count:
                    self.log_test("Upload New PDF", True, 
                                f"Document uploaded successfully (count increased by {new_count - initial_count})")
                    return True
                else:
                    self.log_test("Upload New PDF", False, "Document not found and count unchanged")
                    return False
            else:
                self.log_test("Upload New PDF", False, "Upload failed")
                return False
                
        except Exception as e:
            self.log_test("Upload New PDF", False, f"Exception: {e}")
            return False
    
    def test_create_new_notebook(self) -> bool:
        """Test 3: Create new notebook"""
        try:
            print("\n๐Ÿ““ Test 3: Create New Notebook")
            
            notebook_name = f"TestNotebook_{int(time.time())}"
            print(f"Creating notebook: '{notebook_name}'")
            
            # Get initial document count
            initial_count = len([n for n in self.uploader.database['nodes'].values() 
                               if n['node_type'] == 'document'])
            
            # Create notebook
            success = self.uploader.create_notebook(notebook_name)
            
            if success:
                # Verify in database
                new_count = len([n for n in self.uploader.database['nodes'].values() 
                               if n['node_type'] == 'document'])
                
                if new_count > initial_count:
                    # Find the new notebook
                    new_notebook = None
                    for node in self.uploader.database['nodes'].values():
                        if node['name'] == notebook_name:
                            new_notebook = node
                            break
                    
                    if new_notebook:
                        # Verify it has content structure
                        has_content = new_notebook['component_hashes'].get('content') is not None
                        has_rm_files = len(new_notebook['component_hashes'].get('rm_files', [])) > 0
                        
                        if has_content and has_rm_files:
                            self.log_test("Create New Notebook", True, 
                                        f"Notebook created: {notebook_name} with content + RM files")
                            return True
                        else:
                            self.log_test("Create New Notebook", False, 
                                        "Notebook missing content structure")
                            return False
                    else:
                        self.log_test("Create New Notebook", False, "Notebook not found in database")
                        return False
                else:
                    self.log_test("Create New Notebook", False, "Document count unchanged")
                    return False
            else:
                self.log_test("Create New Notebook", False, "Creation failed")
                return False
                
        except Exception as e:
            self.log_test("Create New Notebook", False, f"Exception: {e}")
            return False
    
    def test_move_document(self) -> bool:
        """Test 4: Move document to different folder"""
        try:
            print("\n๐Ÿ“ Test 4: Move Document")
            
            database = self.uploader.database
            
            # Find a document and a folder
            document_uuid = None
            folder_uuid = None
            original_parent = None
            
            for uuid, node in database['nodes'].items():
                if node['node_type'] == 'document' and document_uuid is None:
                    document_uuid = uuid
                    original_parent = node.get('parent_uuid', '')
                elif node['node_type'] == 'folder' and folder_uuid is None:
                    folder_uuid = uuid
                
                if document_uuid and folder_uuid:
                    break
            
            if not document_uuid:
                self.log_test("Move Document", False, "No document found")
                return False
            
            if not folder_uuid:
                self.log_test("Move Document", False, "No folder found")
                return False
            
            document_name = database['nodes'][document_uuid]['name']
            folder_name = database['nodes'][folder_uuid]['name']
            
            print(f"Moving '{document_name}' to folder '{folder_name}'")
            
            # Move document
            success = self.uploader.edit_document_metadata(document_uuid, new_parent=folder_uuid)
            
            if success:
                # Verify move
                updated_node = self.uploader.database['nodes'][document_uuid]
                if updated_node['metadata']['parent'] == folder_uuid:
                    self.log_test("Move Document", True, 
                                f"Moved '{document_name}' to '{folder_name}'")
                    
                    # Move back to original location
                    self.uploader.edit_document_metadata(document_uuid, new_parent=original_parent)
                    return True
                else:
                    self.log_test("Move Document", False, "Parent not updated in database")
                    return False
            else:
                self.log_test("Move Document", False, "Move operation failed")
                return False
                
        except Exception as e:
            self.log_test("Move Document", False, f"Exception: {e}")
            return False
    
    def test_hash_consistency(self) -> bool:
        """Test 5: Verify hash consistency after uploads"""
        try:
            print("\n๐Ÿ” Test 5: Hash Consistency Check")
            
            database = self.uploader.database
            hash_registry = database.get('hash_registry', {})
            
            # Check for hash collisions
            hash_types = {}
            for hash_val, info in hash_registry.items():
                hash_type = info.get('type', 'unknown')
                if hash_type not in hash_types:
                    hash_types[hash_type] = []
                hash_types[hash_type].append(hash_val)
            
            # Verify recent uploads have proper hash entries
            recent_uploads = [result for result in self.test_results if result['success']]
            
            consistency_issues = 0
            
            # Check node consistency
            for uuid, node in database['nodes'].items():
                node_hash = node.get('hash')
                if node_hash and node_hash not in hash_registry:
                    consistency_issues += 1
                    print(f"โš ๏ธ  Node {uuid[:8]}... has unregistered hash {node_hash[:16]}...")
                
                # Check component hashes
                comp_hashes = node.get('component_hashes', {})
                for comp_type, comp_hash in comp_hashes.items():
                    if comp_hash and isinstance(comp_hash, str) and comp_hash not in hash_registry:
                        consistency_issues += 1
                        print(f"โš ๏ธ  Component {comp_type} has unregistered hash {comp_hash[:16]}...")
            
            if consistency_issues == 0:
                self.log_test("Hash Consistency", True, 
                            f"All hashes properly registered ({len(hash_registry)} total)")
                return True
            else:
                self.log_test("Hash Consistency", False, 
                            f"{consistency_issues} hash consistency issues found")
                return False
                
        except Exception as e:
            self.log_test("Hash Consistency", False, f"Exception: {e}")
            return False
    
    def validate_uploaded_document(self, document_uuid: str) -> bool:
        """Validate that an uploaded document has all required components accessible"""
        try:
            print(f"\n๐Ÿ” Validating uploaded document: {document_uuid[:8]}...")
            
            # Find the document in our database
            doc_node = None
            for uuid, node in self.uploader.database['nodes'].items():
                if uuid == document_uuid:
                    doc_node = node
                    break
            
            if not doc_node:
                print(f"โŒ Document {document_uuid[:8]}... not found in local database")
                return False
            
            print(f"๐Ÿ“„ Document found: {doc_node['name']}")
            print(f"๐Ÿ”— Document hash: {doc_node['hash']}")
            
            # Try to fetch the document's docSchema from server
            doc_hash = doc_node['hash']
            try:
                doc_response = self.session.get(f"https://eu.tectonic.remarkable.com/sync/v3/files/{doc_hash}")
                doc_response.raise_for_status()
                print(f"โœ… Document docSchema accessible from server ({len(doc_response.text)} bytes)")
                print(f"๐Ÿ“‹ DocSchema content preview: {doc_response.text[:200]}...")
                
                # Parse the docSchema and validate each component
                lines = doc_response.text.strip().split('\n')
                if len(lines) < 2:
                    print(f"โŒ Invalid docSchema format: too few lines ({len(lines)})")
                    return False
                
                version = lines[0]
                print(f"๐Ÿ“Š DocSchema version: {version}")
                
                component_count = 0
                missing_components = []
                
                for line in lines[1:]:
                    if ':' in line:
                        parts = line.split(':')
                        if len(parts) >= 3:
                            comp_hash = parts[0]
                            comp_name = parts[2]
                            component_count += 1
                            
                            # Try to fetch this component
                            try:
                                comp_response = self.session.get(f"https://eu.tectonic.remarkable.com/sync/v3/files/{comp_hash}")
                                comp_response.raise_for_status()
                                print(f"โœ… Component {comp_name}: accessible ({len(comp_response.content)} bytes)")
                            except Exception as e:
                                print(f"โŒ Component {comp_name}: NOT accessible - {e}")
                                missing_components.append(comp_name)
                
                print(f"๐Ÿ“Š Total components: {component_count}")
                print(f"โŒ Missing components: {len(missing_components)}")
                
                if missing_components:
                    print(f"โš ๏ธ Missing: {missing_components}")
                    return False
                else:
                    print(f"โœ… All document components are accessible")
                    return True
                
            except Exception as e:
                print(f"โŒ Cannot fetch document docSchema: {e}")
                return False
                
        except Exception as e:
            print(f"โŒ Document validation failed: {e}")
            return False

    def analyze_root_docschema_sizes(self) -> bool:
        """Analyze the sizes in root.docSchema to identify patterns and issues"""
        try:
            print(f"\n๐Ÿ“Š Analyzing root.docSchema sizes...")
            
            # Get current root.docSchema
            root_response = self.session.get("https://eu.tectonic.remarkable.com/sync/v4/root")
            root_response.raise_for_status()
            root_data = root_response.json()
            current_root_hash = root_data['hash']
            
            root_content_response = self.session.get(f"https://eu.tectonic.remarkable.com/sync/v3/files/{current_root_hash}")
            root_content_response.raise_for_status()
            root_content = root_content_response.text
            
            # Parse entries and analyze sizes
            lines = root_content.strip().split('\n')[1:]  # Skip version header
            size_analysis = {}
            entries_by_size = {}
            
            print(f"๐Ÿ“‹ Analyzing {len(lines)} entries...")
            
            for line in lines:
                if ':' in line:
                    parts = line.split(':')
                    if len(parts) >= 5:
                        doc_hash = parts[0]
                        doc_uuid = parts[2]
                        node_type = parts[3]
                        size = parts[4]
                        
                        # Group by size
                        if size not in entries_by_size:
                            entries_by_size[size] = []
                        entries_by_size[size].append({
                            'uuid': doc_uuid,
                            'hash': doc_hash,
                            'type': node_type,
                            'line': line
                        })
            
            # Report size distribution
            print(f"\n๐Ÿ“Š Size Distribution Analysis:")
            for size, entries in sorted(entries_by_size.items(), key=lambda x: len(x[1]), reverse=True):
                count = len(entries)
                print(f"   Size {size}: {count} documents")
                
                if count > 1:
                    print(f"      โš ๏ธ Multiple documents with identical size:")
                    for entry in entries[:5]:  # Show first 5
                        print(f"         - {entry['uuid'][:8]}... (type {entry['type']})")
                    if len(entries) > 5:
                        print(f"         ... and {len(entries) - 5} more")
            
            # Check if our uploaded documents have suspicious sizes
            suspicious_sizes = ['2247', '2246']  # The sizes we noticed
            print(f"\n๐Ÿ” Checking for suspicious identical sizes...")
            
            for size in suspicious_sizes:
                if size in entries_by_size and len(entries_by_size[size]) > 1:
                    print(f"โš ๏ธ Found {len(entries_by_size[size])} documents with size {size}:")
                    
                    # Test each document with this size
                    for entry in entries_by_size[size]:
                        print(f"\n   Testing document {entry['uuid'][:8]}...")
                        try:
                            # Fetch the document's docSchema
                            doc_response = self.session.get(f"https://eu.tectonic.remarkable.com/sync/v3/files/{entry['hash']}")
                            doc_response.raise_for_status()
                            actual_docschema_size = len(doc_response.text)
                            
                            print(f"      DocSchema actual size: {actual_docschema_size} bytes")
                            print(f"      Root.docSchema claims: {size} bytes")
                            print(f"      Match: {'โœ…' if str(actual_docschema_size) == size else 'โŒ'}")
                            
                            if str(actual_docschema_size) != size:
                                print(f"      โš ๏ธ SIZE MISMATCH DETECTED!")
                                print(f"      ๐Ÿ“„ DocSchema content preview: {doc_response.text[:100]}...")
                        
                        except Exception as e:
                            print(f"      โŒ Cannot fetch docSchema: {e}")
            
            return True
            
        except Exception as e:
            print(f"โŒ Root.docSchema size analysis failed: {e}")
            return False

    def test_document_download_chain(self, document_uuid: str) -> bool:
        """Follow the complete download chain for a document to identify where it fails"""
        try:
            print(f"\n๐Ÿ”— Testing Document Download Chain: {document_uuid[:8]}...")
            
            # Step 1: Get current root.docSchema and find our document
            print("๐Ÿ“‹ Step 1: Getting root.docSchema from server...")
            root_response = self.session.get("https://eu.tectonic.remarkable.com/sync/v4/root")
            root_response.raise_for_status()
            root_data = root_response.json()
            current_root_hash = root_data['hash']
            
            print(f"โœ… Root hash: {current_root_hash}")
            print(f"โœ… Generation: {root_data.get('generation')}")
            
            # Step 2: Fetch root.docSchema content
            print("๐Ÿ“‹ Step 2: Fetching root.docSchema content...")
            root_content_response = self.session.get(f"https://eu.tectonic.remarkable.com/sync/v3/files/{current_root_hash}")
            root_content_response.raise_for_status()
            root_content = root_content_response.text
            
            print(f"โœ… Root.docSchema size: {len(root_content)} bytes")
            print(f"๐Ÿ“„ Root.docSchema content preview:")
            for i, line in enumerate(root_content.strip().split('\n')[:10]):
                print(f"   {i}: {line}")
            
            # Step 3: Find our document in root.docSchema
            print(f"๐Ÿ“‹ Step 3: Looking for document {document_uuid} in root.docSchema...")
            doc_entry = None
            doc_hash = None
            doc_size = None
            
            for line in root_content.strip().split('\n')[1:]:  # Skip version header
                if document_uuid in line:
                    doc_entry = line
                    parts = line.split(':')
                    if len(parts) >= 5:
                        doc_hash = parts[0]
                        doc_size = parts[4]
                    break
            
            if not doc_entry:
                print(f"โŒ Document {document_uuid} NOT found in root.docSchema")
                return False
            
            print(f"โœ… Document found in root.docSchema:")
            print(f"   Entry: {doc_entry}")
            print(f"   Hash: {doc_hash}")
            print(f"   Size: {doc_size}")
            
            # Step 4: Fetch document's docSchema
            print(f"๐Ÿ“‹ Step 4: Fetching document's docSchema...")
            doc_response = self.session.get(f"https://eu.tectonic.remarkable.com/sync/v3/files/{doc_hash}")
            doc_response.raise_for_status()
            doc_schema_content = doc_response.text
            
            print(f"โœ… Document docSchema size: {len(doc_schema_content)} bytes")
            print(f"๐Ÿ“„ Document docSchema content:")
            for i, line in enumerate(doc_schema_content.strip().split('\n')):
                print(f"   {i}: {line}")
            
            # Step 5: Parse docSchema and test each component
            print(f"๐Ÿ“‹ Step 5: Testing each document component...")
            lines = doc_schema_content.strip().split('\n')
            if len(lines) < 2:
                print(f"โŒ Invalid docSchema format: only {len(lines)} lines")
                return False
            
            version = lines[0]
            print(f"๐Ÿ“Š DocSchema version: {version}")
            
            all_components_valid = True
            component_details = {}
            
            for i, line in enumerate(lines[1:], 1):
                if ':' in line:
                    parts = line.split(':')
                    if len(parts) >= 5:
                        comp_hash = parts[0]
                        comp_name = parts[2]
                        comp_size = parts[4]
                        
                        print(f"\n   ๐Ÿ” Testing component {i}: {comp_name}")
                        print(f"       Hash: {comp_hash}")
                        print(f"       Expected size: {comp_size}")
                        
                        try:
                            comp_response = self.session.get(f"https://eu.tectonic.remarkable.com/sync/v3/files/{comp_hash}")
                            comp_response.raise_for_status()
                            actual_size = len(comp_response.content)
                            
                            print(f"       โœ… Component accessible")
                            print(f"       ๐Ÿ“ Actual size: {actual_size} bytes")
                            print(f"       ๐Ÿ“Š Size match: {'โœ…' if str(actual_size) == comp_size else 'โŒ'}")
                            
                            # Store component details
                            component_details[comp_name] = {
                                'hash': comp_hash,
                                'expected_size': comp_size,
                                'actual_size': actual_size,
                                'accessible': True,
                                'content_preview': comp_response.content[:100] if comp_response.content else b''
                            }
                            
                            # Special handling for PDF content
                            if comp_name.endswith('.pdf'):
                                print(f"       ๐Ÿ“„ PDF content preview: {comp_response.content[:50]}...")
                                if comp_response.content.startswith(b'%PDF'):
                                    print(f"       โœ… Valid PDF header detected")
                                else:
                                    print(f"       โŒ Invalid PDF header - content: {comp_response.content[:20]}")
                                    all_components_valid = False
                            
                            # Special handling for metadata
                            elif comp_name.endswith('.metadata'):
                                try:
                                    metadata_json = json.loads(comp_response.text)
                                    print(f"       โœ… Valid JSON metadata")
                                    print(f"       ๐Ÿ“ Name: {metadata_json.get('visibleName', 'N/A')}")
                                    print(f"       ๐Ÿ“ Parent: {metadata_json.get('parent', 'root')}")
                                except:
                                    print(f"       โŒ Invalid JSON metadata")
                                    all_components_valid = False
                            
                        except Exception as e:
                            print(f"       โŒ Component NOT accessible: {e}")
                            component_details[comp_name] = {
                                'hash': comp_hash,
                                'expected_size': comp_size,
                                'accessible': False,
                                'error': str(e)
                            }
                            all_components_valid = False
            
            # Step 6: Summary
            print(f"\n๐Ÿ“‹ Step 6: Download Chain Summary")
            print(f"   Root.docSchema: โœ… Accessible")
            print(f"   Document entry: โœ… Found")
            print(f"   Document docSchema: โœ… Accessible")
            print(f"   All components: {'โœ… Valid' if all_components_valid else 'โŒ Issues found'}")
            
            if not all_components_valid:
                print(f"\nโš ๏ธ Component Issues Detected:")
                for name, details in component_details.items():
                    if not details.get('accessible', False):
                        print(f"   โŒ {name}: {details.get('error', 'Not accessible')}")
                    elif details.get('expected_size') != str(details.get('actual_size', 0)):
                        print(f"   โš ๏ธ {name}: Size mismatch ({details['expected_size']} vs {details['actual_size']})")
            
            return all_components_valid
            
        except Exception as e:
            print(f"โŒ Document download chain test failed: {e}")
            return False

    def test_existing_document_chain(self) -> Dict[str, Any]:
        """Test the complete download chain for an existing uploaded document - NO NEW UPLOADS"""
        print("๏ฟฝ Starting Document Chain Analysis Test")
        print("=" * 50)
        print("๐Ÿšซ NO NEW UPLOADS - Testing existing documents only")
        
        try:
            # Step 1: Get current root.docSchema from server
            print("\n๏ฟฝ Step 1: Fetching current root.docSchema from server...")
            root_response = self.session.get("https://eu.tectonic.remarkable.com/sync/v4/root")
            root_response.raise_for_status()
            root_data = root_response.json()
            current_root_hash = root_data['hash']
            
            print(f"โœ… Root response: {root_response.status_code}")
            print(f"โœ… Root hash: {current_root_hash}")
            print(f"โœ… Generation: {root_data.get('generation')}")
            print(f"๐Ÿ“„ Full root response: {json.dumps(root_data, indent=2)}")
            
            # Step 2: Fetch root.docSchema content
            print(f"\n๐Ÿ“‹ Step 2: Fetching root.docSchema content using hash {current_root_hash}...")
            root_content_response = self.session.get(f"https://eu.tectonic.remarkable.com/sync/v3/files/{current_root_hash}")
            root_content_response.raise_for_status()
            root_content = root_content_response.text
            
            print(f"โœ… Root content response: {root_content_response.status_code}")
            print(f"โœ… Root.docSchema size: {len(root_content)} bytes")
            print(f"๐Ÿ“„ Full root.docSchema content:")
            for i, line in enumerate(root_content.strip().split('\n')):
                print(f"   Line {i}: {line}")
            
            # Step 3: Find a PDF document (type 4) in root.docSchema
            print(f"\n๐Ÿ“‹ Step 3: Looking for PDF documents (type 4) in root.docSchema...")
            pdf_documents = []
            
            for line in root_content.strip().split('\n')[1:]:  # Skip version header
                if ':' in line:
                    parts = line.split(':')
                    if len(parts) >= 5:
                        doc_hash = parts[0]
                        doc_uuid = parts[2]
                        node_type = parts[3]
                        size = parts[4]
                        
                        if node_type == '4':  # PDF document
                            pdf_documents.append({
                                'hash': doc_hash,
                                'uuid': doc_uuid,
                                'size': size,
                                'line': line
                            })
            
            print(f"โœ… Found {len(pdf_documents)} PDF documents")
            for i, doc in enumerate(pdf_documents):
                print(f"   PDF {i+1}: UUID {doc['uuid'][:8]}... Hash {doc['hash'][:16]}... Size {doc['size']}")
            
            if not pdf_documents:
                print("โŒ No PDF documents found in root.docSchema")
                return {'success': False, 'error': 'No PDF documents found'}
            
            # Step 4: Pick the first PDF and fetch its docSchema
            target_doc = pdf_documents[0]
            print(f"\n๐Ÿ“‹ Step 4: Analyzing PDF document {target_doc['uuid'][:8]}...")
            print(f"   Target hash: {target_doc['hash']}")
            print(f"   Target size: {target_doc['size']}")
            print(f"   Full entry: {target_doc['line']}")
            
            doc_response = self.session.get(f"https://eu.tectonic.remarkable.com/sync/v3/files/{target_doc['hash']}")
            doc_response.raise_for_status()
            doc_schema_content = doc_response.text
            
            print(f"โœ… Document docSchema response: {doc_response.status_code}")
            print(f"โœ… Document docSchema size: {len(doc_schema_content)} bytes")
            print(f"โœ… Root.docSchema claimed size: {target_doc['size']} bytes")
            print(f"๐Ÿ“Š Size match: {'โœ… YES' if str(len(doc_schema_content)) == target_doc['size'] else 'โŒ NO - MISMATCH!'}")
            
            print(f"๐Ÿ“„ Full document docSchema content:")
            for i, line in enumerate(doc_schema_content.strip().split('\n')):
                print(f"   Line {i}: {line}")
            
            # Step 5: Parse document docSchema and fetch each component
            print(f"\n๐Ÿ“‹ Step 5: Fetching each document component...")
            lines = doc_schema_content.strip().split('\n')
            if len(lines) < 2:
                print(f"โŒ Invalid docSchema format: only {len(lines)} lines")
                return {'success': False, 'error': 'Invalid docSchema format'}
            
            version = lines[0]
            print(f"๐Ÿ“Š DocSchema version: {version}")
            
            components = {}
            
            for i, line in enumerate(lines[1:], 1):
                if ':' in line:
                    parts = line.split(':')
                    if len(parts) >= 5:
                        comp_hash = parts[0]
                        comp_name = parts[2]
                        comp_size = parts[4]
                        
                        print(f"\n   ๐Ÿ” Component {i}: {comp_name}")
                        print(f"       Hash: {comp_hash}")
                        print(f"       Expected size: {comp_size}")
                        
                        try:
                            comp_response = self.session.get(f"https://eu.tectonic.remarkable.com/sync/v3/files/{comp_hash}")
                            comp_response.raise_for_status()
                            actual_size = len(comp_response.content)
                            
                            print(f"       โœ… Component response: {comp_response.status_code}")
                            print(f"       โœ… Actual size: {actual_size} bytes")
                            print(f"       ๐Ÿ“Š Size match: {'โœ… YES' if str(actual_size) == comp_size else 'โŒ NO - MISMATCH!'}")
                            
                            # Store component details
                            components[comp_name] = {
                                'hash': comp_hash,
                                'expected_size': comp_size,
                                'actual_size': actual_size,
                                'response_status': comp_response.status_code,
                                'accessible': True,
                                'content': comp_response.content
                            }
                            
                            # Show content preview based on type
                            if comp_name.endswith('.pdf'):
                                print(f"       ๐Ÿ“„ PDF content preview (first 50 bytes): {comp_response.content[:50]}")
                                if comp_response.content.startswith(b'%PDF'):
                                    print(f"       โœ… Valid PDF header detected")
                                else:
                                    print(f"       โŒ Invalid PDF header!")
                            
                            elif comp_name.endswith('.metadata'):
                                try:
                                    metadata_json = json.loads(comp_response.text)
                                    print(f"       โœ… Valid JSON metadata")
                                    print(f"       ๐Ÿ“ Document name: {metadata_json.get('visibleName', 'N/A')}")
                                    print(f"       ๐Ÿ“ Parent UUID: {metadata_json.get('parent', 'root')}")
                                    print(f"       ๐Ÿ—‚๏ธ Document type: {metadata_json.get('type', 'N/A')}")
                                    print(f"       ๐Ÿ“„ Full metadata: {json.dumps(metadata_json, indent=8)}")
                                except Exception as json_e:
                                    print(f"       โŒ Invalid JSON metadata: {json_e}")
                                    print(f"       ๐Ÿ“„ Raw content: {comp_response.text[:200]}...")
                            
                            elif comp_name.endswith('.content'):
                                print(f"       ๐Ÿ“„ Content preview: {comp_response.text[:100]}...")
                            
                            elif comp_name.endswith('.pagedata'):
                                print(f"       ๐Ÿ“„ Pagedata preview: {comp_response.text[:100]}...")
                            
                            else:
                                print(f"       ๐Ÿ“„ Unknown component type, raw preview: {comp_response.content[:50]}")
                        
                        except Exception as e:
                            print(f"       โŒ Component NOT accessible: {e}")
                            components[comp_name] = {
                                'hash': comp_hash,
                                'expected_size': comp_size,
                                'accessible': False,
                                'error': str(e)
                            }
            
            # Step 6: Summary and analysis
            print(f"\n๐Ÿ“‹ Step 6: Complete Analysis Summary")
            print(f"=" * 50)
            accessible_count = sum(1 for c in components.values() if c.get('accessible', False))
            total_count = len(components)
            
            print(f"๐Ÿ“Š Document Analysis Results:")
            print(f"   Target document: {target_doc['uuid']}")
            print(f"   Document hash: {target_doc['hash']}")
            print(f"   Root.docSchema size claim: {target_doc['size']} bytes")
            print(f"   Actual docSchema size: {len(doc_schema_content)} bytes")
            print(f"   Size consistency: {'โœ… GOOD' if str(len(doc_schema_content)) == target_doc['size'] else 'โŒ CORRUPTED'}")
            print(f"   Total components: {total_count}")
            print(f"   Accessible components: {accessible_count}")
            print(f"   Component accessibility: {'โœ… ALL GOOD' if accessible_count == total_count else 'โŒ ISSUES FOUND'}")
            
            print(f"\n๐Ÿ“‹ Component Details:")
            for name, details in components.items():
                status = "โœ… OK" if details.get('accessible') else "โŒ FAIL"
                size_status = ""
                if details.get('accessible'):
                    expected = details.get('expected_size')
                    actual = details.get('actual_size')
                    size_status = f" (size: {'โœ…' if str(actual) == expected else 'โŒ'})"
                print(f"   {status} {name}{size_status}")
                if not details.get('accessible'):
                    print(f"      Error: {details.get('error')}")
            
            success = accessible_count == total_count and str(len(doc_schema_content)) == target_doc['size']
            
            return {
                'success': success,
                'document_uuid': target_doc['uuid'],
                'document_hash': target_doc['hash'],
                'root_size_claim': target_doc['size'],
                'actual_docschema_size': len(doc_schema_content),
                'size_consistent': str(len(doc_schema_content)) == target_doc['size'],
                'total_components': total_count,
                'accessible_components': accessible_count,
                'components': components
            }
            
        except Exception as e:
            print(f"โŒ Document chain analysis failed: {e}")
            return {'success': False, 'error': str(e)}
    
    def comprehensive_cloud_analysis(self) -> Dict[str, Any]:
        """Run comprehensive analysis of cloud state and replica sync process"""
        print("\n๐Ÿ” COMPREHENSIVE CLOUD & REPLICA ANALYSIS")
        print("=" * 60)
        
        analysis_results = {
            'cloud_state': {},
            'replica_state': {},
            'sync_issues': [],
            'missing_documents': []
        }
        
        try:
            # STEP 1: Direct Cloud State Analysis
            print("\n๐Ÿ“ก STEP 1: DIRECT CLOUD STATE ANALYSIS")
            print("-" * 40)
            
            # Get current root state from server
            root_response = self.session.get("https://eu.tectonic.remarkable.com/sync/v4/root")
            root_response.raise_for_status()
            root_data = root_response.json()
            
            print(f"โœ… Root hash: {root_data['hash']}")
            print(f"โœ… Generation: {root_data['generation']}")
            
            # Get root.docSchema content from server
            root_content_response = self.session.get(f"https://eu.tectonic.remarkable.com/sync/v3/files/{root_data['hash']}")
            root_content_response.raise_for_status()
            root_content = root_content_response.text
            
            print(f"\n๐Ÿ“„ CLOUD ROOT.DOCSCHEMA ({len(root_content)} bytes):")
            cloud_lines = root_content.strip().split('\n')
            for i, line in enumerate(cloud_lines):
                print(f"   Line {i}: {line}")
            
            # Parse cloud entries
            cloud_documents = []
            cloud_folders = []
            
            for line_num, line in enumerate(cloud_lines[1:], 1):  # Skip version header
                if ':' in line:
                    parts = line.split(':')
                    if len(parts) >= 5:
                        doc_hash = parts[0]
                        doc_uuid = parts[2]
                        node_type = parts[3]
                        size = parts[4]
                        
                        entry_info = {
                            'uuid': doc_uuid,
                            'hash': doc_hash,  
                            'type': node_type,
                            'size': size,
                            'line': line,
                            'line_number': line_num
                        }
                        
                        if node_type in ['1', '2']:  # Folders
                            cloud_folders.append(entry_info)
                        elif node_type in ['3', '4']:  # Documents
                            cloud_documents.append(entry_info)
            
            analysis_results['cloud_state'] = {
                'root_hash': root_data['hash'],
                'generation': root_data['generation'],
                'total_entries': len(cloud_lines) - 1,
                'folders': cloud_folders,
                'documents': cloud_documents
            }
            
            print(f"\n๐Ÿ“Š CLOUD INVENTORY:")
            print(f"   ๐Ÿ“‚ Folders: {len(cloud_folders)}")
            print(f"   ๐Ÿ“„ Documents: {len(cloud_documents)}")
            
            # STEP 2: Fresh Replica Build with Detailed Logging
            print(f"\n๐Ÿ—๏ธ STEP 2: FRESH REPLICA BUILD WITH DETAILED LOGGING")
            print("-" * 40)
            
            from local_replica_v2 import RemarkableReplicaBuilder
            replica_builder = RemarkableReplicaBuilder(self.session)
            
            # Force a complete rebuild
            print("๐Ÿ”„ Forcing complete replica rebuild...")
            replica_builder.build_complete_replica()
            
            # STEP 3: Compare Replica Database vs Cloud State
            print(f"\n๐Ÿ“Š STEP 3: REPLICA DATABASE ANALYSIS")
            print("-" * 40)
            
            # Load the replica database
            database_path = self.base_dir / "remarkable_replica_v2" / "replica_database.json"
            if database_path.exists():
                with open(database_path, 'r') as f:
                    replica_db = json.load(f)
                
                replica_nodes = replica_db.get('nodes', {})
                replica_documents = []
                replica_folders = []
                
                for uuid, node in replica_nodes.items():
                    if node.get('node_type') == 'document':
                        replica_documents.append({
                            'uuid': uuid,
                            'name': node.get('name', 'Unknown'),
                            'hash': node.get('hash', 'Unknown'),
                            'parent': node.get('metadata', {}).get('parent', '')
                        })
                    elif node.get('node_type') == 'folder':
                        replica_folders.append({
                            'uuid': uuid,
                            'name': node.get('name', 'Unknown'),
                            'hash': node.get('hash', 'Unknown')
                        })
                
                analysis_results['replica_state'] = {
                    'total_nodes': len(replica_nodes),
                    'folders': replica_folders,
                    'documents': replica_documents
                }
                
                print(f"๐Ÿ“ REPLICA DATABASE CONTENT:")
                print(f"   ๐Ÿ“‚ Folders: {len(replica_folders)}")
                print(f"   ๐Ÿ“„ Documents: {len(replica_documents)}")
                
                print(f"\n๐Ÿ“‚ REPLICA FOLDERS:")
                for i, folder in enumerate(replica_folders, 1):
                    print(f"   {i}. {folder['name']} (UUID: {folder['uuid'][:8]}...)")
                
                print(f"\n๐Ÿ“„ REPLICA DOCUMENTS:")
                for i, doc in enumerate(replica_documents, 1):
                    parent_info = f" [in folder]" if doc['parent'] else " [root]"
                    print(f"   {i}. {doc['name']} (UUID: {doc['uuid'][:8]}...){parent_info}")
                
            else:
                print("โŒ Replica database not found!")
                analysis_results['sync_issues'].append("Replica database file missing")
            
            # STEP 4: Cross-Reference Analysis
            print(f"\n๐Ÿ” STEP 4: CROSS-REFERENCE ANALYSIS")
            print("-" * 40)
            
            # Check if cloud documents are in replica
            missing_from_replica = []
            for cloud_doc in cloud_documents:
                found_in_replica = any(r['uuid'] == cloud_doc['uuid'] for r in replica_documents)
                if not found_in_replica:
                    missing_from_replica.append(cloud_doc)
                    print(f"โŒ MISSING FROM REPLICA: {cloud_doc['uuid'][:8]}... (type {cloud_doc['type']}, size {cloud_doc['size']})")
            
            # Check if replica documents are in cloud
            missing_from_cloud = []
            for replica_doc in replica_documents:
                found_in_cloud = any(c['uuid'] == replica_doc['uuid'] for c in cloud_documents)
                if not found_in_cloud:
                    missing_from_cloud.append(replica_doc)
                    print(f"โŒ MISSING FROM CLOUD: {replica_doc['name']} (UUID: {replica_doc['uuid'][:8]}...)")
            
            analysis_results['missing_documents'] = {
                'missing_from_replica': missing_from_replica,
                'missing_from_cloud': missing_from_cloud
            }
            
            # STEP 5: Test Document Analysis
            print(f"\n๐ŸŽฏ STEP 5: TEST DOCUMENT ANALYSIS")
            print("-" * 40)
            
            test_uuids = [
                '824225cd-3f6f-4d00-bbbb-54d53ab94cc5',  # Our recent upload
                '2342f4af-3034-45d2-be90-e17ecc9e04d5',  # Invoice PDF  
                '7b3f2f2b-6757-4673-9aa4-636a895415f5'   # Pylontech PDF (should be missing)
            ]
            
            for test_uuid in test_uuids:
                print(f"\n๐Ÿ” Testing document {test_uuid[:8]}...")
                
                # Check in cloud
                in_cloud = any(test_uuid in line for line in cloud_lines)
                print(f"   Cloud: {'โœ… FOUND' if in_cloud else 'โŒ NOT FOUND'}")
                
                # Check in replica
                in_replica = test_uuid in replica_nodes if 'replica_nodes' in locals() else False
                print(f"   Replica: {'โœ… FOUND' if in_replica else 'โŒ NOT FOUND'}")
                
                # If in cloud, fetch and validate components
                if in_cloud:
                    cloud_line = next(line for line in cloud_lines if test_uuid in line)
                    parts = cloud_line.split(':')
                    doc_hash = parts[0]
                    
                    try:
                        doc_response = self.session.get(f"https://eu.tectonic.remarkable.com/sync/v3/files/{doc_hash}")
                        doc_response.raise_for_status()
                        print(f"   DocSchema: โœ… ACCESSIBLE ({len(doc_response.text)} bytes)")
                    except Exception as e:
                        print(f"   DocSchema: โŒ NOT ACCESSIBLE - {e}")
            
            print(f"\n๐Ÿ“Š ANALYSIS SUMMARY:")
            print(f"   Cloud entries: {len(cloud_documents)} docs, {len(cloud_folders)} folders")
            print(f"   Replica entries: {len(replica_documents)} docs, {len(replica_folders)} folders")
            print(f"   Missing from replica: {len(missing_from_replica)}")
            print(f"   Missing from cloud: {len(missing_from_cloud)}")
            
            return analysis_results
            
        except Exception as e:
            print(f"โŒ Analysis failed: {e}")
            analysis_results['error'] = str(e)
            return analysis_results

    def create_demo_upload_scenario(self):
        """Create a comprehensive demo scenario"""
        print("\n๐ŸŽฌ Creating Demo Upload Scenario")
        print("This will demonstrate a complete workflow:")
        print("1. Create a new notebook")
        print("2. Upload a PDF document") 
        print("3. Organize them in folders")
        print("4. Edit metadata")
        print("5. Verify everything synced correctly")
        
        # Create demo notebook
        demo_notebook = f"Demo_Notebook_{int(time.time())}"
        self.uploader.create_notebook(demo_notebook, template="Lines Medium")
        
        # Create and upload demo PDF
        demo_pdf = self.test_create_test_pdf()
        demo_pdf_name = f"Demo_PDF_{int(time.time())}"
        self.uploader.upload_pdf_document(str(demo_pdf), demo_pdf_name)
        
        print("โœ… Demo scenario created successfully!")
        print("Check your reMarkable device to see the new files.")

Parameters

Name Type Default Kind
enable_raw_logging bool True positional_or_keyword

Parameter Details

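enable_raw_logging: Constructor flag of type bool (default True). It is stored on the instance as raw_logging and determines whether the start-up message reports raw HTTP request logging as ENABLED or DISABLED.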

Return Value

Returns unspecified type

Class Interface

Methods

__init__(self, enable_raw_logging)

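Purpose: Initialise the test suite: store the logging flag, obtain an authenticated reMarkable session (raising RuntimeError if authentication fails) and create the upload manager.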

Parameters:

  • enable_raw_logging: Type: bool

Returns: None

save_raw_logs(self) -> Path

Purpose: Save captured raw HTTP logs to file for comparison with real app logs

Returns: Returns Path

log_test(self, test_name, success, details)

Purpose: Log test result

Parameters:

  • test_name: Type: str
  • success: Type: bool
  • details: Type: str

Returns: None

test_edit_document_name(self) -> bool

Purpose: Test 1: Edit existing document name

Returns: Returns bool

test_create_test_pdf(self) -> Path

Purpose: Create a test PDF for upload testing

Returns: Returns Path

test_upload_new_pdf(self, parent_uuid) -> bool

Purpose: Test: Upload new PDF document to specified folder (or root if no parent_uuid)

Parameters:

  • parent_uuid: Type: str

Returns: Returns bool

test_create_new_notebook(self) -> bool

Purpose: Test 3: Create new notebook

Returns: Returns bool

test_move_document(self) -> bool

Purpose: Test 4: Move document to different folder

Returns: Returns bool

test_hash_consistency(self) -> bool

Purpose: Test 5: Verify hash consistency after uploads

Returns: Returns bool

validate_uploaded_document(self, document_uuid) -> bool

Purpose: Validate that an uploaded document has all required components accessible

Parameters:

  • document_uuid: Type: str

Returns: Returns bool

analyze_root_docschema_sizes(self) -> bool

Purpose: Analyze the sizes in root.docSchema to identify patterns and issues

Returns: Returns bool

test_document_download_chain(self, document_uuid) -> bool

Purpose: Follow the complete download chain for a document to identify where it fails

Parameters:

  • document_uuid: Type: str

Returns: Returns bool

test_existing_document_chain(self) -> Dict[str, Any]

Purpose: Test the complete download chain for an existing uploaded document - NO NEW UPLOADS

Returns: Returns Dict[str, Any]

comprehensive_cloud_analysis(self) -> Dict[str, Any]

Purpose: Run comprehensive analysis of cloud state and replica sync process

Returns: Returns Dict[str, Any]

create_demo_upload_scenario(self)

Purpose: Create a comprehensive demo scenario

Returns: None

Required Imports

import os
import json
import time
from pathlib import Path
from typing import Any, Dict

Usage Example

# Example usage:
# tests = RemarkableUploadTests(enable_raw_logging=True)
# result = tests.test_existing_document_chain()

Similar Components

AI-powered semantic similarity - components with related functionality:

  • class SimplePDFUploadTest 75.3% similar

    A test class for validating PDF upload functionality to reMarkable cloud, including raw content upload and complete document creation with metadata.

    From: /tf/active/vicechatdev/e-ink-llm/cloudtest/test_simple_pdf_upload.py
  • class FixedUploadTest 73.7% similar

    A test class that simulates document upload to reMarkable cloud with specific fixes applied to match the real reMarkable desktop app behavior.

    From: /tf/active/vicechatdev/e-ink-llm/cloudtest/fixed_upload_test.py
  • function main_v100 72.0% similar

    Tests uploading a PDF document to a specific folder ('Myfolder') on a reMarkable device and verifies the upload by syncing and checking folder contents.

    From: /tf/active/vicechatdev/e-ink-llm/cloudtest/test_folder_upload.py
  • function main_v6 71.2% similar

    Integration test function that validates the fixed upload implementation for reMarkable cloud sync by creating a test PDF document, uploading it with corrected metadata patterns, and verifying its successful appearance in the reMarkable ecosystem.

    From: /tf/active/vicechatdev/e-ink-llm/cloudtest/test_fixed_upload.py
  • class RemarkableUploadManager_v1 70.8% similar

    Manages uploads to reMarkable cloud

    From: /tf/active/vicechatdev/e-ink-llm/cloudtest/upload_manager_old.py
โ† Back to Browse