class SharePointFileCloudSync
Orchestrates synchronization of documents from SharePoint to FileCloud, managing the complete sync lifecycle including document retrieval, comparison, upload, and folder structure creation.
File: /tf/active/vicechatdev/SPFCsync/sync_service.py
Lines: 10-410
Complexity: complex
Purpose
This class serves as the main coordinator for syncing documents between SharePoint and FileCloud. It initializes and manages client connections to both services, retrieves documents from SharePoint, compares modification dates, downloads content, uploads to FileCloud, and tracks detailed statistics. It supports both single-run and continuous synchronization modes, handles rate limiting and retries, creates empty folder structures, and provides comprehensive logging of all operations.
Source Code
class SharePointFileCloudSync:
    """
    Main synchronization class for SharePoint to FileCloud sync.
    """

    def __init__(self):
        """Initialize the sync service with configuration."""
        # Validate configuration
        Config.validate_config()
        Config.setup_logging()
        self.logger = logging.getLogger(__name__)

        # Initialize clients
        self.sp_client = SharePointGraphClient(
            Config.SHAREPOINT_SITE_URL,
            Config.AZURE_CLIENT_ID,
            Config.AZURE_CLIENT_SECRET
        )
        self.fc_client = FileCloudClient(
            Config.FILECLOUD_SERVER_URL,
            Config.FILECLOUD_USERNAME,
            Config.FILECLOUD_PASSWORD
        )
        self.logger.info("SharePoint to FileCloud sync service initialized")
    def sync_documents(self, max_documents: int = None) -> Dict[str, int]:
        """
        Perform a full synchronization of documents from SharePoint to FileCloud.

        Args:
            max_documents: Maximum number of documents to process (for debugging)

        Returns:
            Dictionary with sync statistics
        """
        stats = {
            'total_documents': 0,
            'total_folders': 0,
            'new_uploads': 0,
            'updated_files': 0,
            'skipped_files': 0,
            'skipped_same_date': 0,
            'updated_newer_source': 0,
            'empty_folders_created': 0,
            'errors': 0
        }
        try:
            self.logger.info("Starting SharePoint to FileCloud synchronization")

            # Get all documents from SharePoint
            # Note: Our Graph client is already connected to the Documents drive,
            # so we start from root ("/") rather than using SHAREPOINT_DOCUMENTS_PATH
            if max_documents:
                print(f"🔍 Limiting to {max_documents} documents for debugging")
                sp_documents = []
                self.sp_client._get_documents_recursive("/", sp_documents, max_files=max_documents)
            else:
                sp_documents = self.sp_client.get_all_documents("/")

            stats['total_documents'] = len(sp_documents)
            self.logger.info(f"Found {len(sp_documents)} documents in SharePoint")

            # Create empty folder structure if configured
            if Config.CREATE_EMPTY_FOLDERS:
                stats['empty_folders_created'] = self._create_empty_folders(sp_documents)

            # Process each document
            for doc in sp_documents:
                try:
                    result = self._sync_single_document(doc)
                    # Enhanced statistics tracking
                    if result == 'skipped_same_date':
                        stats['skipped_same_date'] += 1
                    elif result == 'updated_newer_source':
                        stats['updated_newer_source'] += 1
                        stats['updated_files'] += 1  # Also count in general updated
                    else:
                        stats[result] += 1
                except Exception as e:
                    self.logger.error(f"Error syncing document {doc['name']}: {e}")
                    stats['errors'] += 1

            # Enhanced logging with detailed statistics
            self.logger.info(f"Synchronization completed. Detailed stats: {stats}")
            if stats['updated_files'] > 0:
                self.logger.info(f"Files updated breakdown: Same date skipped: {stats['skipped_same_date']}, Newer source: {stats['updated_newer_source']}")
            return stats
        except Exception as e:
            self.logger.error(f"Error during synchronization: {e}")
            stats['errors'] += 1
            return stats
    def _sync_single_document(self, doc: Dict) -> str:
        """
        Sync a single document from SharePoint to FileCloud.

        Args:
            doc: Document information dictionary from SharePoint

        Returns:
            String indicating the action taken ('new_uploads',
            'updated_newer_source', or 'skipped_same_date')
        """
        # Construct FileCloud path
        fc_path = self._get_filecloud_path(doc)

        # Check if file exists in FileCloud
        fc_file_info = self.fc_client.get_file_info(fc_path)

        # Parse SharePoint modification date
        sp_modified = self._parse_sharepoint_date(doc['modified'])

        # Determine if we need to upload
        if fc_file_info is None:
            # File doesn't exist in FileCloud, upload it
            action = 'new_uploads'
            self.logger.info(f"New file detected: {doc['name']}")
        elif self.fc_client.file_needs_update(sp_modified, fc_file_info):
            # File exists but the SharePoint copy is newer, update it.
            # Reported as 'updated_newer_source' so sync_documents can tally
            # it under both 'updated_newer_source' and 'updated_files'.
            action = 'updated_newer_source'
            self.logger.info(f"File update detected: {doc['name']} (SharePoint newer)")
        else:
            # File is up to date, skip it
            self.logger.debug(f"File up to date, skipping: {doc['name']}")
            return 'skipped_same_date'

        # Download file content from SharePoint
        file_content = self._download_document_content(doc)
        if file_content is None:
            raise Exception(f"Failed to download file content from SharePoint: {doc['name']}")

        # Upload to FileCloud
        success = self.fc_client.upload_file(file_content, fc_path, sp_modified)
        if not success:
            raise Exception(f"Failed to upload file to FileCloud: {doc['name']}")

        return action
    def _create_empty_folders(self, documents: List[Dict]) -> int:
        """
        Create empty folder structure in FileCloud based on SharePoint documents.

        Args:
            documents: List of document information from SharePoint

        Returns:
            Number of empty folders created
        """
        folders_created = 0
        unique_folders = set()

        # Extract all unique folder paths from documents
        for doc in documents:
            folder_path = doc.get('folder_path', '/')
            if folder_path and folder_path != '/':
                # Build the full FileCloud path
                fc_folder_path = f"{Config.FILECLOUD_BASE_PATH}/{folder_path}"
                fc_folder_path = '/'.join(filter(None, fc_folder_path.split('/')))
                if not fc_folder_path.startswith('/'):
                    fc_folder_path = '/' + fc_folder_path
                unique_folders.add(fc_folder_path)

        # Create each unique folder (sorted so parents come before children)
        for folder_path in sorted(unique_folders):
            try:
                if self.fc_client.create_folder(folder_path):
                    folders_created += 1
                    self.logger.debug(f"Created empty folder: {folder_path}")
            except Exception as e:
                self.logger.warning(f"Failed to create folder {folder_path}: {e}")

        if folders_created > 0:
            self.logger.info(f"Created {folders_created} empty folders in FileCloud")
        return folders_created
    def _make_download_request_with_retry(self, url, max_retries=3):
        """Make a download request with retry logic for rate limiting"""
        headers = {
            'Authorization': f'Bearer {self.sp_client.access_token}',
        }
        for attempt in range(max_retries + 1):
            try:
                response = requests.get(url, headers=headers, timeout=30)
                if response.status_code == 200:
                    return response
                elif response.status_code == 429:
                    # Rate limited - check for Retry-After header
                    retry_delay = self._get_retry_delay(response, attempt)
                    self.logger.warning(f"Rate limited on download, waiting {retry_delay}s (attempt {attempt + 1}/{max_retries + 1})")
                    time.sleep(retry_delay)
                    continue
                elif response.status_code == 401:
                    # Authentication error - likely an expired download URL, don't retry
                    self.logger.debug("Download URL expired for request (status 401) - will fall back to Graph API")
                    return response
                else:
                    self.logger.warning(f"Download request failed with status {response.status_code}")
                    return response
            except requests.exceptions.RequestException as e:
                self.logger.error(f"Request exception during download: {e}")
                if attempt < max_retries:
                    time.sleep(2 ** attempt)  # Exponential backoff
                    continue
                else:
                    return None
        return None
    def _get_retry_delay(self, response, attempt):
        """Extract retry delay from response headers or use exponential backoff"""
        from datetime import timezone
        from email.utils import parsedate_to_datetime

        retry_after = response.headers.get('Retry-After')
        if retry_after:
            try:
                # Try parsing as seconds
                return min(int(retry_after), 60)
            except ValueError:
                try:
                    # Try parsing as an HTTP date. parsedate_to_datetime returns
                    # a timezone-aware datetime, so compare against an aware "now"
                    # to avoid a naive/aware TypeError.
                    retry_date = parsedate_to_datetime(retry_after)
                    delay = (retry_date - datetime.now(timezone.utc)).total_seconds()
                    return max(1, min(delay, 60))
                except (TypeError, ValueError):
                    pass
        # Fallback to exponential backoff
        return min(2 ** attempt, 60)
    def _download_document_content(self, doc: Dict) -> Optional[bytes]:
        """
        Download document content using the document's download URL.

        Args:
            doc: Document information dictionary from SharePoint

        Returns:
            File content as bytes, or None if failed
        """
        try:
            # Try direct download URL first (these URLs may expire quickly)
            download_url = doc.get('download_url')
            if download_url and not Config.SKIP_DIRECT_DOWNLOAD:
                # Use this class's retry mechanism for direct downloads
                response = self._make_download_request_with_retry(download_url)
                if response and response.status_code == 200:
                    return response.content
                elif response and response.status_code == 401:
                    self.logger.debug(f"Direct download URL expired for {doc['name']} - using fallback method")
                else:
                    self.logger.debug(f"Direct download failed for {doc['name']} - using fallback method")

            # Fallback: Use Graph API content endpoint with retry
            # (or primary method if direct downloads are disabled)
            headers = {
                'Authorization': f'Bearer {self.sp_client.access_token}',
            }
            graph_id = doc.get('graph_id')
            if graph_id:
                content_url = f"https://graph.microsoft.com/v1.0/sites/{self.sp_client.site_id}/drives/{self.sp_client.drive_id}/items/{graph_id}/content"
                response = self.sp_client._make_request_with_retry('GET', content_url, headers=headers)
                if response and response.status_code == 200:
                    return response.content
                else:
                    self.logger.error(f"Graph API download failed for {doc['name']}")
            return None
        except Exception as e:
            self.logger.error(f"Error downloading file content for {doc['name']}: {e}")
            return None
    def _get_filecloud_path(self, doc: Dict) -> str:
        """
        Generate FileCloud path for a SharePoint document.

        Args:
            doc: Document information dictionary

        Returns:
            FileCloud path string
        """
        # Use the relative path from SharePoint and combine with FileCloud base path
        relative_path = doc.get('relative_path', doc['name'])

        # Ensure path starts with base path
        if relative_path:
            fc_path = f"{Config.FILECLOUD_BASE_PATH}/{relative_path}"
        else:
            fc_path = f"{Config.FILECLOUD_BASE_PATH}/{doc['name']}"

        # Normalize path (remove double slashes, etc.)
        fc_path = '/'.join(filter(None, fc_path.split('/')))
        if not fc_path.startswith('/'):
            fc_path = '/' + fc_path
        return fc_path
    def _parse_sharepoint_date(self, date_str: str) -> datetime:
        """
        Parse SharePoint date string to datetime object.

        Args:
            date_str: Date string from SharePoint

        Returns:
            datetime object with UTC timezone
        """
        from datetime import timezone
        try:
            if not date_str:
                # Use current time with UTC timezone
                return datetime.now(timezone.utc)

            # Handle year-only dates like "2024" or "2022"
            if date_str.isdigit() and len(date_str) == 4:
                # Use January 1st of that year, UTC timezone
                return datetime(int(date_str), 1, 1, tzinfo=timezone.utc)

            # SharePoint typically returns ISO format dates
            if 'T' in date_str:
                # Handle different timezone formats
                if date_str.endswith('Z'):
                    date_str = date_str.replace('Z', '+00:00')
                elif '+' not in date_str and '-' not in date_str[-6:]:
                    # No timezone info, assume UTC
                    date_str = date_str + '+00:00'
                parsed_date = datetime.fromisoformat(date_str)
                # Ensure we have UTC timezone
                if parsed_date.tzinfo is None:
                    parsed_date = parsed_date.replace(tzinfo=timezone.utc)
                return parsed_date
            else:
                # Fallback parsing for other formats
                try:
                    parsed_date = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S')
                    return parsed_date.replace(tzinfo=timezone.utc)
                except ValueError:
                    # Try date-only format; attach UTC so callers always
                    # receive a timezone-aware datetime
                    return datetime.strptime(date_str, '%Y-%m-%d').replace(tzinfo=timezone.utc)
        except Exception as e:
            self.logger.warning(f"Error parsing date {date_str}: {e}")
            # Fall back to an aware "now" so date comparisons never mix
            # naive and aware datetimes
            return datetime.now(timezone.utc)
    def run_continuous_sync(self, max_documents: int = None):
        """
        Run continuous synchronization at configured intervals.

        Args:
            max_documents: Optional limit on number of documents to process per cycle
        """
        self.logger.info(f"Starting continuous sync with {Config.SYNC_INTERVAL_MINUTES} minute intervals")
        if max_documents:
            self.logger.info(f"Document limit per cycle: {max_documents}")

        while True:
            try:
                start_time = time.time()
                stats = self.sync_documents(max_documents=max_documents)
                duration = time.time() - start_time
                self.logger.info(f"Sync cycle completed in {duration:.2f} seconds")

                # Wait for next sync interval
                self.logger.info(f"Waiting {Config.SYNC_INTERVAL_MINUTES} minutes until next sync")
                time.sleep(Config.SYNC_INTERVAL_MINUTES * 60)
            except KeyboardInterrupt:
                self.logger.info("Sync service stopped by user")
                break
            except Exception as e:
                self.logger.error(f"Error in sync cycle: {e}")
                # Wait a bit before retrying
                time.sleep(60)
    def run_single_sync(self, max_documents: int = None) -> Dict[str, int]:
        """
        Run a single synchronization cycle.

        Args:
            max_documents: Maximum number of documents to process (for debugging)

        Returns:
            Dictionary with sync statistics
        """
        return self.sync_documents(max_documents)
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| bases | - | - | - |
Parameter Details
__init__: No parameters required. The constructor automatically validates configuration from the Config class, sets up logging, and initializes SharePointGraphClient and FileCloudClient instances using credentials and URLs from Config.
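For orientation, the sketch below shows the shape of the Config class this constructor relies on. The attribute and method names match those referenced in the source; the environment-variable loading and default values are illustrative assumptions, not the actual implementation.
# Hypothetical sketch of the Config interface assumed by __init__ and the
# sync methods; env-var names and defaults are assumptions for illustration.
import logging
import os

class Config:
    SHAREPOINT_SITE_URL = os.environ.get('SHAREPOINT_SITE_URL')
    AZURE_CLIENT_ID = os.environ.get('AZURE_CLIENT_ID')
    AZURE_CLIENT_SECRET = os.environ.get('AZURE_CLIENT_SECRET')
    FILECLOUD_SERVER_URL = os.environ.get('FILECLOUD_SERVER_URL')
    FILECLOUD_USERNAME = os.environ.get('FILECLOUD_USERNAME')
    FILECLOUD_PASSWORD = os.environ.get('FILECLOUD_PASSWORD')
    FILECLOUD_BASE_PATH = os.environ.get('FILECLOUD_BASE_PATH', '/sharepoint-sync')
    CREATE_EMPTY_FOLDERS = os.environ.get('CREATE_EMPTY_FOLDERS', 'true').lower() == 'true'
    SKIP_DIRECT_DOWNLOAD = os.environ.get('SKIP_DIRECT_DOWNLOAD', 'false').lower() == 'true'
    SYNC_INTERVAL_MINUTES = int(os.environ.get('SYNC_INTERVAL_MINUTES', '60'))

    @classmethod
    def validate_config(cls):
        # Fail fast if any required credential is missing
        required = ('SHAREPOINT_SITE_URL', 'AZURE_CLIENT_ID', 'AZURE_CLIENT_SECRET',
                    'FILECLOUD_SERVER_URL', 'FILECLOUD_USERNAME', 'FILECLOUD_PASSWORD')
        missing = [name for name in required if not getattr(cls, name)]
        if missing:
            raise ValueError(f"Missing required configuration: {', '.join(missing)}")

    @classmethod
    def setup_logging(cls):
        logging.basicConfig(level=logging.INFO)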
Return Value
Instantiation returns a SharePointFileCloudSync object ready to perform synchronization operations. Key method returns: sync_documents() and run_single_sync() return a dictionary with sync statistics including 'total_documents', 'total_folders', 'new_uploads', 'updated_files', 'skipped_files', 'skipped_same_date', 'updated_newer_source', 'empty_folders_created', and 'errors'. run_continuous_sync() does not return (runs indefinitely until interrupted).
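A short example of consuming the returned statistics dictionary, using only the keys listed above (assumes sync_service was created as in the Usage Example below):
stats = sync_service.run_single_sync()
processed = stats['new_uploads'] + stats['updated_files'] + stats['skipped_same_date']
print(f"Processed {processed} of {stats['total_documents']} documents")
if stats['errors'] > 0:
    print(f"Warning: {stats['errors']} documents failed to sync")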
Class Interface
Methods
__init__(self)
Purpose: Initialize the sync service with configuration, validate settings, setup logging, and create SharePoint and FileCloud client instances
Returns: None - initializes instance with logger, sp_client, and fc_client attributes
sync_documents(self, max_documents: int = None) -> Dict[str, int]
Purpose: Perform a full synchronization of documents from SharePoint to FileCloud with detailed statistics tracking
Parameters:
max_documents: Optional maximum number of documents to process, useful for debugging or testing. If None, processes all documents.
Returns: Dictionary with keys: 'total_documents', 'total_folders', 'new_uploads', 'updated_files', 'skipped_files', 'skipped_same_date', 'updated_newer_source', 'empty_folders_created', 'errors'
_sync_single_document(self, doc: Dict) -> str
Purpose: Sync a single document from SharePoint to FileCloud, determining if upload is needed based on modification dates
Parameters:
doc: Document information dictionary from SharePoint containing 'name', 'modified', 'download_url', 'graph_id', 'folder_path', 'relative_path'
Returns: String indicating action taken: 'new_uploads', 'updated_newer_source', or 'skipped_same_date' ('updated_files' is tallied by sync_documents from 'updated_newer_source' results)
_create_empty_folders(self, documents: List[Dict]) -> int
Purpose: Create empty folder structure in FileCloud based on folder paths from SharePoint documents
Parameters:
documents: List of document information dictionaries from SharePoint, each containing 'folder_path'
Returns: Integer count of empty folders successfully created
_make_download_request_with_retry(self, url, max_retries=3)
Purpose: Make a download request with retry logic for handling rate limiting (429) and transient errors
Parameters:
url: Download URL to request
max_retries: Maximum number of retry attempts (default 3)
Returns: requests.Response object if successful, None if all retries failed
_get_retry_delay(self, response, attempt)
Purpose: Extract retry delay from response headers (Retry-After) or calculate exponential backoff
Parameters:
response: requests.Response object from a rate-limited request
attempt: Current attempt number for exponential backoff calculation
Returns: Number of seconds to wait before retrying (capped at 60)
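To make the two Retry-After formats concrete, here is a standalone sketch using only the standard library; the header values are made up for illustration:
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime

# Delta-seconds form: the header carries a plain integer
print(min(int("30"), 60))  # 30

# HTTP-date form: parsedate_to_datetime returns a timezone-aware datetime,
# so it must be compared against an aware "now"
retry_date = parsedate_to_datetime("Wed, 21 Oct 2026 07:28:00 GMT")
delay = (retry_date - datetime.now(timezone.utc)).total_seconds()
print(max(1, min(delay, 60)))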
_download_document_content(self, doc: Dict) -> Optional[bytes]
Purpose: Download document content from SharePoint using direct download URL or Graph API fallback
Parameters:
doc: Document information dictionary containing 'download_url', 'graph_id', and 'name'
Returns: File content as bytes if successful, None if download failed
_get_filecloud_path(self, doc: Dict) -> str
Purpose: Generate normalized FileCloud path for a SharePoint document by combining base path with relative path
Parameters:
doc: Document information dictionary containing 'relative_path' or 'name'
Returns: Normalized FileCloud path string starting with '/'
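For example, with a hypothetical FILECLOUD_BASE_PATH of '/sharepoint-sync', the normalization collapses duplicate slashes and guarantees a leading slash:
# Illustrative run of the normalization logic; the base path is an assumption
base = '/sharepoint-sync'
relative = 'Projects//2024/report.docx'
fc_path = '/'.join(filter(None, f"{base}/{relative}".split('/')))
if not fc_path.startswith('/'):
    fc_path = '/' + fc_path
print(fc_path)  # /sharepoint-sync/Projects/2024/report.docx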
_parse_sharepoint_date(self, date_str: str) -> datetime
Purpose: Parse SharePoint date string to timezone-aware datetime object, handling various formats including ISO, year-only, and date-only
Parameters:
date_str: Date string from SharePoint in various formats (ISO 8601, 'YYYY', 'YYYY-MM-DD', etc.)
Returns: datetime object with UTC timezone; returns the current UTC time if parsing fails
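A standalone illustration of the main parsing branches, mirroring the logic in the source (standard library only):
from datetime import datetime, timezone

# ISO 8601 with 'Z' suffix, the typical SharePoint format
print(datetime.fromisoformat('2024-03-15T10:30:00Z'.replace('Z', '+00:00')))
# 2024-03-15 10:30:00+00:00

# Year-only string such as '2024'
print(datetime(int('2024'), 1, 1, tzinfo=timezone.utc))
# 2024-01-01 00:00:00+00:00

# Date-only string, with UTC attached so the result is always timezone-aware
print(datetime.strptime('2024-03-15', '%Y-%m-%d').replace(tzinfo=timezone.utc))
# 2024-03-15 00:00:00+00:00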
run_continuous_sync(self, max_documents: int = None)
Purpose: Run continuous synchronization at configured intervals indefinitely until interrupted
Parameters:
max_documents: Optional limit on number of documents to process per cycle
Returns: None - runs indefinitely until KeyboardInterrupt or exception
run_single_sync(self, max_documents: int = None) -> Dict[str, int]
Purpose: Run a single synchronization cycle (convenience wrapper for sync_documents)
Parameters:
max_documents: Maximum number of documents to process for debugging
Returns: Dictionary with sync statistics (same as sync_documents)
Attributes
| Name | Type | Description | Scope |
|---|---|---|---|
| logger | logging.Logger | Logger instance for this class, initialized with __name__ | instance |
| sp_client | SharePointGraphClient | Client instance for interacting with SharePoint via Microsoft Graph API | instance |
| fc_client | FileCloudClient | Client instance for interacting with FileCloud API | instance |
Dependencies
sharepoint_graph_client, filecloud_client, config, datetime, logging, time, requests, typing, email
Required Imports
from sharepoint_graph_client import SharePointGraphClient
from filecloud_client import FileCloudClient
from config import Config
from datetime import datetime
import logging
import time
import requests
from typing import Dict, List, Optional
Conditional/Optional Imports
These imports are only needed under specific conditions:
from email.utils import parsedate_to_datetime
Condition: used in _get_retry_delay method when parsing HTTP date headers from rate-limited responses
Required (conditional)
from datetime import timezone
Condition: used in _get_retry_delay and _parse_sharepoint_date for timezone-aware datetime objects
Required (conditional)
Usage Example
# Single synchronization run
from sync_service import SharePointFileCloudSync
# Initialize the sync service (validates config and creates clients)
sync_service = SharePointFileCloudSync()
# Run a single sync cycle
stats = sync_service.run_single_sync()
print(f"Synced {stats['new_uploads']} new files, updated {stats['updated_files']} files")
print(f"Skipped {stats['skipped_same_date']} up-to-date files, {stats['errors']} errors")
# Or run with document limit for testing
stats = sync_service.run_single_sync(max_documents=10)
# Continuous synchronization mode
sync_service = SharePointFileCloudSync()
sync_service.run_continuous_sync() # Runs indefinitely
# Continuous sync with document limit
sync_service.run_continuous_sync(max_documents=100)
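For deployments under a process manager, a thin wrapper can translate SIGTERM into the KeyboardInterrupt that run_continuous_sync() already treats as a clean shutdown; this is a sketch, not part of the class:
# Hypothetical shutdown wrapper: converts SIGTERM into KeyboardInterrupt,
# which the continuous-sync loop handles as a clean stop
import signal

def handle_sigterm(signum, frame):
    raise KeyboardInterrupt

signal.signal(signal.SIGTERM, handle_sigterm)
sync_service = SharePointFileCloudSync()
sync_service.run_continuous_sync()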
Best Practices
- Always ensure Config is properly set up with all required credentials before instantiating the class
- Use max_documents parameter during testing to limit scope and avoid long-running operations
- Monitor the returned statistics dictionary to track sync health and identify issues (see the monitoring sketch after this list)
- The class handles rate limiting automatically with exponential backoff, but be aware of API quotas
- For production use, run_continuous_sync() should be used with proper process management (systemd, supervisor, etc.)
- The class creates clients in __init__, so instantiation may fail if credentials are invalid
- Direct download URLs from SharePoint may expire quickly; the class automatically falls back to Graph API
- Empty folder creation is controlled by Config.CREATE_EMPTY_FOLDERS flag
- All operations are logged; ensure logging is properly configured for monitoring
- The class maintains state through sp_client and fc_client instances; do not share instances across threads
- run_continuous_sync() runs indefinitely and should be interrupted with KeyboardInterrupt or process termination
- Date comparison logic considers timezone information; ensure SharePoint dates are properly formatted
- Error handling is comprehensive but individual document failures don't stop the entire sync process
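As referenced in the list above, a minimal health-check sketch built only on the documented statistics keys; the alert callable is a placeholder for whatever notification hook you use:
def check_sync_health(stats, alert):
    # 'alert' is any callable, e.g. print or a pager hook (placeholder)
    if stats['errors'] > 0:
        alert(f"Sync completed with {stats['errors']} errors")
    if stats['total_documents'] == 0:
        alert("Sync found no documents - check SharePoint connectivity")

stats = SharePointFileCloudSync().run_single_sync()
check_sync_health(stats, alert=print)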
Tags
Similar Components
AI-powered semantic similarity - components with related functionality:
- class SharePointClient (75.2% similar)
- class SyncDiagnostics (73.4% similar)
- function main_v9 (72.6% similar)
- function main_v21 (70.3% similar)
- class Config (70.2% similar)