class RemarkableUploadTests
Test suite for reMarkable upload functionality
File: /tf/active/vicechatdev/e-ink-llm/cloudtest/test_uploads.py
Lines: 107 - 1272
Complexity: moderate
Purpose
Test suite for reMarkable upload functionality
Source Code
class RemarkableUploadTests:
"""Test suite for reMarkable upload functionality"""
def __init__(self, enable_raw_logging: bool = True):
self.base_dir = Path(__file__).parent
self.test_results = []
self.raw_logging = enable_raw_logging
if self.raw_logging:
print("๐ Raw HTTP request logging ENABLED")
else:
print("๐ Raw HTTP request logging DISABLED")
# Load auth session
from auth import RemarkableAuth
auth = RemarkableAuth()
self.session = auth.get_authenticated_session()
if not self.session:
raise RuntimeError("Failed to authenticate with reMarkable")
# Load upload manager
from upload_manager import RemarkableUploadManager
database_path = self.base_dir / "remarkable_replica_v2" / "replica_database.json"
self.uploader = RemarkableUploadManager(self.session, database_path)
print("๐งช reMarkable Upload Test Suite Initialized")
def save_raw_logs(self) -> Path:
"""Save captured raw HTTP logs to file for comparison with real app logs"""
global raw_logs
if not raw_logs:
print("๐ No raw logs to save")
return None
logs_dir = self.base_dir / "test_results" / "raw_logs"
logs_dir.mkdir(parents=True, exist_ok=True)
timestamp = int(time.time())
log_file = logs_dir / f"raw_requests_{timestamp}.json"
# Save detailed logs
with open(log_file, 'w') as f:
json.dump(raw_logs, f, indent=2, default=str)
# Also create a simplified log similar to real app logs
simple_log_file = logs_dir / f"simple_requests_{timestamp}.txt"
with open(simple_log_file, 'w') as f:
f.write("=== reMarkable Upload Test - Raw Request Log ===\n\n")
for i, log in enumerate(raw_logs, 1):
f.write(f"Request #{i}: {log['method']} {log['url']}\n")
f.write(f"Status: {log['response_status']}\n")
# Key headers
if 'rm-filename' in log['headers']:
f.write(f"rm-filename: {log['headers']['rm-filename']}\n")
if 'x-goog-hash' in log['headers']:
f.write(f"x-goog-hash: {log['headers']['x-goog-hash']}\n")
if 'Content-Type' in log['headers']:
f.write(f"Content-Type: {log['headers']['Content-Type']}\n")
# Body information
f.write(f"Body size: {log['body_size']} bytes\n")
# Include full text body if available
if log.get('body_type') == 'text' and log.get('body_text'):
f.write(f"Body content:\n{log['body_text']}\n")
elif log.get('body_type') == 'binary':
f.write(f"Body: {log['body_text']}\n")
elif log.get('json_body'):
f.write(f"JSON Body: {json.dumps(log['json_body'], indent=2)}\n")
# Response information
if log.get('response_text'):
f.write(f"Response: {log['response_text']}\n")
f.write("-" * 50 + "\n\n")
print(f"๐ Raw logs saved:")
print(f" Detailed: {log_file}")
print(f" Simple: {simple_log_file}")
print(f" Total requests: {len(raw_logs)}")
return log_file
def log_test(self, test_name: str, success: bool, details: str = ""):
"""Log test result"""
status = "โ
PASS" if success else "โ FAIL"
print(f"{status} {test_name}")
if details:
print(f" {details}")
self.test_results.append({
'test': test_name,
'success': success,
'details': details,
'timestamp': time.time()
})
def test_edit_document_name(self) -> bool:
"""Test 1: Edit existing document name"""
try:
print("\n๐ง Test 1: Edit Document Name")
# Find a document to edit
database = self.uploader.database
document_uuid = None
original_name = None
for uuid, node in database['nodes'].items():
if node['node_type'] == 'document' and 'pdf' not in node['name'].lower():
document_uuid = uuid
original_name = node['name']
break
if not document_uuid:
self.log_test("Edit Document Name", False, "No suitable document found")
return False
# Generate new name
new_name = f"TEST_RENAMED_{int(time.time())}"
print(f"Renaming '{original_name}' to '{new_name}'")
# Perform edit
success = self.uploader.edit_document_metadata(document_uuid, new_name=new_name)
if success:
# Verify in database
updated_node = self.uploader.database['nodes'][document_uuid]
if updated_node['metadata']['visibleName'] == new_name:
self.log_test("Edit Document Name", True, f"Renamed to: {new_name}")
# Rename back to original
self.uploader.edit_document_metadata(document_uuid, new_name=original_name)
return True
else:
self.log_test("Edit Document Name", False, "Database not updated")
return False
else:
self.log_test("Edit Document Name", False, "Upload failed")
return False
except Exception as e:
self.log_test("Edit Document Name", False, f"Exception: {e}")
return False
def test_create_test_pdf(self) -> Path:
"""Create a test PDF for upload testing"""
try:
from reportlab.pdfgen import canvas
from reportlab.lib.pagesizes import letter
test_pdf_path = self.base_dir / "test_uploads" / "test_document.pdf"
test_pdf_path.parent.mkdir(exist_ok=True)
# Create simple PDF
c = canvas.Canvas(str(test_pdf_path), pagesize=letter)
c.drawString(100, 750, f"reMarkable Upload Test Document")
c.drawString(100, 720, f"Generated: {time.strftime('%Y-%m-%d %H:%M:%S')}")
c.drawString(100, 690, f"Test UUID: {uuid.uuid4()}")
c.drawString(100, 660, "This is a test document for upload functionality.")
c.showPage()
c.save()
return test_pdf_path
except ImportError:
# Create simple text-based PDF content
test_pdf_path = self.base_dir / "test_uploads" / "test_document.txt"
test_pdf_path.parent.mkdir(exist_ok=True)
with open(test_pdf_path, 'w') as f:
f.write(f"reMarkable Upload Test Document\n")
f.write(f"Generated: {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write(f"Test UUID: {uuid.uuid4()}\n")
f.write("This is a test document for upload functionality.\n")
print("โ ๏ธ Using text file instead of PDF (reportlab not available)")
return test_pdf_path
def test_upload_new_pdf(self, parent_uuid: str = "") -> bool:
"""Test: Upload new PDF document to specified folder (or root if no parent_uuid)"""
try:
target_location = "specified folder" if parent_uuid else "root folder"
folder_info = f" (UUID: {parent_uuid})" if parent_uuid else ""
print(f"\n๐ Test: Upload New PDF Document to {target_location}{folder_info}")
# Create test PDF
test_pdf = self.test_create_test_pdf()
test_name = f"UploadTest_{int(time.time())}"
print(f"Uploading: {test_pdf} as '{test_name}' to {target_location}")
# Get initial document count from current database (no pre-sync needed)
initial_count = len([n for n in self.uploader.database['nodes'].values()
if n['node_type'] == 'document'])
print(f"๐ Initial document count: {initial_count}")
# Upload document with parent parameter - this should follow the complete sequence:
# 1-5: Document components + docSchema
# 6: root.docSchema update
# 7: roothash update
# 8: Replica sync to verify
success = self.uploader.upload_pdf_document(str(test_pdf), test_name, parent_uuid)
if success:
print(f"โ
Upload completed - document should be visible in {target_location}")
# The upload_manager already runs replica sync, so no need to duplicate it
# Just verify the result
new_count = len([n for n in self.uploader.database['nodes'].values()
if n['node_type'] == 'document'])
print(f"๐ New document count: {new_count}")
# Find the new document
new_doc = None
for node in self.uploader.database['nodes'].values():
if node.get('name') == test_name:
new_doc = node
break
if new_doc:
# Verify parent location if specified
actual_parent = new_doc.get('metadata', {}).get('parent', '')
expected_parent = parent_uuid if parent_uuid else ''
if actual_parent == expected_parent:
location_status = f"โ
in correct location ({target_location})"
else:
location_status = f"โ wrong location (expected: {expected_parent}, actual: {actual_parent})"
self.log_test("Upload New PDF", True,
f"Document created: {test_name} (UUID: {new_doc['uuid'][:8]}...) {location_status}")
return True
elif new_count > initial_count:
self.log_test("Upload New PDF", True,
f"Document uploaded successfully (count increased by {new_count - initial_count})")
return True
else:
self.log_test("Upload New PDF", False, "Document not found and count unchanged")
return False
else:
self.log_test("Upload New PDF", False, "Upload failed")
return False
except Exception as e:
self.log_test("Upload New PDF", False, f"Exception: {e}")
return False
def test_create_new_notebook(self) -> bool:
"""Test 3: Create new notebook"""
try:
print("\n๐ Test 3: Create New Notebook")
notebook_name = f"TestNotebook_{int(time.time())}"
print(f"Creating notebook: '{notebook_name}'")
# Get initial document count
initial_count = len([n for n in self.uploader.database['nodes'].values()
if n['node_type'] == 'document'])
# Create notebook
success = self.uploader.create_notebook(notebook_name)
if success:
# Verify in database
new_count = len([n for n in self.uploader.database['nodes'].values()
if n['node_type'] == 'document'])
if new_count > initial_count:
# Find the new notebook
new_notebook = None
for node in self.uploader.database['nodes'].values():
if node['name'] == notebook_name:
new_notebook = node
break
if new_notebook:
# Verify it has content structure
has_content = new_notebook['component_hashes'].get('content') is not None
has_rm_files = len(new_notebook['component_hashes'].get('rm_files', [])) > 0
if has_content and has_rm_files:
self.log_test("Create New Notebook", True,
f"Notebook created: {notebook_name} with content + RM files")
return True
else:
self.log_test("Create New Notebook", False,
"Notebook missing content structure")
return False
else:
self.log_test("Create New Notebook", False, "Notebook not found in database")
return False
else:
self.log_test("Create New Notebook", False, "Document count unchanged")
return False
else:
self.log_test("Create New Notebook", False, "Creation failed")
return False
except Exception as e:
self.log_test("Create New Notebook", False, f"Exception: {e}")
return False
def test_move_document(self) -> bool:
"""Test 4: Move document to different folder"""
try:
print("\n๐ Test 4: Move Document")
database = self.uploader.database
# Find a document and a folder
document_uuid = None
folder_uuid = None
original_parent = None
for uuid, node in database['nodes'].items():
if node['node_type'] == 'document' and document_uuid is None:
document_uuid = uuid
original_parent = node.get('parent_uuid', '')
elif node['node_type'] == 'folder' and folder_uuid is None:
folder_uuid = uuid
if document_uuid and folder_uuid:
break
if not document_uuid:
self.log_test("Move Document", False, "No document found")
return False
if not folder_uuid:
self.log_test("Move Document", False, "No folder found")
return False
document_name = database['nodes'][document_uuid]['name']
folder_name = database['nodes'][folder_uuid]['name']
print(f"Moving '{document_name}' to folder '{folder_name}'")
# Move document
success = self.uploader.edit_document_metadata(document_uuid, new_parent=folder_uuid)
if success:
# Verify move
updated_node = self.uploader.database['nodes'][document_uuid]
if updated_node['metadata']['parent'] == folder_uuid:
self.log_test("Move Document", True,
f"Moved '{document_name}' to '{folder_name}'")
# Move back to original location
self.uploader.edit_document_metadata(document_uuid, new_parent=original_parent)
return True
else:
self.log_test("Move Document", False, "Parent not updated in database")
return False
else:
self.log_test("Move Document", False, "Move operation failed")
return False
except Exception as e:
self.log_test("Move Document", False, f"Exception: {e}")
return False
def test_hash_consistency(self) -> bool:
"""Test 5: Verify hash consistency after uploads"""
try:
print("\n๐ Test 5: Hash Consistency Check")
database = self.uploader.database
hash_registry = database.get('hash_registry', {})
# Check for hash collisions
hash_types = {}
for hash_val, info in hash_registry.items():
hash_type = info.get('type', 'unknown')
if hash_type not in hash_types:
hash_types[hash_type] = []
hash_types[hash_type].append(hash_val)
# Verify recent uploads have proper hash entries
recent_uploads = [result for result in self.test_results if result['success']]
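# Note: recent_uploads is collected here but not used by the checks below; the consistency checks operate directly on the database nodes.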
consistency_issues = 0
# Check node consistency
for uuid, node in database['nodes'].items():
node_hash = node.get('hash')
if node_hash and node_hash not in hash_registry:
consistency_issues += 1
print(f"โ ๏ธ Node {uuid[:8]}... has unregistered hash {node_hash[:16]}...")
# Check component hashes
comp_hashes = node.get('component_hashes', {})
for comp_type, comp_hash in comp_hashes.items():
if comp_hash and isinstance(comp_hash, str) and comp_hash not in hash_registry:
consistency_issues += 1
print(f"โ ๏ธ Component {comp_type} has unregistered hash {comp_hash[:16]}...")
if consistency_issues == 0:
self.log_test("Hash Consistency", True,
f"All hashes properly registered ({len(hash_registry)} total)")
return True
else:
self.log_test("Hash Consistency", False,
f"{consistency_issues} hash consistency issues found")
return False
except Exception as e:
self.log_test("Hash Consistency", False, f"Exception: {e}")
return False
def validate_uploaded_document(self, document_uuid: str) -> bool:
"""Validate that an uploaded document has all required components accessible"""
try:
print(f"\n๐ Validating uploaded document: {document_uuid[:8]}...")
# Find the document in our database
doc_node = None
for uuid, node in self.uploader.database['nodes'].items():
if uuid == document_uuid:
doc_node = node
break
if not doc_node:
print(f"โ Document {document_uuid[:8]}... not found in local database")
return False
print(f"๐ Document found: {doc_node['name']}")
print(f"๐ Document hash: {doc_node['hash']}")
# Try to fetch the document's docSchema from server
doc_hash = doc_node['hash']
try:
doc_response = self.session.get(f"https://eu.tectonic.remarkable.com/sync/v3/files/{doc_hash}")
doc_response.raise_for_status()
print(f"โ
Document docSchema accessible from server ({len(doc_response.text)} bytes)")
print(f"๐ DocSchema content preview: {doc_response.text[:200]}...")
# Parse the docSchema and validate each component
lines = doc_response.text.strip().split('\n')
if len(lines) < 2:
print(f"โ Invalid docSchema format: too few lines ({len(lines)})")
return False
version = lines[0]
print(f"๐ DocSchema version: {version}")
component_count = 0
missing_components = []
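# Assumed entry layout, inferred from the field indices used below: "<hash>:<flag>:<component name>:...", i.e. field 0 is the component hash and field 2 its name.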
for line in lines[1:]:
if ':' in line:
parts = line.split(':')
if len(parts) >= 3:
comp_hash = parts[0]
comp_name = parts[2]
component_count += 1
# Try to fetch this component
try:
comp_response = self.session.get(f"https://eu.tectonic.remarkable.com/sync/v3/files/{comp_hash}")
comp_response.raise_for_status()
print(f"โ
Component {comp_name}: accessible ({len(comp_response.content)} bytes)")
except Exception as e:
print(f"โ Component {comp_name}: NOT accessible - {e}")
missing_components.append(comp_name)
print(f"๐ Total components: {component_count}")
print(f"โ Missing components: {len(missing_components)}")
if missing_components:
print(f"โ ๏ธ Missing: {missing_components}")
return False
else:
print(f"โ
All document components are accessible")
return True
except Exception as e:
print(f"โ Cannot fetch document docSchema: {e}")
return False
except Exception as e:
print(f"โ Document validation failed: {e}")
return False
def analyze_root_docschema_sizes(self) -> bool:
"""Analyze the sizes in root.docSchema to identify patterns and issues"""
try:
print(f"\n๐ Analyzing root.docSchema sizes...")
# Get current root.docSchema
root_response = self.session.get("https://eu.tectonic.remarkable.com/sync/v4/root")
root_response.raise_for_status()
root_data = root_response.json()
current_root_hash = root_data['hash']
root_content_response = self.session.get(f"https://eu.tectonic.remarkable.com/sync/v3/files/{current_root_hash}")
root_content_response.raise_for_status()
root_content = root_content_response.text
# Parse entries and analyze sizes
lines = root_content.strip().split('\n')[1:] # Skip version header
size_analysis = {}
entries_by_size = {}
print(f"๐ Analyzing {len(lines)} entries...")
for line in lines:
if ':' in line:
parts = line.split(':')
if len(parts) >= 5:
doc_hash = parts[0]
doc_uuid = parts[2]
node_type = parts[3]
size = parts[4]
# Group by size
if size not in entries_by_size:
entries_by_size[size] = []
entries_by_size[size].append({
'uuid': doc_uuid,
'hash': doc_hash,
'type': node_type,
'line': line
})
# Report size distribution
print(f"\n๐ Size Distribution Analysis:")
for size, entries in sorted(entries_by_size.items(), key=lambda x: len(x[1]), reverse=True):
count = len(entries)
print(f" Size {size}: {count} documents")
if count > 1:
print(f" โ ๏ธ Multiple documents with identical size:")
for entry in entries[:5]: # Show first 5
print(f" - {entry['uuid'][:8]}... (type {entry['type']})")
if len(entries) > 5:
print(f" ... and {len(entries) - 5} more")
# Check if our uploaded documents have suspicious sizes
suspicious_sizes = ['2247', '2246'] # The sizes we noticed
print(f"\n๐ Checking for suspicious identical sizes...")
for size in suspicious_sizes:
if size in entries_by_size and len(entries_by_size[size]) > 1:
print(f"โ ๏ธ Found {len(entries_by_size[size])} documents with size {size}:")
# Test each document with this size
for entry in entries_by_size[size]:
print(f"\n Testing document {entry['uuid'][:8]}...")
try:
# Fetch the document's docSchema
doc_response = self.session.get(f"https://eu.tectonic.remarkable.com/sync/v3/files/{entry['hash']}")
doc_response.raise_for_status()
actual_docschema_size = len(doc_response.text)
print(f" DocSchema actual size: {actual_docschema_size} bytes")
print(f" Root.docSchema claims: {size} bytes")
print(f" Match: {'โ
' if str(actual_docschema_size) == size else 'โ'}")
if str(actual_docschema_size) != size:
print(f" โ ๏ธ SIZE MISMATCH DETECTED!")
print(f" ๐ DocSchema content preview: {doc_response.text[:100]}...")
except Exception as e:
print(f" โ Cannot fetch docSchema: {e}")
return True
except Exception as e:
print(f"โ Root.docSchema size analysis failed: {e}")
return False
def test_document_download_chain(self, document_uuid: str) -> bool:
"""Follow the complete download chain for a document to identify where it fails"""
try:
print(f"\n๐ Testing Document Download Chain: {document_uuid[:8]}...")
# Step 1: Get current root.docSchema and find our document
print("๐ Step 1: Getting root.docSchema from server...")
root_response = self.session.get("https://eu.tectonic.remarkable.com/sync/v4/root")
root_response.raise_for_status()
root_data = root_response.json()
current_root_hash = root_data['hash']
print(f"โ
Root hash: {current_root_hash}")
print(f"โ
Generation: {root_data.get('generation')}")
# Step 2: Fetch root.docSchema content
print("๐ Step 2: Fetching root.docSchema content...")
root_content_response = self.session.get(f"https://eu.tectonic.remarkable.com/sync/v3/files/{current_root_hash}")
root_content_response.raise_for_status()
root_content = root_content_response.text
print(f"โ
Root.docSchema size: {len(root_content)} bytes")
print(f"๐ Root.docSchema content preview:")
for i, line in enumerate(root_content.strip().split('\n')[:10]):
print(f" {i}: {line}")
# Step 3: Find our document in root.docSchema
print(f"๐ Step 3: Looking for document {document_uuid} in root.docSchema...")
doc_entry = None
doc_hash = None
doc_size = None
for line in root_content.strip().split('\n')[1:]: # Skip version header
if document_uuid in line:
doc_entry = line
parts = line.split(':')
if len(parts) >= 5:
doc_hash = parts[0]
doc_size = parts[4]
break
if not doc_entry:
print(f"โ Document {document_uuid} NOT found in root.docSchema")
return False
print(f"โ
Document found in root.docSchema:")
print(f" Entry: {doc_entry}")
print(f" Hash: {doc_hash}")
print(f" Size: {doc_size}")
# Step 4: Fetch document's docSchema
print(f"๐ Step 4: Fetching document's docSchema...")
doc_response = self.session.get(f"https://eu.tectonic.remarkable.com/sync/v3/files/{doc_hash}")
doc_response.raise_for_status()
doc_schema_content = doc_response.text
print(f"โ
Document docSchema size: {len(doc_schema_content)} bytes")
print(f"๐ Document docSchema content:")
for i, line in enumerate(doc_schema_content.strip().split('\n')):
print(f" {i}: {line}")
# Step 5: Parse docSchema and test each component
print(f"๐ Step 5: Testing each document component...")
lines = doc_schema_content.strip().split('\n')
if len(lines) < 2:
print(f"โ Invalid docSchema format: only {len(lines)} lines")
return False
version = lines[0]
print(f"๐ DocSchema version: {version}")
all_components_valid = True
component_details = {}
for i, line in enumerate(lines[1:], 1):
if ':' in line:
parts = line.split(':')
if len(parts) >= 5:
comp_hash = parts[0]
comp_name = parts[2]
comp_size = parts[4]
print(f"\n ๐ Testing component {i}: {comp_name}")
print(f" Hash: {comp_hash}")
print(f" Expected size: {comp_size}")
try:
comp_response = self.session.get(f"https://eu.tectonic.remarkable.com/sync/v3/files/{comp_hash}")
comp_response.raise_for_status()
actual_size = len(comp_response.content)
print(f" โ
Component accessible")
print(f" ๐ Actual size: {actual_size} bytes")
print(f" ๐ Size match: {'โ
' if str(actual_size) == comp_size else 'โ'}")
# Store component details
component_details[comp_name] = {
'hash': comp_hash,
'expected_size': comp_size,
'actual_size': actual_size,
'accessible': True,
'content_preview': comp_response.content[:100] if comp_response.content else b''
}
# Special handling for PDF content
if comp_name.endswith('.pdf'):
print(f" ๐ PDF content preview: {comp_response.content[:50]}...")
if comp_response.content.startswith(b'%PDF'):
print(f" โ
Valid PDF header detected")
else:
print(f" โ Invalid PDF header - content: {comp_response.content[:20]}")
all_components_valid = False
# Special handling for metadata
elif comp_name.endswith('.metadata'):
try:
metadata_json = json.loads(comp_response.text)
print(f" โ
Valid JSON metadata")
print(f" ๐ Name: {metadata_json.get('visibleName', 'N/A')}")
print(f" ๐ Parent: {metadata_json.get('parent', 'root')}")
except:
print(f" โ Invalid JSON metadata")
all_components_valid = False
except Exception as e:
print(f" โ Component NOT accessible: {e}")
component_details[comp_name] = {
'hash': comp_hash,
'expected_size': comp_size,
'accessible': False,
'error': str(e)
}
all_components_valid = False
# Step 6: Summary
print(f"\n๐ Step 6: Download Chain Summary")
print(f" Root.docSchema: โ
Accessible")
print(f" Document entry: โ
Found")
print(f" Document docSchema: โ
Accessible")
print(f" All components: {'โ
Valid' if all_components_valid else 'โ Issues found'}")
if not all_components_valid:
print(f"\nโ ๏ธ Component Issues Detected:")
for name, details in component_details.items():
if not details.get('accessible', False):
print(f" โ {name}: {details.get('error', 'Not accessible')}")
elif details.get('expected_size') != str(details.get('actual_size', 0)):
print(f" โ ๏ธ {name}: Size mismatch ({details['expected_size']} vs {details['actual_size']})")
return all_components_valid
except Exception as e:
print(f"โ Document download chain test failed: {e}")
return False
def test_existing_document_chain(self) -> Dict[str, Any]:
"""Test the complete download chain for an existing uploaded document - NO NEW UPLOADS"""
print("๏ฟฝ Starting Document Chain Analysis Test")
print("=" * 50)
print("๐ซ NO NEW UPLOADS - Testing existing documents only")
try:
# Step 1: Get current root.docSchema from server
print("\n๏ฟฝ Step 1: Fetching current root.docSchema from server...")
root_response = self.session.get("https://eu.tectonic.remarkable.com/sync/v4/root")
root_response.raise_for_status()
root_data = root_response.json()
current_root_hash = root_data['hash']
print(f"โ
Root response: {root_response.status_code}")
print(f"โ
Root hash: {current_root_hash}")
print(f"โ
Generation: {root_data.get('generation')}")
print(f"๐ Full root response: {json.dumps(root_data, indent=2)}")
# Step 2: Fetch root.docSchema content
print(f"\n๐ Step 2: Fetching root.docSchema content using hash {current_root_hash}...")
root_content_response = self.session.get(f"https://eu.tectonic.remarkable.com/sync/v3/files/{current_root_hash}")
root_content_response.raise_for_status()
root_content = root_content_response.text
print(f"โ
Root content response: {root_content_response.status_code}")
print(f"โ
Root.docSchema size: {len(root_content)} bytes")
print(f"๐ Full root.docSchema content:")
for i, line in enumerate(root_content.strip().split('\n')):
print(f" Line {i}: {line}")
# Step 3: Find a PDF document (type 4) in root.docSchema
print(f"\n๐ Step 3: Looking for PDF documents (type 4) in root.docSchema...")
pdf_documents = []
for line in root_content.strip().split('\n')[1:]: # Skip version header
if ':' in line:
parts = line.split(':')
if len(parts) >= 5:
doc_hash = parts[0]
doc_uuid = parts[2]
node_type = parts[3]
size = parts[4]
if node_type == '4': # PDF document
pdf_documents.append({
'hash': doc_hash,
'uuid': doc_uuid,
'size': size,
'line': line
})
print(f"โ
Found {len(pdf_documents)} PDF documents")
for i, doc in enumerate(pdf_documents):
print(f" PDF {i+1}: UUID {doc['uuid'][:8]}... Hash {doc['hash'][:16]}... Size {doc['size']}")
if not pdf_documents:
print("โ No PDF documents found in root.docSchema")
return {'success': False, 'error': 'No PDF documents found'}
# Step 4: Pick the first PDF and fetch its docSchema
target_doc = pdf_documents[0]
print(f"\n๐ Step 4: Analyzing PDF document {target_doc['uuid'][:8]}...")
print(f" Target hash: {target_doc['hash']}")
print(f" Target size: {target_doc['size']}")
print(f" Full entry: {target_doc['line']}")
doc_response = self.session.get(f"https://eu.tectonic.remarkable.com/sync/v3/files/{target_doc['hash']}")
doc_response.raise_for_status()
doc_schema_content = doc_response.text
print(f"โ
Document docSchema response: {doc_response.status_code}")
print(f"โ
Document docSchema size: {len(doc_schema_content)} bytes")
print(f"โ
Root.docSchema claimed size: {target_doc['size']} bytes")
print(f"๐ Size match: {'โ
YES' if str(len(doc_schema_content)) == target_doc['size'] else 'โ NO - MISMATCH!'}")
print(f"๐ Full document docSchema content:")
for i, line in enumerate(doc_schema_content.strip().split('\n')):
print(f" Line {i}: {line}")
# Step 5: Parse document docSchema and fetch each component
print(f"\n๐ Step 5: Fetching each document component...")
lines = doc_schema_content.strip().split('\n')
if len(lines) < 2:
print(f"โ Invalid docSchema format: only {len(lines)} lines")
return {'success': False, 'error': 'Invalid docSchema format'}
version = lines[0]
print(f"๐ DocSchema version: {version}")
components = {}
for i, line in enumerate(lines[1:], 1):
if ':' in line:
parts = line.split(':')
if len(parts) >= 5:
comp_hash = parts[0]
comp_name = parts[2]
comp_size = parts[4]
print(f"\n ๐ Component {i}: {comp_name}")
print(f" Hash: {comp_hash}")
print(f" Expected size: {comp_size}")
try:
comp_response = self.session.get(f"https://eu.tectonic.remarkable.com/sync/v3/files/{comp_hash}")
comp_response.raise_for_status()
actual_size = len(comp_response.content)
print(f" โ
Component response: {comp_response.status_code}")
print(f" โ
Actual size: {actual_size} bytes")
print(f" ๐ Size match: {'โ
YES' if str(actual_size) == comp_size else 'โ NO - MISMATCH!'}")
# Store component details
components[comp_name] = {
'hash': comp_hash,
'expected_size': comp_size,
'actual_size': actual_size,
'response_status': comp_response.status_code,
'accessible': True,
'content': comp_response.content
}
# Show content preview based on type
if comp_name.endswith('.pdf'):
print(f" ๐ PDF content preview (first 50 bytes): {comp_response.content[:50]}")
if comp_response.content.startswith(b'%PDF'):
print(f" โ
Valid PDF header detected")
else:
print(f" โ Invalid PDF header!")
elif comp_name.endswith('.metadata'):
try:
metadata_json = json.loads(comp_response.text)
print(f" โ
Valid JSON metadata")
print(f" ๐ Document name: {metadata_json.get('visibleName', 'N/A')}")
print(f" ๐ Parent UUID: {metadata_json.get('parent', 'root')}")
print(f" ๐๏ธ Document type: {metadata_json.get('type', 'N/A')}")
print(f" ๐ Full metadata: {json.dumps(metadata_json, indent=8)}")
except Exception as json_e:
print(f" โ Invalid JSON metadata: {json_e}")
print(f" ๐ Raw content: {comp_response.text[:200]}...")
elif comp_name.endswith('.content'):
print(f" ๐ Content preview: {comp_response.text[:100]}...")
elif comp_name.endswith('.pagedata'):
print(f" ๐ Pagedata preview: {comp_response.text[:100]}...")
else:
print(f" ๐ Unknown component type, raw preview: {comp_response.content[:50]}")
except Exception as e:
print(f" โ Component NOT accessible: {e}")
components[comp_name] = {
'hash': comp_hash,
'expected_size': comp_size,
'accessible': False,
'error': str(e)
}
# Step 6: Summary and analysis
print(f"\n๐ Step 6: Complete Analysis Summary")
print(f"=" * 50)
accessible_count = sum(1 for c in components.values() if c.get('accessible', False))
total_count = len(components)
print(f"๐ Document Analysis Results:")
print(f" Target document: {target_doc['uuid']}")
print(f" Document hash: {target_doc['hash']}")
print(f" Root.docSchema size claim: {target_doc['size']} bytes")
print(f" Actual docSchema size: {len(doc_schema_content)} bytes")
print(f" Size consistency: {'โ
GOOD' if str(len(doc_schema_content)) == target_doc['size'] else 'โ CORRUPTED'}")
print(f" Total components: {total_count}")
print(f" Accessible components: {accessible_count}")
print(f" Component accessibility: {'โ
ALL GOOD' if accessible_count == total_count else 'โ ISSUES FOUND'}")
print(f"\n๐ Component Details:")
for name, details in components.items():
status = "โ
OK" if details.get('accessible') else "โ FAIL"
size_status = ""
if details.get('accessible'):
expected = details.get('expected_size')
actual = details.get('actual_size')
size_status = f" (size: {'โ
' if str(actual) == expected else 'โ'})"
print(f" {status} {name}{size_status}")
if not details.get('accessible'):
print(f" Error: {details.get('error')}")
success = accessible_count == total_count and str(len(doc_schema_content)) == target_doc['size']
return {
'success': success,
'document_uuid': target_doc['uuid'],
'document_hash': target_doc['hash'],
'root_size_claim': target_doc['size'],
'actual_docschema_size': len(doc_schema_content),
'size_consistent': str(len(doc_schema_content)) == target_doc['size'],
'total_components': total_count,
'accessible_components': accessible_count,
'components': components
}
except Exception as e:
print(f"โ Document chain analysis failed: {e}")
return {'success': False, 'error': str(e)}
def comprehensive_cloud_analysis(self) -> Dict[str, Any]:
"""Run comprehensive analysis of cloud state and replica sync process"""
print("\n๐ COMPREHENSIVE CLOUD & REPLICA ANALYSIS")
print("=" * 60)
analysis_results = {
'cloud_state': {},
'replica_state': {},
'sync_issues': [],
'missing_documents': []
}
try:
# STEP 1: Direct Cloud State Analysis
print("\n๐ก STEP 1: DIRECT CLOUD STATE ANALYSIS")
print("-" * 40)
# Get current root state from server
root_response = self.session.get("https://eu.tectonic.remarkable.com/sync/v4/root")
root_response.raise_for_status()
root_data = root_response.json()
print(f"โ
Root hash: {root_data['hash']}")
print(f"โ
Generation: {root_data['generation']}")
# Get root.docSchema content from server
root_content_response = self.session.get(f"https://eu.tectonic.remarkable.com/sync/v3/files/{root_data['hash']}")
root_content_response.raise_for_status()
root_content = root_content_response.text
print(f"\n๐ CLOUD ROOT.DOCSCHEMA ({len(root_content)} bytes):")
cloud_lines = root_content.strip().split('\n')
for i, line in enumerate(cloud_lines):
print(f" Line {i}: {line}")
# Parse cloud entries
cloud_documents = []
cloud_folders = []
for line_num, line in enumerate(cloud_lines[1:], 1): # Skip version header
if ':' in line:
parts = line.split(':')
if len(parts) >= 5:
doc_hash = parts[0]
doc_uuid = parts[2]
node_type = parts[3]
size = parts[4]
entry_info = {
'uuid': doc_uuid,
'hash': doc_hash,
'type': node_type,
'size': size,
'line': line,
'line_number': line_num
}
if node_type in ['1', '2']: # Folders
cloud_folders.append(entry_info)
elif node_type in ['3', '4']: # Documents
cloud_documents.append(entry_info)
analysis_results['cloud_state'] = {
'root_hash': root_data['hash'],
'generation': root_data['generation'],
'total_entries': len(cloud_lines) - 1,
'folders': cloud_folders,
'documents': cloud_documents
}
print(f"\n๐ CLOUD INVENTORY:")
print(f" ๐ Folders: {len(cloud_folders)}")
print(f" ๐ Documents: {len(cloud_documents)}")
# STEP 2: Fresh Replica Build with Detailed Logging
print(f"\n๐๏ธ STEP 2: FRESH REPLICA BUILD WITH DETAILED LOGGING")
print("-" * 40)
from local_replica_v2 import RemarkableReplicaBuilder
replica_builder = RemarkableReplicaBuilder(self.session)
# Force a complete rebuild
print("๐ Forcing complete replica rebuild...")
replica_builder.build_complete_replica()
# STEP 3: Compare Replica Database vs Cloud State
print(f"\n๐ STEP 3: REPLICA DATABASE ANALYSIS")
print("-" * 40)
# Load the replica database
database_path = self.base_dir / "remarkable_replica_v2" / "replica_database.json"
if database_path.exists():
with open(database_path, 'r') as f:
replica_db = json.load(f)
replica_nodes = replica_db.get('nodes', {})
replica_documents = []
replica_folders = []
for uuid, node in replica_nodes.items():
if node.get('node_type') == 'document':
replica_documents.append({
'uuid': uuid,
'name': node.get('name', 'Unknown'),
'hash': node.get('hash', 'Unknown'),
'parent': node.get('metadata', {}).get('parent', '')
})
elif node.get('node_type') == 'folder':
replica_folders.append({
'uuid': uuid,
'name': node.get('name', 'Unknown'),
'hash': node.get('hash', 'Unknown')
})
analysis_results['replica_state'] = {
'total_nodes': len(replica_nodes),
'folders': replica_folders,
'documents': replica_documents
}
print(f"๐ REPLICA DATABASE CONTENT:")
print(f" ๐ Folders: {len(replica_folders)}")
print(f" ๐ Documents: {len(replica_documents)}")
print(f"\n๐ REPLICA FOLDERS:")
for i, folder in enumerate(replica_folders, 1):
print(f" {i}. {folder['name']} (UUID: {folder['uuid'][:8]}...)")
print(f"\n๐ REPLICA DOCUMENTS:")
for i, doc in enumerate(replica_documents, 1):
parent_info = f" [in folder]" if doc['parent'] else " [root]"
print(f" {i}. {doc['name']} (UUID: {doc['uuid'][:8]}...){parent_info}")
else:
print("โ Replica database not found!")
analysis_results['sync_issues'].append("Replica database file missing")
# STEP 4: Cross-Reference Analysis
print(f"\n๐ STEP 4: CROSS-REFERENCE ANALYSIS")
print("-" * 40)
# Check if cloud documents are in replica
missing_from_replica = []
for cloud_doc in cloud_documents:
found_in_replica = any(r['uuid'] == cloud_doc['uuid'] for r in replica_documents)
if not found_in_replica:
missing_from_replica.append(cloud_doc)
print(f"โ MISSING FROM REPLICA: {cloud_doc['uuid'][:8]}... (type {cloud_doc['type']}, size {cloud_doc['size']})")
# Check if replica documents are in cloud
missing_from_cloud = []
for replica_doc in replica_documents:
found_in_cloud = any(c['uuid'] == replica_doc['uuid'] for c in cloud_documents)
if not found_in_cloud:
missing_from_cloud.append(replica_doc)
print(f"โ MISSING FROM CLOUD: {replica_doc['name']} (UUID: {replica_doc['uuid'][:8]}...)")
analysis_results['missing_documents'] = {
'missing_from_replica': missing_from_replica,
'missing_from_cloud': missing_from_cloud
}
# STEP 5: Test Document Analysis
print(f"\n๐ฏ STEP 5: TEST DOCUMENT ANALYSIS")
print("-" * 40)
test_uuids = [
'824225cd-3f6f-4d00-bbbb-54d53ab94cc5', # Our recent upload
'2342f4af-3034-45d2-be90-e17ecc9e04d5', # Invoice PDF
'7b3f2f2b-6757-4673-9aa4-636a895415f5' # Pylontech PDF (should be missing)
]
for test_uuid in test_uuids:
print(f"\n๐ Testing document {test_uuid[:8]}...")
# Check in cloud
in_cloud = any(test_uuid in line for line in cloud_lines)
print(f" Cloud: {'โ
FOUND' if in_cloud else 'โ NOT FOUND'}")
# Check in replica
in_replica = test_uuid in replica_nodes if 'replica_nodes' in locals() else False
print(f" Replica: {'โ
FOUND' if in_replica else 'โ NOT FOUND'}")
# If in cloud, fetch and validate components
if in_cloud:
cloud_line = next(line for line in cloud_lines if test_uuid in line)
parts = cloud_line.split(':')
doc_hash = parts[0]
try:
doc_response = self.session.get(f"https://eu.tectonic.remarkable.com/sync/v3/files/{doc_hash}")
doc_response.raise_for_status()
print(f" DocSchema: โ
ACCESSIBLE ({len(doc_response.text)} bytes)")
except Exception as e:
print(f" DocSchema: โ NOT ACCESSIBLE - {e}")
print(f"\n๐ ANALYSIS SUMMARY:")
print(f" Cloud entries: {len(cloud_documents)} docs, {len(cloud_folders)} folders")
print(f" Replica entries: {len(replica_documents)} docs, {len(replica_folders)} folders")
print(f" Missing from replica: {len(missing_from_replica)}")
print(f" Missing from cloud: {len(missing_from_cloud)}")
return analysis_results
except Exception as e:
print(f"โ Analysis failed: {e}")
analysis_results['error'] = str(e)
return analysis_results
def create_demo_upload_scenario(self):
"""Create a comprehensive demo scenario"""
print("\n๐ฌ Creating Demo Upload Scenario")
print("This will demonstrate a complete workflow:")
print("1. Create a new notebook")
print("2. Upload a PDF document")
print("3. Organize them in folders")
print("4. Edit metadata")
print("5. Verify everything synced correctly")
# Create demo notebook
demo_notebook = f"Demo_Notebook_{int(time.time())}"
self.uploader.create_notebook(demo_notebook, template="Lines Medium")
# Create and upload demo PDF
demo_pdf = self.test_create_test_pdf()
demo_pdf_name = f"Demo_PDF_{int(time.time())}"
self.uploader.upload_pdf_document(str(demo_pdf), demo_pdf_name)
print("โ
Demo scenario created successfully!")
print("Check your reMarkable device to see the new files.")
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| enable_raw_logging | bool | True | positional or keyword |
Parameter Details
enable_raw_logging: When True, raw HTTP requests and responses are captured so they can be saved via save_raw_logs() and compared with logs from the real reMarkable app.
Return Value
The constructor returns a RemarkableUploadTests instance; it raises RuntimeError if authentication with reMarkable fails.
Class Interface
Methods
__init__(self, enable_raw_logging)
Purpose: Initialize the test suite: authenticate with reMarkable, load the upload manager, and optionally enable raw HTTP request logging
Parameters:
enable_raw_logging: Type: bool
Returns: None
save_raw_logs(self) -> Path
Purpose: Save captured raw HTTP logs to file for comparison with real app logs
Returns: Returns Path
log_test(self, test_name, success, details)
Purpose: Log test result
Parameters:
test_name: Type: str
success: Type: bool
details: Type: str
Returns: None
test_edit_document_name(self) -> bool
Purpose: Test 1: Edit existing document name
Returns: Returns bool
test_create_test_pdf(self) -> Path
Purpose: Create a test PDF for upload testing
Returns: Returns Path
test_upload_new_pdf(self, parent_uuid) -> bool
Purpose: Test: Upload new PDF document to specified folder (or root if no parent_uuid)
Parameters:
parent_uuid: Type: str
Returns: Returns bool
test_create_new_notebook(self) -> bool
Purpose: Test 3: Create new notebook
Returns: Returns bool
test_move_document(self) -> bool
Purpose: Test 4: Move document to different folder
Returns: Returns bool
test_hash_consistency(self) -> bool
Purpose: Test 5: Verify hash consistency after uploads
Returns: Returns bool
validate_uploaded_document(self, document_uuid) -> bool
Purpose: Validate that an uploaded document has all required components accessible
Parameters:
document_uuid: Type: str
Returns: Returns bool
analyze_root_docschema_sizes(self) -> bool
Purpose: Analyze the sizes in root.docSchema to identify patterns and issues
Returns: Returns bool
test_document_download_chain(self, document_uuid) -> bool
Purpose: Follow the complete download chain for a document to identify where it fails
Parameters:
document_uuid: Type: str
Returns: Returns bool
test_existing_document_chain(self) -> Dict[str, Any]
Purpose: Test the complete download chain for an existing uploaded document - NO NEW UPLOADS
Returns: Returns Dict[str, Any]
comprehensive_cloud_analysis(self) -> Dict[str, Any]
Purpose: Run comprehensive analysis of cloud state and replica sync process
Returns: Returns Dict[str, Any]
create_demo_upload_scenario(self)
Purpose: Create a comprehensive demo scenario
Returns: None
Required Imports
import os
import json
import time
import uuid
from pathlib import Path
from typing import Dict, Any
Usage Example
# Example usage:
# tests = RemarkableUploadTests(enable_raw_logging=True)
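A fuller illustrative sketch (not taken from the source file; it assumes a valid reMarkable auth session and the replica database expected by the constructor, and only calls methods defined above):
tests = RemarkableUploadTests(enable_raw_logging=True)
tests.test_edit_document_name()
tests.test_upload_new_pdf()      # empty parent_uuid uploads to the root folder
tests.test_create_new_notebook()
tests.test_hash_consistency()
tests.save_raw_logs()            # writes detailed and simplified logs under test_results/raw_logs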
Similar Components
AI-powered semantic similarity - components with related functionality:
- class SimplePDFUploadTest (75.3% similar)
- class FixedUploadTest (73.7% similar)
- function main_v100 (72.0% similar)
- function main_v6 (71.2% similar)
- class RemarkableUploadManager_v1 (70.8% similar)