class RemarkableRestFileWatcher
A file watcher class that monitors a specific folder on a reMarkable tablet using the REST API, polling for new files at regular intervals and triggering callbacks when new files are detected.
/tf/active/vicechatdev/e-ink-llm/remarkable_rest_client.py
778 - 872
moderate
Purpose
This class provides automated monitoring of a reMarkable tablet folder through REST API polling. It tracks which files have been processed, detects new files since the last check, downloads them to temporary storage, and executes user-defined callbacks for processing. The watcher runs continuously in an async loop, making it suitable for building automated workflows that respond to new documents added to a reMarkable tablet folder.
Source Code
class RemarkableRestFileWatcher:
"""File watcher using the REST API client"""
def __init__(self, rest_client: RemarkableRestClient,
watch_folder_name: str, poll_interval: int = 60):
self.rest_client = rest_client
self.watch_folder_name = watch_folder_name
self.poll_interval = poll_interval
self.logger = logging.getLogger(__name__)
self.processed_files: Set[str] = set()
self.last_check_time = datetime.now()
async def get_new_files(self) -> List[Dict]:
"""Get list of new files since last check"""
try:
# Find the watch folder
folder_id = self.rest_client.find_folder_by_name(self.watch_folder_name)
if folder_id is None:
self.logger.warning(f"Watch folder '{self.watch_folder_name}' not found")
return []
# Get documents in the folder
all_files = self.rest_client.get_documents_in_folder(folder_id)
# Filter for new files
new_files = []
for doc in all_files:
doc_id = doc.get("ID")
if doc_id and doc_id not in self.processed_files:
new_files.append(doc)
self.processed_files.add(doc_id)
return new_files
except Exception as e:
self.logger.error(f"Error checking for new files: {e}")
return []
async def start_watching(self, callback):
"""
Start watching for new files
Args:
callback: Async function to call with new files
"""
print(f"šļø Started watching reMarkable folder: {self.watch_folder_name}")
print(f"š Checking every {self.poll_interval} seconds...")
# Get initial file list to mark as processed
initial_files = await self.get_new_files()
print(f"š Tracking {len(self.processed_files)} existing files")
try:
while True:
try:
new_files = await self.get_new_files()
if new_files:
print(f"š„ Found {len(new_files)} new file(s)")
with tempfile.TemporaryDirectory() as temp_dir:
temp_path = Path(temp_dir)
for doc in new_files:
doc_id = doc.get("ID")
doc_name = doc.get("VissibleName", "Unknown")
print(f"š Processing: {doc_name}")
# Download file
local_file = self.rest_client.download_document(
doc_id, doc_name, temp_path
)
if local_file:
try:
await callback(doc, local_file)
except Exception as e:
self.logger.error(f"Error in callback for {doc_name}: {e}")
print(f"ā Error processing {doc_name}: {e}")
# Wait before next check
await asyncio.sleep(self.poll_interval)
except Exception as e:
self.logger.error(f"Error in watch loop: {e}")
print(f"ā Watch error: {e}")
await asyncio.sleep(self.poll_interval)
except KeyboardInterrupt:
print(f"\nš Stopping reMarkable file watcher...")
except Exception as e:
self.logger.error(f"Fatal error in file watcher: {e}")
print(f"ā Fatal watcher error: {e}")
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
bases |
- | - |
Parameter Details
rest_client: An instance of RemarkableRestClient that provides the REST API interface for communicating with the reMarkable tablet. This client must be properly authenticated and configured before being passed to the watcher.
watch_folder_name: The name of the folder on the reMarkable tablet to monitor for new files. This is a human-readable folder name (e.g., 'Inbox'), not a folder ID.
poll_interval: The number of seconds to wait between checks for new files. Defaults to 60 seconds. Lower values provide faster detection but increase API calls and resource usage.
Return Value
The constructor returns an instance of RemarkableRestFileWatcher. The get_new_files() method returns a List[Dict] containing document metadata dictionaries for newly detected files, with each dict containing keys like 'ID' and 'VissibleName'. The start_watching() method does not return a value but runs indefinitely until interrupted.
Class Interface
Methods
__init__(self, rest_client: RemarkableRestClient, watch_folder_name: str, poll_interval: int = 60)
Purpose: Initialize the file watcher with REST client, target folder name, and polling interval
Parameters:
rest_client: Authenticated RemarkableRestClient instance for API communicationwatch_folder_name: Name of the reMarkable folder to monitorpoll_interval: Seconds between polling checks (default: 60)
Returns: None - constructor initializes the instance
async get_new_files(self) -> List[Dict]
Purpose: Check for and return new files in the watched folder that haven't been processed yet
Returns: List of document metadata dictionaries for newly detected files. Each dict contains 'ID', 'VissibleName', and other document properties. Returns empty list if no new files or on error.
async start_watching(self, callback)
Purpose: Start the continuous monitoring loop that checks for new files and executes the callback for each new file detected
Parameters:
callback: Async function that takes two arguments: doc (Dict) containing document metadata and local_file (Path) pointing to the downloaded file. This function is called for each new file detected.
Returns: None - runs indefinitely until KeyboardInterrupt or fatal error. Does not return normally.
Attributes
| Name | Type | Description | Scope |
|---|---|---|---|
rest_client |
RemarkableRestClient | The REST API client instance used to communicate with the reMarkable tablet | instance |
watch_folder_name |
str | The name of the folder being monitored on the reMarkable tablet | instance |
poll_interval |
int | Number of seconds to wait between polling checks for new files | instance |
logger |
logging.Logger | Logger instance for recording errors and warnings during operation | instance |
processed_files |
Set[str] | Set of document IDs that have already been processed, used to track state and avoid reprocessing files | instance |
last_check_time |
datetime | Timestamp of the last check for new files, initialized to current time at instantiation | instance |
Dependencies
asynciologgingtempfiledatetimepathlibtyping
Required Imports
import asyncio
import logging
import tempfile
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Set
Usage Example
import asyncio
from pathlib import Path
from typing import Dict
# Assume rest_client is already created and authenticated
rest_client = RemarkableRestClient(device_token='your_token')
# Create the watcher
watcher = RemarkableRestFileWatcher(
rest_client=rest_client,
watch_folder_name='Inbox',
poll_interval=30
)
# Define callback for processing new files
async def process_new_file(doc: Dict, local_file: Path):
doc_name = doc.get('VissibleName', 'Unknown')
print(f'Processing {doc_name} at {local_file}')
# Your processing logic here
with open(local_file, 'rb') as f:
content = f.read()
# Do something with the file content
# Start watching (runs indefinitely)
async def main():
await watcher.start_watching(process_new_file)
if __name__ == '__main__':
asyncio.run(main())
Best Practices
- Always provide an authenticated RemarkableRestClient instance before creating the watcher
- Ensure the callback function is async and handles exceptions internally to prevent the watch loop from crashing
- The watcher maintains state in processed_files set - creating a new instance will reset tracking and reprocess all files
- Files are downloaded to temporary directories that are automatically cleaned up after callback execution
- Use appropriate poll_interval values to balance responsiveness with API rate limits and resource usage
- The watcher runs indefinitely until KeyboardInterrupt or fatal error - plan for graceful shutdown in production
- Initial call to get_new_files() marks all existing files as processed to avoid reprocessing on startup
- Callback receives both document metadata (dict) and local file path (Path) for flexible processing
- The watcher continues running even if individual file processing fails, logging errors but not stopping
- Consider implementing retry logic in your callback for transient failures
Tags
Similar Components
AI-powered semantic similarity - components with related functionality:
-
class RemarkableFileWatcher 85.5% similar
-
class RemarkableCloudWatcher 70.3% similar
-
function main_v21 69.7% similar
-
class RemarkableAPIClient 59.7% similar
-
function test_remarkable_discovery 59.2% similar