class RemarkableCloudWatcher
Monitors the reMarkable Cloud 'gpt_out' folder for new documents, automatically downloads them, and converts .rm (reMarkable native) files to PDF format.
File: /tf/active/vicechatdev/e-ink-llm/mixed_cloud_processor.py
Lines: 36 - 473
Complexity: complex
Purpose
This class provides a complete solution for watching a specific folder in reMarkable Cloud storage, detecting new documents, and extracting them as PDFs. It handles both native PDF files and reMarkable's proprietary .rm format, converting the latter using the 'rmc' command-line tool. The class maintains state to avoid reprocessing files, manages temporary storage, and supports multi-page notebook conversion with PDF concatenation.
Source Code
class RemarkableCloudWatcher:
    """Watches reMarkable Cloud gpt_out folder for new files"""

    def __init__(self, remarkable_session, logger):
        self.session = remarkable_session
        self.logger = logger
        self.base_url = "https://eu.tectonic.remarkable.com"
        self.processed_files = set()  # Track processed file hashes
        self.gpt_out_folder_uuid = None
        self.temp_dir = Path(tempfile.mkdtemp(prefix="remarkable_watch_"))

    def __del__(self):
        """Cleanup temporary directory"""
        if hasattr(self, 'temp_dir') and self.temp_dir.exists():
            shutil.rmtree(self.temp_dir, ignore_errors=True)

    async def initialize(self):
        """Initialize the watcher by finding the gpt_out folder"""
        self.logger.info("🔍 Initializing reMarkable Cloud watcher...")
        try:
            # Discover all folders to find gpt_out
            all_nodes = await self._discover_all_nodes()
            # Find gpt_out folder
            for uuid, node in all_nodes.items():
                if (node.get('node_type') == 'folder' and
                        node.get('name', '').lower() in ['gpt_out', 'gpt out', 'gptout']):
                    self.gpt_out_folder_uuid = uuid
                    self.logger.info(f"✅ Found gpt_out folder: {uuid}")
                    break
            if not self.gpt_out_folder_uuid:
                self.logger.warning("⚠️ gpt_out folder not found in reMarkable Cloud")
                return False
            return True
        except Exception as e:
            self.logger.error(f"❌ Failed to initialize reMarkable watcher: {e}")
            return False

    async def _discover_all_nodes(self) -> Dict[str, Dict]:
        """Discover all nodes in reMarkable Cloud using local_replica_v2.py method"""
        all_nodes = {}
        try:
            # Get root hash
            root_response = self.session.get(f"{self.base_url}/sync/v4/root")
            root_response.raise_for_status()
            root_data = root_response.json()
            root_hash = root_data.get('hash')
            if not root_hash:
                return all_nodes
            # Discover nodes recursively
            discovered_hashes = set()
            hashes_to_process = [root_hash]
            while hashes_to_process:
                current_hash = hashes_to_process.pop(0)
                if current_hash in discovered_hashes:
                    continue
                discovered_hashes.add(current_hash)
                # Fetch and parse content
                content_info = await self._fetch_hash_content(current_hash)
                if not content_info:
                    continue
                parsed = self._parse_directory_listing(content_info['content'])
                # Extract metadata if available
                metadata = {}
                node_name = f"unknown_{current_hash[:8]}"
                node_type = "folder"
                parent_uuid = None
                for component in parsed['data_components']:
                    if component['component_type'] == 'metadata':
                        extracted_metadata = await self._extract_metadata(component['hash'])
                        if extracted_metadata:
                            metadata = extracted_metadata
                            node_name = metadata.get('visibleName', node_name)
                            if metadata.get('type') == 'DocumentType':
                                node_type = "document"
                            elif metadata.get('type') == 'CollectionType':
                                node_type = "folder"
                            parent_uuid = metadata.get('parent', '') or None
                        break
                # Determine node UUID
                node_uuid = None
                for component in parsed['child_objects']:
                    node_uuid = component['uuid_component']
                    break
                if not node_uuid and parsed['data_components']:
                    component_name = parsed['data_components'][0]['uuid_component']
                    if '.' in component_name:
                        node_uuid = component_name.split('.')[0]
                if not node_uuid:
                    node_uuid = current_hash[:32]
                # Store node
                all_nodes[node_uuid] = {
                    'uuid': node_uuid,
                    'hash': current_hash,
                    'name': node_name,
                    'node_type': node_type,
                    'parent_uuid': parent_uuid,
                    'metadata': metadata,
                    'parsed_data': parsed
                }
                # Add child hashes to process
                for child_obj in parsed['child_objects']:
                    if child_obj['hash'] not in discovered_hashes:
                        hashes_to_process.append(child_obj['hash'])
            return all_nodes
        except Exception as e:
            self.logger.error(f"❌ Failed to discover nodes: {e}")
            return all_nodes

    async def _fetch_hash_content(self, hash_ref: str) -> Optional[Dict[str, Any]]:
        """Fetch content from reMarkable cloud by hash"""
        try:
            url = f"{self.base_url}/sync/v3/files/{hash_ref}"
            response = self.session.get(url)
            response.raise_for_status()
            return {
                'hash': hash_ref,
                'content': response.content,
                'size': len(response.content)
            }
        except Exception as e:
            self.logger.debug(f"Failed to fetch {hash_ref[:16]}...: {e}")
            return None

    def _parse_directory_listing(self, content: bytes) -> Dict[str, Any]:
        """Parse directory listing using local_replica_v2.py method"""
        try:
            text_content = content.decode('utf-8')
        except UnicodeDecodeError:
            return {'child_objects': [], 'data_components': []}
        result = {
            'child_objects': [],
            'data_components': []
        }
        lines = text_content.split('\n')
        if lines and lines[0].strip().isdigit():
            lines = lines[1:]  # Skip count line
        entry_pattern = r'^([a-f0-9]{64}):([0-9a-fA-F]+):([a-f0-9-/]+(?:\.[^:]+)?):(\d+):(\d+)$'
        for line in lines:
            line = line.strip()
            if not line:
                continue
            match = re.match(entry_pattern, line, re.IGNORECASE)
            if match:
                hash_val, flags, uuid_component, type_val, size_val = match.groups()
                entry_info = {
                    'hash': hash_val,
                    'flags': flags,
                    'uuid_component': uuid_component,
                    'type': type_val,
                    'size': int(size_val)
                }
                if '.' in uuid_component:
                    # Data component (.content, .metadata, .pdf, .rm, etc.)
                    component_type = uuid_component.split('.')[-1]
                    if '/' in component_type:  # Handle .rm files like "uuid/filename.rm"
                        component_type = component_type.split('/')[-1]
                    entry_info['component_type'] = component_type
                    result['data_components'].append(entry_info)
                else:
                    # Child object (pure UUID)
                    result['child_objects'].append(entry_info)
        return result

    async def _extract_metadata(self, metadata_hash: str) -> Optional[Dict[str, Any]]:
        """Extract metadata from hash"""
        content_info = await self._fetch_hash_content(metadata_hash)
        if not content_info:
            return None
        try:
            text_content = content_info['content'].decode('utf-8')
            return json.loads(text_content)
        except (UnicodeDecodeError, json.JSONDecodeError) as e:
            self.logger.debug(f"Failed to parse metadata {metadata_hash[:16]}...: {e}")
            return None

    async def check_for_new_files(self) -> List[Path]:
        """Check gpt_out folder for new files and convert them to PDFs"""
        if not self.gpt_out_folder_uuid:
            return []
        new_pdf_files = []
        try:
            # Re-discover nodes to get current state
            all_nodes = await self._discover_all_nodes()
            # Find documents in gpt_out folder
            gpt_out_documents = []
            for uuid, node in all_nodes.items():
                if (node.get('node_type') == 'document' and
                        node.get('parent_uuid') == self.gpt_out_folder_uuid):
                    gpt_out_documents.append(node)
            self.logger.debug(f"Found {len(gpt_out_documents)} documents in gpt_out folder")
            for doc_node in gpt_out_documents:
                doc_hash = doc_node['hash']
                # Skip if already processed
                if doc_hash in self.processed_files:
                    continue
                self.logger.info(f"📄 Processing new document: {doc_node['name']}")
                # Extract the document
                pdf_file = await self._extract_document(doc_node, all_nodes)
                if pdf_file:
                    new_pdf_files.append(pdf_file)
                    self.processed_files.add(doc_hash)
        except Exception as e:
            self.logger.error(f"❌ Error checking for new files: {e}")
        return new_pdf_files

    async def _extract_document(self, doc_node: Dict, all_nodes: Dict) -> Optional[Path]:
        """Extract a document from reMarkable Cloud, converting .rm files to PDF if needed"""
        try:
            parsed_data = doc_node.get('parsed_data', {})
            doc_name = doc_node.get('name', 'unknown')
            # Create document-specific temp directory
            doc_temp_dir = self.temp_dir / f"doc_{doc_node['uuid'][:8]}"
            doc_temp_dir.mkdir(exist_ok=True)
            # Check for PDF content first
            pdf_hash = None
            rm_hashes = []
            for component in parsed_data.get('data_components', []):
                if component['component_type'] == 'pdf':
                    pdf_hash = component['hash']
                elif component['component_type'] == 'rm':
                    rm_hashes.append(component['hash'])
            # If PDF exists, extract it directly
            if pdf_hash:
                self.logger.info(f"📄 Extracting PDF: {doc_name}")
                pdf_content = await self._fetch_hash_content(pdf_hash)
                if pdf_content:
                    pdf_path = doc_temp_dir / f"{doc_name}.pdf"
                    with open(pdf_path, 'wb') as f:
                        f.write(pdf_content['content'])
                    return pdf_path
            # If .rm files exist, convert to PDF
            elif rm_hashes:
                self.logger.info(f"🖊️ Converting .rm files to PDF: {doc_name}")
                return await self._convert_rm_to_pdf(doc_name, rm_hashes, doc_temp_dir)
            else:
                self.logger.warning(f"⚠️ No PDF or .rm content found for: {doc_name}")
            return None
        except Exception as e:
            self.logger.error(f"❌ Error extracting document {doc_name}: {e}")
            return None

    async def _convert_rm_to_pdf(self, doc_name: str, rm_hashes: List[str], output_dir: Path) -> Optional[Path]:
        """Convert .rm files to PDF using rmc tool (from local_replica_v2.py)"""
        try:
            # Create notebook directory for .rm files
            notebook_dir = output_dir / "notebook"
            notebook_dir.mkdir(exist_ok=True)
            # Download .rm files
            rm_files = []
            for i, rm_hash in enumerate(rm_hashes):
                rm_content = await self._fetch_hash_content(rm_hash)
                if rm_content:
                    rm_path = notebook_dir / f"page_{i+1}.rm"
                    with open(rm_path, 'wb') as f:
                        f.write(rm_content['content'])
                    rm_files.append(rm_path)
            if not rm_files:
                self.logger.warning(f"⚠️ No .rm files downloaded for {doc_name}")
                return None
            # Sort files by page number
            rm_files.sort(key=lambda x: int(x.stem.split('_')[1]))
            # Final PDF path
            final_pdf_path = output_dir / f"{doc_name}.pdf"
            if len(rm_files) == 1:
                # Single page - convert directly
                result = subprocess.run([
                    "rmc", str(rm_files[0]), "-o", str(final_pdf_path)
                ], capture_output=True, text=True, timeout=60)
                if result.returncode == 0 and final_pdf_path.exists() and final_pdf_path.stat().st_size > 0:
                    self.logger.info(f"✅ Converted single page to PDF: {final_pdf_path}")
                    return final_pdf_path
                else:
                    self.logger.error(f"❌ rmc conversion failed: {result.stderr}")
                    return None
            else:
                # Multiple pages - convert each to temporary PDF and concatenate
                temp_pdfs = []
                for i, rm_file in enumerate(rm_files):
                    temp_pdf = notebook_dir / f"temp_page_{i+1}.pdf"
                    result = subprocess.run([
                        "rmc", str(rm_file), "-o", str(temp_pdf)
                    ], capture_output=True, text=True, timeout=60)
                    if result.returncode == 0 and temp_pdf.exists() and temp_pdf.stat().st_size > 0:
                        temp_pdfs.append(temp_pdf)
                    else:
                        self.logger.error(f"❌ rmc conversion failed for page {i+1}: {result.stderr}")
                        return None
                if temp_pdfs:
                    # Concatenate PDFs using PyPDF2 or similar
                    success = await self._concatenate_pdfs(temp_pdfs, final_pdf_path)
                    if success:
                        self.logger.info(f"✅ Converted multi-page notebook to PDF: {final_pdf_path}")
                        return final_pdf_path
                return None
        except Exception as e:
            self.logger.error(f"❌ Error converting .rm files: {e}")
            return None

    async def _concatenate_pdfs(self, pdf_files: List[Path], output_path: Path) -> bool:
        """Concatenate multiple PDF files into one"""
        if len(pdf_files) <= 1:
            # If only one file, just copy it
            if pdf_files:
                shutil.copy2(pdf_files[0], output_path)
                return True
            return False
        try:
            # Try PyPDF2 first (newest and most stable)
            from PyPDF2 import PdfWriter, PdfReader
            writer = PdfWriter()
            for pdf_file in pdf_files:
                reader = PdfReader(str(pdf_file))
                for page in reader.pages:
                    writer.add_page(page)
            with open(output_path, 'wb') as output_file:
                writer.write(output_file)
            self.logger.info(f"✅ PDF concatenation successful using PyPDF2")
            return True
        except ImportError:
            try:
                # Try PyPDF4 as fallback
                from PyPDF4 import PdfFileWriter, PdfFileReader
                writer = PdfFileWriter()
                for pdf_file in pdf_files:
                    reader = PdfFileReader(str(pdf_file))
                    for page_num in range(reader.getNumPages()):
                        page = reader.getPage(page_num)
                        writer.addPage(page)
                with open(output_path, 'wb') as output_file:
                    writer.write(output_file)
                self.logger.info(f"✅ PDF concatenation successful using PyPDF4")
                return True
            except ImportError:
                # Fallback to using system commands if no PDF library available
                self.logger.warning("⚠️ No PDF library available (PyPDF2/PyPDF4), trying system commands")
                self.logger.warning("💡 Install PyPDF2 for better performance: pip install PyPDF2")
                try:
                    # Try using pdftk if available
                    cmd = ["pdftk"] + [str(f) for f in pdf_files] + ["cat", "output", str(output_path)]
                    result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
                    if result.returncode == 0:
                        self.logger.info(f"✅ PDF concatenation successful using pdftk")
                        return True
                    # Try using gs (ghostscript) as fallback
                    cmd = ["gs", "-dNOPAUSE", "-dBATCH", "-sDEVICE=pdfwrite", f"-sOutputFile={output_path}"] + [str(f) for f in pdf_files]
                    result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
                    if result.returncode == 0:
                        self.logger.info(f"✅ PDF concatenation successful using ghostscript")
                        return True
                    else:
                        self.logger.error("❌ All PDF concatenation methods failed")
                        self.logger.error("💡 Install dependencies: pip install PyPDF2 OR sudo apt-get install pdftk ghostscript")
                        return False
                except Exception as e:
                    self.logger.error(f"❌ PDF concatenation failed: {e}")
                    self.logger.error("💡 Install dependencies: pip install PyPDF2 OR sudo apt-get install pdftk ghostscript")
                    return False
        except Exception as e:
            self.logger.error(f"❌ PDF concatenation error: {e}")
            return False
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| bases | - | - | - |
Parameter Details
remarkable_session: An authenticated requests.Session object configured with reMarkable Cloud authentication headers and tokens. This session is used for all API calls to the reMarkable Cloud service.
logger: A logging.Logger instance for outputting status messages, errors, and debug information throughout the watching and extraction process.
Return Value
Instantiation returns a RemarkableCloudWatcher object. The main method check_for_new_files() returns a List[Path] containing paths to newly extracted PDF files. The initialize() method returns a boolean indicating success/failure of finding the gpt_out folder.
Class Interface
Methods
__init__(self, remarkable_session, logger)
Purpose: Initialize the watcher with authentication session and logger, create temporary directory for file processing
Parameters:
remarkable_session: Authenticated requests.Session for the reMarkable Cloud API
logger: Logger instance for status and error messages
Returns: None (constructor)
__del__(self)
Purpose: Cleanup temporary directory when object is destroyed
Returns: None
async initialize(self) -> bool
Purpose: Find and store the UUID of the gpt_out folder in reMarkable Cloud by discovering all nodes
Returns: Boolean indicating whether gpt_out folder was successfully found
async _discover_all_nodes(self) -> Dict[str, Dict]
Purpose: Recursively discover all nodes (folders and documents) in reMarkable Cloud storage
Returns: Dictionary mapping UUIDs to node information (name, type, parent, metadata, hash)
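For orientation, a minimal sketch (not part of the class) of traversing the returned mapping to locate a folder by name, using the node fields shown in the source above; watcher is assumed to be a constructed RemarkableCloudWatcher:
async def find_folder_uuid(watcher, folder_name: str):
    # Each value carries 'uuid', 'hash', 'name', 'node_type', 'parent_uuid',
    # 'metadata', and 'parsed_data' (see _discover_all_nodes above).
    nodes = await watcher._discover_all_nodes()
    for uuid, node in nodes.items():
        if node['node_type'] == 'folder' and node['name'].lower() == folder_name.lower():
            return uuid
    return None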
async _fetch_hash_content(self, hash_ref: str) -> Optional[Dict[str, Any]]
Purpose: Download content from reMarkable Cloud using a content hash reference
Parameters:
hash_ref: 64-character hash identifying the content to fetch
Returns: Dictionary with 'hash', 'content' (bytes), and 'size' keys, or None on failure
_parse_directory_listing(self, content: bytes) -> Dict[str, Any]
Purpose: Parse reMarkable's directory listing format to extract child objects and data components
Parameters:
content: Raw bytes of directory listing content
Returns: Dictionary with 'child_objects' (folders/documents) and 'data_components' (files like .pdf, .rm, .metadata)
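Each listing line has the colon-separated form hash:flags:uuid_component:type:size, matched by the entry_pattern regex in the source. An illustrative round trip with fabricated hashes and a fabricated UUID, assuming watcher is a constructed instance:
# Illustration only: the hashes and UUID below are made-up placeholders.
sample_listing = (
    "3\n"  # leading count line, skipped by the parser
    + "a" * 64 + ":0:0f63f6d2-1f11-4a4e-8f9a-1234567890ab.metadata:0:512\n"
    + "b" * 64 + ":0:0f63f6d2-1f11-4a4e-8f9a-1234567890ab.content:0:1024\n"
    + "c" * 64 + ":0:0f63f6d2-1f11-4a4e-8f9a-1234567890ab:0:0\n"
)
parsed = watcher._parse_directory_listing(sample_listing.encode("utf-8"))
# parsed['data_components'] -> the .metadata and .content entries
# parsed['child_objects']   -> the bare-UUID child entry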
async _extract_metadata(self, metadata_hash: str) -> Optional[Dict[str, Any]]
Purpose: Fetch and parse JSON metadata for a document or folder
Parameters:
metadata_hash: Hash reference to the metadata file
Returns: Parsed JSON metadata dictionary containing visibleName, type, parent, etc., or None on failure
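The metadata blob is a small JSON document; the fields the watcher actually reads (see _discover_all_nodes) look roughly like the following, with placeholder values:
# Illustrative shape only; values are placeholders, and real metadata
# contains additional fields that the watcher ignores.
example_metadata = {
    "visibleName": "Meeting notes",   # used as the node name
    "type": "DocumentType",           # or "CollectionType" for folders
    "parent": "uuid-of-parent-folder",
}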
async check_for_new_files(self) -> List[Path]
Purpose: Main method to check gpt_out folder for new documents and extract them as PDFs
Returns: List of Path objects pointing to newly extracted PDF files in temporary directory
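check_for_new_files() performs a single scan; the class does not schedule itself. A minimal polling loop built on top of it might look like this sketch, where the 60-second interval and the handle_pdf callback are arbitrary illustrations:
import asyncio

async def watch_loop(watcher, handle_pdf):
    # Repeatedly scan gpt_out and hand any newly extracted PDFs to a callback.
    while True:
        for pdf_path in await watcher.check_for_new_files():
            handle_pdf(pdf_path)
        await asyncio.sleep(60)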
async _extract_document(self, doc_node: Dict, all_nodes: Dict) -> Optional[Path]
Purpose: Extract a single document from reMarkable Cloud, handling both PDF and .rm formats
Parameters:
doc_node: Dictionary containing document metadata and parsed data
all_nodes: Complete node tree for reference
Returns: Path to extracted PDF file, or None on failure
async _convert_rm_to_pdf(self, doc_name: str, rm_hashes: List[str], output_dir: Path) -> Optional[Path]
Purpose: Convert reMarkable .rm files to PDF using the rmc tool, handling single and multi-page documents
Parameters:
doc_name: Name of the document, used for the output filename
rm_hashes: List of hash references to .rm page files
output_dir: Directory to store the output PDF and temporary files
Returns: Path to final PDF file, or None on failure
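The conversion shells out to the rmc executable using the invocation shown in the source (rmc page.rm -o out.pdf). A small pre-flight check, purely as a sketch, can surface a missing tool before the first document arrives; logger here stands for the same logger passed to the watcher:
import shutil

# Sketch: warn early if the rmc executable used by _convert_rm_to_pdf is not on PATH.
if shutil.which("rmc") is None:
    logger.warning("rmc not found on PATH; .rm notebooks cannot be converted to PDF")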
async _concatenate_pdfs(self, pdf_files: List[Path], output_path: Path) -> bool
Purpose: Merge multiple PDF files into a single PDF, trying PyPDF2, PyPDF4, pdftk, and ghostscript in order
Parameters:
pdf_files: List of Path objects pointing to the PDF files to concatenate
output_path: Path where the merged PDF should be written
Returns: Boolean indicating success or failure of concatenation
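Note that PyPDF2 has since been superseded by the pypdf package. The class itself does not use pypdf, but if neither PyPDF2 nor PyPDF4 is installed, an equivalent merge with pypdf would look roughly like this sketch:
from pypdf import PdfReader, PdfWriter

def merge_pdfs(pdf_files, output_path):
    # Append every page of every input PDF to a single writer, then save.
    writer = PdfWriter()
    for pdf_file in pdf_files:
        for page in PdfReader(str(pdf_file)).pages:
            writer.add_page(page)
    with open(output_path, "wb") as fh:
        writer.write(fh)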
Attributes
| Name | Type | Description | Scope |
|---|---|---|---|
| session | requests.Session | Authenticated session for making API requests to reMarkable Cloud | instance |
| logger | logging.Logger | Logger instance for outputting messages | instance |
| base_url | str | Base URL for the reMarkable Cloud API (https://eu.tectonic.remarkable.com) | instance |
| processed_files | Set[str] | Set of document hashes that have already been processed, to avoid reprocessing | instance |
| gpt_out_folder_uuid | Optional[str] | UUID of the gpt_out folder in reMarkable Cloud, set during initialization | instance |
| temp_dir | Path | Temporary directory for storing downloaded and converted files, automatically cleaned up | instance |
Dependencies
asyncio, json, re, subprocess, tempfile, shutil, pathlib, typing, datetime, logging, requests, PyPDF2, PyPDF4
Required Imports
import asyncio
import json
import re
import subprocess
import tempfile
import shutil
from pathlib import Path
from typing import Dict, List, Optional, Any
import logging
import requests
Conditional/Optional Imports
These imports are only needed under specific conditions:
from PyPDF2 import PdfWriter, PdfReader
Condition: Required for PDF concatenation when converting multi-page .rm notebooks. Falls back to PyPDF4 if not available. Optional.
from PyPDF4 import PdfFileWriter, PdfFileReader
Condition: Fallback for PDF concatenation if PyPDF2 is not available. Can also fall back to system commands (pdftk, ghostscript). Optional.
Usage Example
import asyncio
import logging
import requests

from mixed_cloud_processor import RemarkableCloudWatcher

# Setup logger
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Create authenticated session (example - actual auth is more complex)
session = requests.Session()
session.headers.update({
    'Authorization': 'Bearer YOUR_TOKEN',
    'User-Agent': 'remarkable-cloud-watcher'
})

# Create watcher instance
watcher = RemarkableCloudWatcher(session, logger)

# Initialize and watch for files
async def main():
    # Initialize - finds gpt_out folder
    if await watcher.initialize():
        # Check for new files (returns list of PDF paths)
        new_pdfs = await watcher.check_for_new_files()
        for pdf_path in new_pdfs:
            print(f'New PDF: {pdf_path}')
            # Process the PDF file...
    else:
        print('Failed to initialize watcher')

# Run the async function
asyncio.run(main())
Best Practices
- Always call initialize() before check_for_new_files() to ensure the gpt_out folder is located
- The class automatically cleans up temporary files in __del__, but for long-running processes, consider periodic cleanup
- The processed_files set grows indefinitely - for long-running watchers, implement periodic clearing or size limits (one possible approach is sketched after this list)
- Ensure the 'rmc' tool is installed for .rm file conversion (remarkable-cli package)
- Install PyPDF2 for reliable PDF concatenation: pip install PyPDF2
- The class uses async methods - must be called with await in an async context
- Handle the case where initialize() returns False (gpt_out folder not found)
- The session object must remain valid throughout the watcher's lifetime
- Temporary files are stored in system temp directory - ensure sufficient disk space
- For production use, implement retry logic for network failures in API calls
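As noted above, processed_files grows without bound in a long-lived watcher. One possible mitigation, sketched here rather than taken from the class, is to cap the set size between scans; MAX_TRACKED is an arbitrary illustrative limit:
MAX_TRACKED = 10_000

def prune_processed(watcher):
    # Sets are unordered, so this drops arbitrary hashes; pruned documents may be
    # re-downloaded on a later scan, which costs extra work but is otherwise harmless.
    while len(watcher.processed_files) > MAX_TRACKED:
        watcher.processed_files.pop()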
Tags
Similar Components
AI-powered semantic similarity - components with related functionality:
- class RemarkableFileWatcher (78.4% similar)
- class RemarkableCloudManager (71.1% similar)
- class RemarkableRestFileWatcher (70.3% similar)
- class RemarkableEInkProcessor (70.1% similar)
- function test_remarkable_discovery (68.8% similar)