🔍 Code Extractor

class FileCloudAPI_v1

Maturity: 28

Python wrapper for the FileCloud REST API. This class provides methods to interact with FileCloud server APIs, handling authentication, session management, and various file operations.

File:
/tf/active/vicechatdev/FC_api.py
Lines:
19 - 3233
Complexity:
moderate

Purpose

Python wrapper for the FileCloud REST API. This class provides methods to interact with FileCloud server APIs, handling authentication, session management, and various file operations.

Source Code

class FileCloudAPI:
    """
    Python wrapper for the FileCloud REST API.
    
    This class provides methods to interact with FileCloud server APIs,
    handling authentication, session management, and various file operations.
    """
    
    def __init__(self, server_url: str, username: str = None, password: str = None):
        """
        Initialize the FileCloud API client.
        
        Args:
            server_url: The URL of the FileCloud server (e.g., 'https://filecloud.example.com/')
            username: Username for authentication (optional if authenticating later)
            password: Password for authentication (optional if authenticating later)
        """
        self.server_url = server_url.rstrip('/')
        self.username = username
        self.password = password
        self.session = requests.session()
        self.authenticated = False
        self.authenticated_admin = False
        self.headers = {'Accept': 'application/json'}
    
    def login(self, username: str = None, password: str = None) -> bool:
        """
        Authenticate with the FileCloud server.
        
        Args:
            username: Override the username provided during initialization
            password: Override the password provided during initialization
            
        Returns:
            bool: True if authentication is successful, False otherwise.
        """
        if username:
            self.username = username
        if password:
            self.password = password
            
        if not self.username or not self.password:
            raise ValueError("Username and password are required for authentication")
            
        login_endpoint = '/core/loginguest'
        credentials = {'userid': self.username, 'password': self.password}
        
        try:
            response = self.session.post(
                f"{self.server_url}{login_endpoint}", 
                data=credentials, 
                headers=self.headers
            )
            
            login_call = response.json()
            
            if login_call['command'][0]['result'] == 1:
                self.authenticated = True
                self.authenticated_admin = False
                return True
            else:
                self.authenticated = False
                error_message = login_call['command'][0].get('message', 'Unknown error')
                print(f"Login failed: {error_message}")
                return False
        except Exception as e:
            print(f"Login error: {str(e)}")
            self.authenticated = False
            return False

    def adminlogin(self, username: str = None, password: str = None) -> bool:
        """
        Authenticate with the FileCloud server.
        
        Args:
            username: Override the username provided during initialization
            password: Override the password provided during initialization
            
        Returns:
            bool: True if authentication is successful, False otherwise.
        """
        if username:
            self.username = username
        if password:
            self.password = password
            
        if not self.username or not self.password:
            raise ValueError("Username and password are required for authentication")
            
        login_endpoint = '/admin/adminlogin'
        credentials = {'adminuser': self.username, 'adminpassword': self.password}
        
        try:
            response = self.session.post(
                f"{self.server_url}{login_endpoint}", 
                data=credentials, 
                headers=self.headers
            )
            
            login_call = response.json()
            
            if login_call['command'][0]['result'] == 1:
                self.authenticated_admin = True
                self.authenticated = False
                return True
            else:
                self.authenticated_admin = False
                error_message = login_call['command'][0].get('message', 'Unknown error')
                print(f"Login failed: {error_message}")
                return False
        except Exception as e:
            print(f"Login error: {str(e)}")
            self.authenticated_admin = False
            return False

    def authenticate(self, username: str = None, password: str = None) -> bool:
        """Alias for login() for backward compatibility"""
        return self.login(username, password)
    
    def logout(self) -> Dict:
        """
        Logout from the FileCloud server.
        
        Returns:
            Dict: Response from the server.
        """
        if not self.authenticated:
            return {"success": True, "message": "Not logged in"}
        
        logout_endpoint = '/core/locksession'
        
        response = self.session.post(
            f"{self.server_url}{logout_endpoint}",
            cookies=self.session.cookies
        )
        
        self.authenticated = False
        self.session.cookies.clear()
        
        return self._parse_response(response)
    
    def _ensure_authenticated(self):
        """
        Ensure that the client is authenticated with the server.
        Attempts to login if not already authenticated.
        
        Raises:
            Exception: If authentication fails.
        """
        if not self.authenticated:
            if not self.login():
                raise Exception("Authentication failed. Please check your credentials.")

    def _ensure_authenticated_admin(self):
        """
        Ensure that the client is authenticated with the server.
        Attempts to login if not already authenticated.
        
        Raises:
            Exception: If authentication fails.
        """
        if not self.authenticated_admin:
            if not self.adminlogin():
                raise Exception("Authentication failed. Please check your credentials.")
    

    def _parse_response(self, response: requests.Response) -> Dict:
        """
        Parse response based on content type.
        
        Args:
            response: The HTTP response object.
            
        Returns:
            Dict: Parsed response as a dictionary.
        """
        if response.status_code != 200:
            return {
                "success": False,
                "message": f"HTTP error {response.status_code}: {response.reason}"
            }
        
        if response.text == 'OK':
            return {"success": True, "message": "Operation completed successfully"}
        
        content_type = response.headers.get('Content-Type', '')
        
        try:
            # Handle JSON response
            if 'application/json' in content_type:
                json_data = response.json()
                
                # Extract share_id and share_url for add_share responses
                if 'command' in json_data and isinstance(json_data['command'], list):
                    command_data = json_data['command'][0]
                    if 'shareid' in command_data:
                        json_data['share_id'] = command_data.get('shareid')
                        json_data['share_url'] = command_data.get('shareurl')
                
                return json_data
            
            # Handle XML response - needed for FileCloud APIs that return XML
            elif 'text/xml' in content_type or 'application/xml' in content_type or response.text.strip().startswith('<?xml') or '<' in response.text:
                try:
                    # Parse XML
                    root = ET.fromstring(response.text)
                    
                    # Check for error response
                    if root.tag == 'error':
                        return {
                            "success": False,
                            "message": root.text if root.text else "Unknown error"
                        }
                    
                    # Handle workflow list response format
                    if root.tag == 'workflows':
                        meta = root.find('meta')
                        total = int(meta.find('total').text) if meta and meta.find('total') is not None else 0
                        
                        workflow_elements = root.findall('workflow')
                        workflows = []
                        
                        for workflow_elem in workflow_elements:
                            workflow = {}
                            for child in workflow_elem:
                                # Remove leading underscore from tag names (like _id)
                                tag_name = child.tag[1:] if child.tag.startswith('_') else child.tag
                                workflow[tag_name] = child.text
                            workflows.append(workflow)
                        
                        return {
                            "success": True,
                            "data": {
                                "workflows": {
                                    "workflow": workflows
                                },
                                "meta": {
                                    "total": total,
                                    "count": len(workflows)
                                }
                            }
                        }
                    
                    # Handle workflow detail response
                    elif root.tag == 'workflow':
                        workflow = {}
                        for child in root:
                            # Remove leading underscore from tag names (like _id)
                            tag_name = child.tag[1:] if child.tag.startswith('_') else child.tag
                            workflow[tag_name] = child.text
                        
                        return {
                            "success": True,
                            "data": {
                                "workflow": workflow
                            }
                        }
                    # Handle metadata sets response
                    elif root.tag == 'sets':
                        sets_list = []
                        for set_elem in root.findall('set'):
                            set_dict = {}
                            for child in set_elem:
                                # Remove leading underscore from tag names
                                tag_name = child.tag[1:] if child.tag.startswith('_') else child.tag
                                set_dict[tag_name] = child.text
                            sets_list.append(set_dict)
                        
                        return {
                            "success": True,
                            "data": {
                                "sets": sets_list
                            }
                        }
    
                    # Handle metadata attributes response
                    elif root.tag == 'attributes':
                        attrs_list = []
                        for attr_elem in root.findall('attribute'):
                            attr_dict = {}
                            for child in attr_elem:
                                # Remove leading underscore from tag names
                                tag_name = child.tag[1:] if child.tag.startswith('_') else child.tag
                                
                                # Handle options list
                                if tag_name == 'options' and len(child) > 0:
                                    options = []
                                    for option in child.findall('option'):
                                        options.append(option.text)
                                    attr_dict[tag_name] = options
                                else:
                                    attr_dict[tag_name] = child.text
                            attrs_list.append(attr_dict)
                        
                        return {
                            "success": True,
                            "data": {
                                "attributes": attrs_list
                            }
                        }
                    
                    # Handle workflow run list response
                    elif root.tag == 'runs':
                        meta = root.find('meta')
                        total = int(meta.find('total').text) if meta and meta.find('total') is not None else 0
                        
                        run_elements = root.findall('run')
                        runs = []
                        
                        for run_elem in run_elements:
                            run = {}
                            for child in run_elem:
                                tag_name = child.tag[1:] if child.tag.startswith('_') else child.tag
                                run[tag_name] = child.text
                            runs.append(run)
                        
                        return {
                            "success": True,
                            "data": {
                                "runs": runs,
                                "meta": {
                                    "total": total,
                                    "count": len(runs)
                                }
                            }
                        }
                    
                    # Handle workflow run detail response
                    elif root.tag == 'run':
                        run = {}
                        for child in root:
                            tag_name = child.tag[1:] if child.tag.startswith('_') else child.tag
                            run[tag_name] = child.text
                        
                        return {
                            "success": True,
                            "data": {
                                "run": run
                            }
                        }
                    
                    # Handle share responses
                    elif root.tag == 'shares' or root.tag == 'share':
                        result = {
                            "success": True,
                            "data": {
                                root.tag: self._xml_to_dict(root)
                            }
                        }
                        
                        # Extract share ID and URL
                        share_id = None
                        share_url = None
                        
                        if root.tag == 'share':
                            share_id_elem = root.find('shareid')
                            share_url_elem = root.find('shareurl')
                            if share_id_elem is not None:
                                share_id = share_id_elem.text
                            if share_url_elem is not None:
                                share_url = share_url_elem.text
                        elif root.tag == 'shares':
                            share_elem = root.find('share')
                            if share_elem is not None:
                                share_id_elem = share_elem.find('shareid')
                                share_url_elem = share_elem.find('shareurl')
                                if share_id_elem is not None:
                                    share_id = share_id_elem.text
                                if share_url_elem is not None:
                                    share_url = share_url_elem.text
                        
                        if share_id:
                            result['share_id'] = share_id
                        if share_url:
                            result['share_url'] = share_url
                        
                        return result
                    
                    # Check for command structure (common in FileCloud API responses)
                    command_elem = root.find('command')
                    if command_elem is not None:
                        cmd_type = command_elem.find('type')
                        cmd_result = command_elem.find('result')
                        cmd_message = command_elem.find('message')
                        
                        result = {
                            "success": cmd_result is not None and cmd_result.text == "1",
                            "command_type": cmd_type.text if cmd_type is not None else None,
                            "message": cmd_message.text if cmd_message is not None else None,
                            "data": {}
                        }
                        
                        # Extract share ID and URL for add_share responses
                        share_id_elem = command_elem.find('shareid')
                        share_url_elem = command_elem.find('shareurl')
                        
                        if share_id_elem is not None:
                            result['share_id'] = share_id_elem.text
                        if share_url_elem is not None:
                            result['share_url'] = share_url_elem.text
                        
                        return result
                    
                    # Generic XML to dict conversion as fallback
                    result = {
                        "success": True,
                        "xml_root_tag": root.tag,
                        "data": {root.tag: self._xml_to_dict(root)}
                    }
                    
                    # Last attempt to extract share ID and URL from generic XML response
                    if 'shareid' in response.text or 'shareurl' in response.text:
                        import re
                        share_id_match = re.search(r'<shareid>([^<]+)</shareid>', response.text)
                        share_url_match = re.search(r'<shareurl>([^<]+)</shareurl>', response.text)
                        
                        if share_id_match:
                            result['share_id'] = share_id_match.group(1)
                        if share_url_match:
                            result['share_url'] = share_url_match.group(1)
                    
                    return result
                    
                except ET.ParseError as xml_err:
                    return {
                        "success": False,
                        "message": f"Failed to parse XML response: {str(xml_err)}",
                        "raw_content": response.text[:500]  # Include first 500 chars for debugging
                    }
            
            # Try to parse as JSON regardless of content type
            elif response.text.strip().startswith('{') or response.text.strip().startswith('['):
                json_data = response.json()
                
                # Extract share_id and share_url for add_share responses in JSON format
                if 'command' in json_data and isinstance(json_data['command'], list):
                    command_data = json_data['command'][0]
                    if 'shareid' in command_data:
                        json_data['share_id'] = command_data.get('shareid')
                        json_data['share_url'] = command_data.get('shareurl')
                
                return json_data
            
            # Return raw text for unrecognized formats
            else:
                # Check if response might contain share information
                if 'shareid' in response.text or 'shareurl' in response.text:
                    import re
                    result = {
                        "success": True,
                        "message": "Extracted share information from unrecognized format",
                        "raw_content": response.text[:500]  # Include first 500 chars for debugging
                    }
                    
                    share_id_match = re.search(r'shareid["\s:=]+([^"&\s,]+)', response.text, re.IGNORECASE)
                    share_url_match = re.search(r'shareurl["\s:=]+([^"&\s,]+)', response.text, re.IGNORECASE)
                    
                    if share_id_match:
                        result['share_id'] = share_id_match.group(1)
                    if share_url_match:
                        result['share_url'] = share_url_match.group(1)
                    
                    return result
                
                return {
                    "success": False,
                    "message": "Unrecognized response format",
                    "content_type": content_type,
                    "raw_content": response.text[:500]  # Include first 500 chars for debugging
                }
                
        except json.JSONDecodeError as e:
            # Check if the response text might contain share information
            if 'shareid' in response.text or 'shareurl' in response.text:
                import re
                result = {
                    "success": True,
                    "message": "Extracted share information despite JSON parse error",
                    "content_type": content_type,
                    "raw_content": response.text[:500]  # Include first 500 chars for debugging
                }
                
                share_id_match = re.search(r'shareid["\s:=]+([^"&\s,]+)', response.text, re.IGNORECASE)
                share_url_match = re.search(r'shareurl["\s:=]+([^"&\s,]+)', response.text, re.IGNORECASE)
                
                if share_id_match:
                    result['share_id'] = share_id_match.group(1)
                if share_url_match:
                    result['share_url'] = share_url_match.group(1)
                
                return result
            
            return {
                "success": False,
                "message": f"Failed to parse response as JSON: {str(e)}",
                "content_type": content_type,
                "raw_content": response.text[:500]  # Include first 500 chars for debugging
            }
        except Exception as e:
            return {
                "success": False,
                "message": f"Error processing response: {str(e)}",
                "content_type": content_type,
                "raw_content": response.text[:500]  # Include first 500 chars for debugging
            }

    def _xml_to_dict(self, element):
        """
        Convert an XML element to a dictionary.
        
        Args:
            element: XML element to convert
            
        Returns:
            Dict or str: Converted element
        """
        result = {}
        
        # Handle attributes
        for key, value in element.attrib.items():
            result[f"@{key}"] = value
        
        # Handle children
        for child in element:
            child_tag = child.tag
            # Remove leading underscore from tag names (like _id)
            if child_tag.startswith('_'):
                child_tag = child_tag[1:]
                
            if len(child) == 0 and not child.attrib:
                # Simple text node
                if child.text is None:
                    result[child_tag] = ''
                else:
                    result[child_tag] = child.text.strip()
            else:
                # Complex node
                child_dict = self._xml_to_dict(child)
                
                if child_tag in result:
                    # If key already exists, convert to list if not already
                    if not isinstance(result[child_tag], list):
                        result[child_tag] = [result[child_tag]]
                    result[child_tag].append(child_dict)
                else:
                    result[child_tag] = child_dict
        
        # Handle element text
        if element.text and element.text.strip() and not result:
            return element.text.strip()
        
        return result
    
    def _extract_paths_from_xml(self, xml_content: str) -> List[str]:
        """
        Extract file paths from XML search results.
        
        Args:
            xml_content: XML content from search results
            
        Returns:
            List[str]: List of file paths
        """
        paths = []
        try:
            root = ET.fromstring(xml_content)
            for entry in root.findall('.//entry'):
                path_elem = entry.find('path')
                if path_elem is not None and path_elem.text:
                    paths.append(path_elem.text)
        except Exception as e:
            print(f"Error parsing XML: {str(e)}")
        
        return paths
    
    # File Operations
    
    def upload_file(self, local_file_path: str, remote_path: str, 
                   filename: Optional[str] = None, overwrite: bool = False) -> Dict:
        """
        Upload a file to the FileCloud server.
        
        Args:
            local_file_path: Path to the local file to upload
            remote_path: Directory path on the server where to upload the file
            filename: Optional name to use for the file on the server (default: original filename)
            overwrite: Whether to overwrite existing files with the same name
            
        Returns:
            Dict: Response from the server
        """
        self._ensure_authenticated()
        
        upload_endpoint = '/core/upload'
        
        if not filename:
            filename = os.path.basename(local_file_path)
        
        # Prepare upload parameters
        upload_params = {
            'appname': 'explorer',
            'path': remote_path,
            'offset': 0,
            'complete': 1,
            'filename': filename
        }
        
        if overwrite:
            upload_params['overwrite'] = 1
        
        try:
            with open(local_file_path, 'rb') as file_obj:
                files = {'file': (filename, file_obj)}
                response = self.session.post(
                    f"{self.server_url}{upload_endpoint}",
                    params=upload_params,
                    files=files,
                    cookies=self.session.cookies
                )
            
            return self._parse_response(response)
        except Exception as e:
            return {"success": False, "message": f"Upload failed: {str(e)}"}
    
    def download_file(self, remote_file_path: str, local_file_path: Optional[str] = None, 
                     check_only: bool = False) -> Union[Dict, bytes]:
        """
        Download a file from the FileCloud server.
        
        Args:
            remote_file_path: Full path to the file on the server
            local_file_path: Path where to save the downloaded file (if None, returns content)
            check_only: If True, only checks if the file is downloadable without downloading
            
        Returns:
            Union[Dict, bytes]: Either a response dict or the file content as bytes
        """
        self._ensure_authenticated()
        
        download_endpoint = '/core/downloadfile'
        
        # Extract filename from the path
        filename = os.path.basename(remote_file_path)
        
        download_params = {
            'filepath': remote_file_path,
            'filename': filename
        }
        
        if check_only:
            download_params['checkonly'] = 1
        
        try:
            response = self.session.get(
                f"{self.server_url}{download_endpoint}",
                params=download_params,
                cookies=self.session.cookies,
                stream=True  # Use streaming for large files
            )
            
            if response.status_code != 200:
                return {"success": False, "message": f"Download failed with status {response.status_code}"}
            
            # If check_only, just return success
            if check_only:
                if response.text == 'OK':
                    return {"success": True, "message": "File can be downloaded"}
                return {"success": False, "message": response.text}
            
            # If local_file_path is provided, save the file
            if local_file_path:
                with open(local_file_path, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        f.write(chunk)
                return {"success": True, "message": f"File saved to {local_file_path}"}
            
            # Otherwise, return the content
            return response.content
        except Exception as e:
            return {"success": False, "message": f"Download failed: {str(e)}"}
    
    def get_file_list(self, path: str, sort_by: str = "name", sort_dir: int = 1, 
                     start: int = 0, limit: int = -1, include_metadata: bool = False) -> Dict:
        """
        Get a list of files and directories in the specified path.
        
        Args:
            path: Path to list files from
            sort_by: Field to sort by ("name", "date", or "size")
            sort_dir: Sort direction (1 for ascending, -1 for descending)
            start: Index to start from (for pagination)
            limit: Maximum number of entries to return (-1 for unlimited)
            include_metadata: Whether to include metadata information
            
        Returns:
            Dict: Response containing the file list
        """
        self._ensure_authenticated()
        
        list_endpoint = '/core/getfilelist'
        
        params = {
            'path': path,
            'sortby': sort_by,
            'sortdir': sort_dir,
            'start': start,
            'limit': limit
        }
        
        if include_metadata:
            params['sendmetadatasetinfo'] = 1
        
        try:
            response = self.session.post(
                f"{self.server_url}{list_endpoint}",
                params=params,
                cookies=self.session.cookies
            )
            result= self._parse_response(response)
            if result.get("success") and response.text.strip().startswith('<'):
                result["paths"] = self._extract_paths_from_xml(response.text)
            return result
            
        except Exception as e:
            return {"success": False, "message": f"Failed to get file list: {str(e)}"}
    
    def create_folder(self, path: str, folder_name: Optional[str] = None, 
                     subpath: Optional[str] = None) -> Dict:
        """
        Create a new folder on the server.
        
        Args:
            path: Path where to create the folder
            folder_name: Name of the folder to create
            subpath: Alternative to folder_name, creates all missing folders in this path
            
        Returns:
            Dict: Response from the server
        """
        self._ensure_authenticated()
        
        create_folder_endpoint = '/core/createfolder'
        
        params = {'path': path}
        
        if folder_name:
            params['name'] = folder_name
        
        if subpath:
            params['subpath'] = subpath
        
        try:
            response = self.session.post(
                f"{self.server_url}{create_folder_endpoint}",
                params=params,
                cookies=self.session.cookies
            )
            
            return self._parse_response(response)
        except Exception as e:
            return {"success": False, "message": f"Failed to create folder: {str(e)}"}
    
    def delete_file(self, path: str, name: str) -> Dict:
        """
        Delete a file or folder.
        
        Args:
            path: Path where the file/folder is located
            name: Name of the file/folder to delete
            
        Returns:
            Dict: Response from the server
        """
        self._ensure_authenticated()
        
        delete_endpoint = '/core/deletefile'
        
        params = {
            'path': path,
            'name': name
        }
        
        try:
            response = self.session.post(
                f"{self.server_url}{delete_endpoint}",
                params=params,
                cookies=self.session.cookies
            )
            
            return self._parse_response(response)
        except Exception as e:
            return {"success": False, "message": f"Failed to delete file: {str(e)}"}
    
    def rename_or_move(self, from_path: str, to_path: str, overwrite: bool = False) -> Dict:
        """
        Rename or move a file or folder.
        
        Args:
            from_path: Full path to the source file/folder
            to_path: Full path to the destination
            overwrite: Whether to overwrite if destination exists
            
        Returns:
            Dict: Response from the server
        """
        self._ensure_authenticated()
        
        move_endpoint = '/core/renameormove'
        
        params = {
            'fromname': from_path,
            'toname': to_path
        }
        
        if overwrite:
            params['overwrite'] = 1
        
        try:
            response = self.session.post(
                f"{self.server_url}{move_endpoint}",
                params=params,
                cookies=self.session.cookies
            )
            
            return self._parse_response(response)
        except Exception as e:
            return {"success": False, "message": f"Failed to rename or move: {str(e)}"}
    
    def copy_file(self, path: str, name: str, copy_to: str, overwrite: bool = False) -> Dict:
        """
        Copy a file to a new location.
        
        Args:
            path: Path where the file is located
            name: Name of the file to copy
            copy_to: Full path to the destination
            overwrite: Whether to overwrite if destination exists
            
        Returns:
            Dict: Response from the server
        """
        self._ensure_authenticated()
        
        copy_endpoint = '/core/copyfile'
        
        params = {
            'path': path,
            'name': name,
            'copyto': copy_to
        }
        
        if overwrite:
            params['overwrite'] = 1
        
        try:
            response = self.session.post(
                f"{self.server_url}{copy_endpoint}",
                params=params,
                cookies=self.session.cookies
            )
            
            return self._parse_response(response)
        except Exception as e:
            return {"success": False, "message": f"Failed to copy file: {str(e)}"}
    
    # Search and Metadata Operations
    
    def search(self, search_string: str, search_scope: str = '0', search_location: Optional[str] = None,
              min_size: Optional[int] = None, max_size: Optional[int] = None, 
              start: int = 0, limit: str = '10000') -> Dict:
        """
        Search for files and folders.
        
        Args:
            search_string: String to search for
            search_scope: Search scope (0=All, 1=Paths, 2=Content, 3=Metadata)
            search_location: Path to search within (None for all accessible locations)
            min_size: Minimum file size in KB
            max_size: Maximum file size in KB
            start: Start index for pagination
            limit: Maximum number of results to return
            
        Returns:
            Dict: Search results
        """
        self._ensure_authenticated()
        
        search_endpoint = '/core/dosearch'
        
        params = {
            'searchstring': search_string,
            'searchscope': search_scope,
            'start': start,
            'limit': limit,
            'maxsearchentries' : limit,
            'refresh': 1
        }
        
        if search_location:
            params['searchloc'] = search_location
        
        if min_size is not None:
            params['minsize'] = min_size
        
        if max_size is not None:
            params['maxsize'] = max_size
        
        try:
            response = self.session.post(
                f"{self.server_url}{search_endpoint}",
                params=params,
                cookies=self.session.cookies
            )
            while 'INPROGRESS' in response.text:
                logger.info("Search is in progress, waiting for completion...")
                time.sleep(5)
                params['refresh'] = 0
                response = self.session.post(
                    f"{self.server_url}{search_endpoint}",
                    params=params,
                    cookies=self.session.cookies
                )
            
            result = self._parse_response(response)
            
            # Extract paths if it's a successful XML response
            if result.get("success") and response.text.strip().startswith('<'):
                result["paths"] = self._extract_paths_from_xml(response.text)
            
            return result
        except Exception as e:
            return {"success": False, "message": f"Search failed: {str(e)}"}
    
    def search_metadata(self, search_string: str = "**", search_scope: str = '3', 
                       attributes: Optional[List[Dict[str, Any]]] = None, 
                       search_location: Optional[str] = None) -> Dict:
        """
        Search for files based on metadata attributes.
        
        Args:
            search_string: String to search for (use '**' for all files)
            search_scope: Search scope (0=All, 1=Paths, 2=Content, 3=Metadata)
            attributes: List of attribute dictionaries, each containing one of these formats:
                - Format 1: {'id': 'attr_id', 'value': 'attr_value', 'type': type_int}
                - Format 2: {'setid': 'set_id', 'attributeid': 'attr_id', 'operator': 'equals', 'value': 'attr_value'}
            search_location: Optional path to limit the search to
            
        Returns:
            Dict: Search results with paths
        
        Example:
            # Search for files with multiple metadata criteria
            client.search_metadata(
                attributes=[
                    {"id": "abc123", "value": "Important", "type": 1},
                    {"id": "def456", "value": "2023", "type": 2}
                ],
                search_location="/user/documents"
            )
        """
        self._ensure_authenticated()
        
        search_endpoint = '/core/dosearch' # Using the correct endpoint according to API spec
        
        params = {
            'searchstring': search_string,
            'searchscope': search_scope,
            'limit' : 10000
        }
        
        if search_location:
            params['location'] = search_location
        
        # Handle multiple metadata attributes
        if attributes and len(attributes) > 0:
            #params['searchattributes'] = 1
            params['searchattr_total'] = len(attributes)
            
            for i, attr in enumerate(attributes):
                # Check which format the attribute is in
                if 'id' in attr:
                    # Format 1: id, value, type
                    params[f'searchattr{i}_id'] = attr['id']
                    params[f'attributevalue{i}'] = attr['value']
                    if 'type' in attr:
                        params[f'searchattr{i}type'] = attr['type_id']
                elif 'attributeid' in attr:
                    # Format 2: setid, attributeid, operator, value
                    #params[f'attributesetid{i}'] = attr.get('setid')
                    params[f'searchattr{i}_id'] = attr.get('attributeid')
                    #params[f'attributeoperator{i}'] = attr.get('operator', 'equals')
                    params[f'searchattr{i}_value'] = attr.get('value')
                    params[f'searchattr{i}_type'] = int(attr.get('type_id'))
        
        try:
            logger.info(f"Searching metadata with params: {params}")
            response = self.session.post(
                f"{self.server_url}{search_endpoint}",
                data=params,
                cookies=self.session.cookies
            )

            logger.info("resposne: %s", response.text)
            
            result = self._parse_response(response)
            
            # Extract paths from search results if successful and XML response
            if result.get('success', False) and hasattr(response, 'text') and response.text.strip().startswith('<'):
                paths = self._extract_paths_from_xml(response.text)
                result['paths'] = paths
            
            return result
        except Exception as e:
            return {"success": False, "message": f"Failed to search metadata: {str(e)}"}
    
    # Also keep the old method signature for backward compatibility
    def search_metadata_single(self, search_string: str = "**", search_scope: str = '3', 
                             attribute_id: Optional[str] = None, attribute_type: Optional[int] = None, 
                             attribute_value: Optional[str] = None) -> Dict:
        """
        Search for files based on a single metadata attribute.
        
        This method is kept for backward compatibility. The search_metadata method is preferred.
        
        Args:
            search_string: String to search for (use '**' for all files)
            search_scope: Search scope (0=All, 1=Paths, 2=Content, 3=Metadata)
            attribute_id: Metadata attribute ID to search for
            attribute_type: Metadata attribute type (1=Text, 2=Integer, etc.)
            attribute_value: Metadata attribute value to match
            
        Returns:
            Dict: Search results with paths
        """
        if attribute_id and attribute_value:
            attr = {"id": attribute_id, "value": attribute_value}
            if attribute_type:
                attr["type"] = attribute_type
            
            return self.search_metadata(search_string, search_scope, [attr])
        else:
            return self.search_metadata(search_string, search_scope)
    
    def get_metadata_values(self, file_path: str) -> Dict:
        """
        Get metadata values for a specific file.
        
        Args:
            file_path: Path to the file
            
        Returns:
            Dict: Metadata values for the file with support for multiple sets
        """
        self._ensure_authenticated()
        
        metadata_endpoint = '/core/getmetadatavalues'
        
        params = {
            'fullpath': file_path
        }
        
        try:
            response = self.session.post(
                f"{self.server_url}{metadata_endpoint}",
                params=params,
                cookies=self.session.cookies
            )
            
            result = self._parse_response(response)
            
            # Additional logging for metadata values response
            if result.get('success'):
                logger.debug(f"Successfully retrieved metadata values for {file_path}")
            else:
                logger.warning(f"Failed to retrieve metadata values: {result.get('message', 'Unknown error')}")
                
            return result
        except Exception as e:
            logger.error(f"Error in get_metadata_values: {str(e)}")
            return {"success": False, "message": f"Failed to get metadata values: {str(e)}"}
    
    def save_attribute_values(self, file_path: str, set_id: str, attribute_values: Dict[str, str]) -> Dict:
        """
        Save metadata attribute values for a file or folder.
        
        Args:
            file_path: Full path to the file or folder
            set_id: ID of the metadata set
            attribute_values: Dictionary of attribute IDs and values
            
        Returns:
            Dict: Response from the server
        """
        self._ensure_authenticated()
        
        save_endpoint = '/core/saveattributevalues'
        
        params = {
            'fullpath': file_path,  # Use the correct parameter name
            'setid': set_id,
            'attributes_total': len(attribute_values)
        }
        
        # Add attribute values to params in the format expected by the API
        for i, (attr_id, value) in enumerate(attribute_values.items()):
            params[f'attribute{i}_attributeid'] = attr_id
            params[f'attribute{i}_value'] = value
        
        try:
            response = self.session.post(
                f"{self.server_url}{save_endpoint}",
                data=params,
                cookies=self.session.cookies
            )
            
            return self._parse_response(response)
        except Exception as e:
            return {"success": False, "message": f"Failed to save attributes: {str(e)}"}
    
    def get_file_info(self, file_path: str) -> Dict:
        """
        Get information about a file.
        
        Args:
            file_path: Full path to the file
            
        Returns:
            Dict: File information
        """
        self._ensure_authenticated()
        
        info_endpoint = '/core/fileinfo'
        
        params = {
            'file': file_path
        }
        
        try:
            response = self.session.post(
                f"{self.server_url}{info_endpoint}",
                params=params,
                cookies=self.session.cookies
            )
            
            return self._parse_response(response)
        except Exception as e:
            return {"success": False, "message": f"Failed to get file info: {str(e)}"}
    
    # Advanced Directory Operations
    
    def create_directory_tree(self, base_path: str, target_path: str) -> Dict:
        """
        Create a directory tree, creating all necessary parent directories.
        
        Args:
            base_path: Base path where to create the directories
            target_path: Path structure to create (relative to base_path)
            
        Returns:
            Dict: Response from the server
        """
        self._ensure_authenticated()
        
        path_elements = target_path.strip('/').split('/')
        walking = True
        cumul_path = ""
        
        # Process each path element
        for i, p in enumerate(path_elements):
            if not p:  # Skip empty elements
                continue
            
            if walking:
                # Check if this path element exists
                info_endpoint = '/core/getfilelist'
                params = {
                    'path': f"{base_path}{cumul_path}"
                }
                
                try:
                    response = self.session.post(
                        f"{self.server_url}{info_endpoint}",
                        params=params,
                        cookies=self.session.cookies
                    )
                    
                    parsed = xmltodict.parse(response.text)
                    found = False
                    
                    # Check if the directory exists
                    try:
                        entries = parsed.get('entries', {}).get('entry', [])
                        if not isinstance(entries, list):
                            entries = [entries]
                            
                        for e in entries:
                            if e.get('name') == p and e.get('type') == 'dir':
                                found = True
                                break
                    except:
                        pass
                    
                    if found:
                        # Directory exists, move to next level
                        cumul_path = f"{cumul_path}/{p}"
                    else:
                        # Need to create this directory and possibly all subdirectories
                        create_endpoint = '/core/createfolder'
                        remaining_path = "/".join(path_elements[i:])
                        create_params = {
                            'path': f"{base_path}{cumul_path}",
                            'subpath': remaining_path
                        }
                        
                        create_response = self.session.post(
                            f"{self.server_url}{create_endpoint}",
                            params=create_params,
                            cookies=self.session.cookies
                        )
                        
                        walking = False  # Stop walking since we've created all directories
                        return self._parse_response(create_response)
                except Exception as e:
                    return {"success": False, "message": f"Failed to create directory tree: {str(e)}"}
        
        # If we walked through the entire path, all directories exist
        return {"success": True, "message": "All directories already exist"}
    
    # ACL Management Operations
    
    def add_acl_entry(self, path: str, entry_type: str, value: str, 
                     permissions: str, flag: Optional[str]=None) -> Dict:
        """
        Add an Access Control List (ACL) entry to a file or folder.
        
        Args:
            path: Path to the file or folder
            entry_type: Type of entry ('user' or 'group')
            value: Username or group name
            permissions: Permission string (e.g., 'R', 'W', 'D', 'S')
            flag: Optional flag parameter - 'allow', 'deny'
            
        Returns:
            Dict: Response from the server
        """
        self._ensure_authenticated()
        
        endpoint = '/core/addaclentry'
        
        params = {
            'path': path,
            'type': entry_type,
            'value': value,
            'perm': permissions,
            'flag': 'allow'
        }
        
        #if flag:
        params['flag'] = 'allow'
        
        try:
            response = self.session.post(
                f"{self.server_url}{endpoint}",
                params=params,
                cookies=self.session.cookies
            )
            
            return self._parse_response(response)
        except Exception as e:
            return {"success": False, "message": f"Failed to add ACL entry: {str(e)}"}
    
    def delete_acl_entry(self, path: str, entry_type: str, value: str) -> Dict:
        """
        Delete an Access Control List (ACL) entry from a file or folder.
        
        Args:
            path: Path to the file or folder
            entry_type: Type of entry ('user' or 'group')
            value: Username or group name
            
        Returns:
            Dict: Response from the server
        """
        self._ensure_authenticated()
        
        endpoint = '/core/deleteaclentry'
        
        params = {
            'path': path,
            'type': entry_type,
            'value': value
        }
        
        try:
            response = self.session.post(
                f"{self.server_url}{endpoint}",
                params=params,
                cookies=self.session.cookies
            )
            
            return self._parse_response(response)
        except Exception as e:
            return {"success": False, "message": f"Failed to delete ACL entry: {str(e)}"}
    
    def get_acl(self, path: str, filter_term: Optional[str] = None, 
               list_inherited: bool = False) -> Dict:
        """
        Get Access Control List (ACL) for a specific path.
        
        Args:
            path: Path to the file or folder
            filter_term: Optional filter to apply to the results
            list_inherited: Whether to include inherited permissions
            
        Returns:
            Dict: ACL information
        """
        self._ensure_authenticated()
        
        endpoint = '/core/getacl'
        
        params = {
            'path': path
        }
        
        if filter_term:
            params['filter'] = filter_term
        
        if list_inherited:
            params['listInherited'] = 1
        
        try:
            response = self.session.post(
                f"{self.server_url}{endpoint}",
                params=params,
                cookies=self.session.cookies
            )
            
            return self._parse_response(response)
        except Exception as e:
            return {"success": False, "message": f"Failed to get ACL: {str(e)}"}

    def get_effective_acl(self, path: str, emailid: Optional[str] = None) -> Dict:
        """
        Get effective ACL permissions for a user on a specific path.
        
        Used to check for effective permissions for a user for a particular shared path 
        such as read, write, delete, share or manage permissions.
        
        Args:
            path: Path to the file or folder
            emailid: Optional username (emailid)  to check permissions for (if None, checks for current user)

        Returns:
            Dict: Effective ACL information showing what permissions the user has
        """
        self._ensure_authenticated()

        endpoint = '/core/geteffectiveacl'

        params = {
            'path': path
        }

        # Add user parameter if specified
        if emailid:
            params['emailid'] = emailid

        try:
            response = self.session.post(
                f"{self.server_url}{endpoint}",
                params=params,
                cookies=self.session.cookies
            )
            logger.info("response: %s", response.text)

            return self._parse_response(response)
        except Exception as e:
            return {"success": False, "message": f"Failed to get effective ACL: {str(e)}"}

    
    def get_acls(self, filter_text: Optional[str] = None, 
                start: int = 0, limit: int = 100) -> Dict:
        """
        Get all Access Control Lists (ACLs) in the system.
        
        Args:
            filter_text: Optional text to filter results
            start: Start index for pagination
            limit: Maximum number of results to return
            
        Returns:
            Dict: List of ACLs
        """
        self._ensure_authenticated()
        
        endpoint = '/core/getacls'
        
        params = {
            'start': start,
            'limit': limit
        }
        
        if filter_text:
            params['filtertext'] = filter_text
        
        try:
            response = self.session.get(
                f"{self.server_url}{endpoint}",
                params=params,
                cookies=self.session.cookies
            )
            
            return self._parse_response(response)
        except Exception as e:
            return {"success": False, "message": f"Failed to get ACLs: {str(e)}"}
    
    def get_all_acls_for_path(self, path: str) -> Dict:
        """
        Get all Access Control Lists (ACLs) for a specific path.
        
        Args:
            path: Path to the file or folder
            
        Returns:
            Dict: All ACL information for the path
        """
        self._ensure_authenticated()
        
        endpoint = '/core/getallaclsforpath'
        
        params = {
            'path': path
        }
        
        try:
            response = self.session.get(
                f"{self.server_url}{endpoint}",
                params=params,
                cookies=self.session.cookies
            )
            
            return self._parse_response(response)
        except Exception as e:
            return {"success": False, "message": f"Failed to get all ACLs for path: {str(e)}"}

# ...existing code...

    # Share Management Operations
    
    def add_user_to_share(self, user_id: str, share_id: str) -> Dict:
        """
        Add a user to a private share.
        
        Args:
            user_id: ID or username of the user to add
            share_id: ID of the share
            
        Returns:
            Dict: Response from the server
        """
        self._ensure_authenticated()
        
        endpoint = '/core/addusertoshare'
        
        params = {
            'userid': user_id,
            'shareid': share_id
        }
        logger.info(f"Adding user {user_id} to share {share_id}")
        
        try:
            response = self.session.post(
                f"{self.server_url}{endpoint}",
                params=params,
                cookies=self.session.cookies
            )
            
            return self._parse_response(response)
        except Exception as e:
            return {"success": False, "message": f"Failed to add user to share: {str(e)}"}
    
    def add_users_to_share(self, share_id: str, users: List[str], 
                          send_email: bool = True, custom_subject: Optional[str] = None, 
                          custom_message: Optional[str] = None, send_copy: bool = False) -> Dict:
        """
        Add multiple users to a share.
        
        Args:
            share_id: ID of the share
            users: List of usernames or email addresses to add to the share
            send_email: Whether to send notification emails to the users
            custom_subject: Custom subject for the notification email
            custom_message: Custom message for the notification email
            send_copy: Whether to send a copy of the notification to the sharer
            
        Returns:
            Dict: Response from the server
        """
        self._ensure_authenticated()
        
        endpoint = '/core/adduserstoshare'
        
        params = {
            'shareid': share_id,
            'users': ','.join(users),
            'sendemail': 1 if send_email else 0,
            'sendcopy': 1 if send_copy else 0
        }
        
        # Add optional parameters if provided
        if custom_subject:
            params['custom_subject'] = custom_subject
        
        if custom_message:
            params['custom_message'] = custom_message
        
        try:
            response = self.session.post(
                f"{self.server_url}{endpoint}",
                params=params,
                cookies=self.session.cookies
            )
            
            return self._parse_response(response)
        except Exception as e:
            return {"success": False, "message": f"Failed to add users to share: {str(e)}"}
    
    def add_group_to_share(self, group_id: str, share_id: str) -> Dict:
        """
        Add a group to a private share.
        
        Args:
            group_id: ID of the group to add
            share_id: ID of the share
            
        Returns:
            Dict: Response from the server
        """
        self._ensure_authenticated()
        
        endpoint = '/core/addgrouptoshare'
        
        params = {
            'groupid': group_id,
            'shareid': share_id
        }
        
        try:
            response = self.session.post(
                f"{self.server_url}{endpoint}",
                params=params,
                cookies=self.session.cookies
            )
            
            return self._parse_response(response)
        except Exception as e:
            return {"success": False, "message": f"Failed to add group to share: {str(e)}"}
    
    def delete_user_from_share(self, user_id: str, share_id: str) -> Dict:
        """
        Remove a user from a private share.
        
        Args:
            user_id: ID or username of the user to remove
            share_id: ID of the share
            
        Returns:
            Dict: Response from the server
        """
        self._ensure_authenticated()
        
        endpoint = '/core/deleteuserfromshare'
        
        params = {
            'userid': user_id,
            'shareid': share_id
        }
        
        try:
            response = self.session.post(
                f"{self.server_url}{endpoint}",
                params=params,
                cookies=self.session.cookies
            )
            
            return self._parse_response(response)
        except Exception as e:
            return {"success": False, "message": f"Failed to delete user from share: {str(e)}"}
    
    def delete_group_from_share(self, group_id: str, share_id: str) -> Dict:
        """
        Remove a group from a private share.
        
        Args:
            group_id: ID of the group to remove
            share_id: ID of the share
            
        Returns:
            Dict: Response from the server
        """
        self._ensure_authenticated()
        
        endpoint = '/core/deletegroupfromshare'
        
        params = {
            'groupid': group_id,
            'shareid': share_id
        }
        
        try:
            response = self.session.post(
                f"{self.server_url}{endpoint}",
                params=params,
                cookies=self.session.cookies
            )
            
            return self._parse_response(response)
        except Exception as e:
            return {"success": False, "message": f"Failed to delete group from share: {str(e)}"}
    
    def get_share_for_path(self, path: str, share_id: Optional[str] = None) -> Dict:
        """
        Get information about a share for a specific path.
        
        Args:
            path: Path to the shared file or folder
            share_id: Optional share ID to filter by
            
        Returns:
            Dict: Share information
        """
        self._ensure_authenticated()
        
        endpoint = '/core/getshareforpath'
        
        params = {
            'path': path
        }
        
        if share_id:
            params['shareid'] = share_id
        
        try:
            response = self.session.post(
                f"{self.server_url}{endpoint}",
                params=params,
                cookies=self.session.cookies
            )
            
            return self._parse_response(response)
        except Exception as e:
            return {"success": False, "message": f"Failed to get share for path: {str(e)}"}
    
    def get_share_for_id(self, share_id: str) -> Dict:
        """
        Get information about a share using its ID.
        
        Args:
            share_id: ID of the share
            
        Returns:
            Dict: Share information
        """
        self._ensure_authenticated()
        
        endpoint = '/core/getshareforid'
        
        params = {
            'shareid': share_id
        }
        
        try:
            response = self.session.post(
                f"{self.server_url}{endpoint}",
                params=params,
                cookies=self.session.cookies
            )
            
            return self._parse_response(response)
        except Exception as e:
            return {"success": False, "message": f"Failed to get share for ID: {str(e)}"}
    
    def update_share(self, share_id: str, **kwargs) -> Dict:
        """
        Update share information and settings.
        
        Args:
            share_id: ID of the share to update
            **kwargs: Parameters to update, which can include:
                - sharename: Name of the share
                - sharelocation: Location of the shared file/folder
                - validityperiod: Validity period in days
                - expirytimestamp: Expiry timestamp
                - sharesizelimit: Share size limit
                - maxdownloads: Maximum number of downloads
                - hidenotification: Hide notification (1 or 0)
                - sharepassword: Password for protected share
                - allowpublicaccess: Allow public access (1 or 0)
                - allowpublicupload: Allow public upload (1 or 0)
                - allowpublicviewonly: Allow public view only (1 or 0)
                - allowpublicuploadonly: Allow public upload only (1 or 0)
                - newshareowner: New owner of the share
                - defaultfile: Default file to open when accessing the share
            
        Returns:
            Dict: Response from the server
        """
        self._ensure_authenticated()
        
        endpoint = '/core/updateshare'
        
        # Prepare parameters
        params = {'shareid': share_id}
        
        # Add optional parameters from kwargs
        for key, value in kwargs.items():
            params[key] = value
        
        try:
            response = self.session.post(
                f"{self.server_url}{endpoint}",
                params=params,
                cookies=self.session.cookies
            )
            
            return self._parse_response(response)
        except Exception as e:
            return {"success": False, "message": f"Failed to update share: {str(e)}"}
    
    def add_share(self, share_location: str, share_name: str, share_size_limit: Optional[int] = None,
                 max_downloads: Optional[int] = None, validity_period: Optional[int] = None,
                 expiry_timestamp: Optional[int] = None) -> Dict:
        """
        Create a new share.
        
        Args:
            share_location: Path to the file/folder to share
            share_name: Name for the share
            share_size_limit: Size limit for the share in bytes
            max_downloads: Maximum number of downloads allowed
            validity_period: Validity period in days
            expiry_timestamp: Expiry timestamp
            
        Returns:
            Dict: Response from the server with the new share ID
        """
        self._ensure_authenticated()
        
        endpoint = '/core/addshare'
        
        # Prepare parameters
        params = {
            'sharelocation': share_location,
            'sharename': share_name
        }
        
        # Add optional parameters if provided
        if share_size_limit is not None:
            params['sharesizelimit'] = share_size_limit
        
        if max_downloads is not None:
            params['maxdownloads'] = max_downloads
        
        if validity_period is not None:
            params['validityperiod'] = validity_period
        
        if expiry_timestamp is not None:
            params['expirytimestamp'] = expiry_timestamp
        
        try:
            response = self.session.post(
                f"{self.server_url}{endpoint}",
                params=params,
                cookies=self.session.cookies
            )
            
            return self._parse_response(response)
        except Exception as e:
            return {"success": False, "message": f"Failed to add share: {str(e)}"}
    
    def update_share_link(self, share_id: str, old_share_link: str, new_share_link: str, 
                         notify_users: bool = False) -> Dict:
        """
        Update the URL of a share.
        
        Args:
            share_id: ID of the share
            old_share_link: Current share link
            new_share_link: New share link to set
            notify_users: Whether to notify users about the link change
            
        Returns:
            Dict: Response from the server
        """
        self._ensure_authenticated()
        
        endpoint = '/core/updatesharelink'
        
        params = {
            'shareid': share_id,
            'oldsharelink': 'url'+old_share_link,
            'newsharelink': 'url'+new_share_link,
            'notifyusers': 1 if notify_users else 0
        }
        
        try:
            response = self.session.post(
                f"{self.server_url}{endpoint}",
                params=params,
                cookies=self.session.cookies
            )
            
            return self._parse_response(response)
        except Exception as e:
            return {"success": False, "message": f"Failed to update share link: {str(e)}"}
    
    def get_shared_link(self, share_location: str) -> Dict:
        """
        Get a share link for a specific location.
        
        Args:
            share_location: Path to the shared file or folder
            
        Returns:
            Dict: Share link information
        """
        self._ensure_authenticated()
        
        endpoint = '/core/getsharedlink'
        
        params = {
            'sharelocation': share_location
        }
        
        try:
            response = self.session.get(
                f"{self.server_url}{endpoint}",
                params=params,
                cookies=self.session.cookies
            )
            
            return self._parse_response(response)
        except Exception as e:
            return {"success": False, "message": f"Failed to get shared link: {str(e)}"}
    
    def set_user_access_for_share(self, share_id: str, user_id: str, 
                                 download: Optional[bool] = None,
                                 write: Optional[bool] = None,
                                 share: Optional[bool] = None,
                                 sync: Optional[bool] = None,
                                 allow_manage: Optional[bool] = None,
                                 disallow_delete: Optional[bool] = None) -> Dict:
        """
        Set permissions for a user on a shared file/folder.
        
        Args:
            share_id: ID of the share
            user_id: ID or username of the user
            download: Whether the user can download
            write: Whether the user can write/edit
            share: Whether the user can share
            sync: Whether the user can sync
            allow_manage: Whether the user can manage the share
            disallow_delete: Whether to disallow deletion
            
        Returns:
            Dict: Response from the server
        """
        self._ensure_authenticated()
        
        endpoint = '/core/setuseraccessforshare'
        
        # Prepare parameters
        params = {
            'shareid': share_id,
            'userid': user_id
        }
        
        # Add optional permission parameters if specified
        if download is not None:
            params['download'] = 'true' if download else 'false'
        
        if write is not None:
            params['write'] = 'true' if write else 'false'
        
        if share is not None:
            params['share'] = 'true' if share else 'false'
        
        if sync is not None:
            params['sync'] = 'true' if sync else 'false'
        
        if allow_manage is not None:
            params['allowmanage'] = 'true' if allow_manage else 'false'
        
        if disallow_delete is not None:
            params['disallowdelete'] = 'true' if disallow_delete else 'false'
        
        try:
            response = self.session.get(
                f"{self.server_url}{endpoint}",
                params=params,
                cookies=self.session.cookies
            )
            
            return self._parse_response(response)
        except Exception as e:
            return {"success": False, "message": f"Failed to set user access for share: {str(e)}"}
    
    def set_group_access_for_share(self, share_id: str, group_id: str, 
                                  download: Optional[bool] = None,
                                  write: Optional[bool] = None,
                                  share: Optional[bool] = None,
                                  sync: Optional[bool] = None,
                                  allow_manage: Optional[bool] = None,
                                  disallow_delete: Optional[bool] = None) -> Dict:
        """
        Set permissions for a group on a shared file/folder.
        
        Args:
            share_id: ID of the share
            group_id: ID of the group
            download: Whether the group can download
            write: Whether the group can write/edit
            share: Whether the group can share
            sync: Whether the group can sync
            allow_manage: Whether the group can manage the share
            disallow_delete: Whether to disallow deletion
            
        Returns:
            Dict: Response from the server
        """
        self._ensure_authenticated()
        
        endpoint = '/core/setgroupaccessforshare'
        
        # Prepare parameters
        params = {
            'shareid': share_id,
            'groupid': group_id
        }
        
        # Add optional permission parameters if specified
        if download is not None:
            params['download'] = 'true' if download else 'false'
        
        if write is not None:
            params['write'] = 'true' if write else 'false'
        
        if share is not None:
            params['share'] = 'true' if share else 'false'
        
        if sync is not None:
            params['sync'] = 'true' if sync else 'false'
        
        if allow_manage is not None:
            params['allowmanage'] = 'true' if allow_manage else 'false'
        
        if disallow_delete is not None:
            params['disallowdelete'] = 'true' if disallow_delete else 'false'
        
        try:
            response = self.session.get(
                f"{self.server_url}{endpoint}",
                params=params,
                cookies=self.session.cookies
            )
            
            return self._parse_response(response)
        except Exception as e:
            return {"success": False, "message": f"Failed to set group access for share: {str(e)}"}

# Add this method to the FileCloudAPI class

    def delete_share(self, share_id: str) -> Dict:
        """
        Delete a share.
        
        Args:
            share_id: ID of the share to delete
            
        Returns:
            Dict: Response from the server
        """
        self._ensure_authenticated()
        
        endpoint = '/core/deleteshare'
        
        params = {
            'shareid': share_id
        }
        
        try:
            response = self.session.post(
                f"{self.server_url}{endpoint}",
                params=params,
                cookies=self.session.cookies
            )
            
            return self._parse_response(response)
        except Exception as e:
            return {"success": False, "message": f"Failed to delete share: {str(e)}"}

    def get_users_for_share(self, share_id: str) -> Dict:
        """
        Get a list of users that are added explicitly to a share.
        
        Args:
            share_id: ID of the share
            
        Returns:
            Dict: Response containing the list of users with their access permissions
        """
        self._ensure_authenticated()
        
        endpoint = '/core/getusersforshare'
        
        params = {
            'shareid': share_id
        }
        
        try:
            response = self.session.post(
                f"{self.server_url}{endpoint}",
                params=params,
                cookies=self.session.cookies
            )
            
            return self._parse_response(response)
        except Exception as e:
            return {"success": False, "message": f"Failed to get users for share: {str(e)}"}

    # Automation Workflow Operations
    
    def get_automation_workflows(self, include_disabled: bool = False) -> Dict:
        """
        Retrieve existing automation workflows.
        
        Args:
            include_disabled: Whether to include disabled workflows in the results
            
        Returns:
            Dict: List of available workflows
        """
        self._ensure_authenticated()
        
        workflows_endpoint = '/core/getautomationworkflows'
        
        params = {}
        if include_disabled:
            params['include_disabled'] = 1
        
        try:
            # Make sure to use GET method
            response = self.session.get(
                f"{self.server_url}{workflows_endpoint}",
                params=params,
                cookies=self.session.cookies
            )
            
            return self._parse_response(response)
        except Exception as e:
            return {"success": False, "message": f"Failed to get automation workflows: {str(e)}"}
    
    def get_automation_workflow_details(self, workflow_id: str) -> Dict:
        """
        Retrieve details of a specific automation workflow.
        
        Args:
            workflow_id: ID of the workflow to retrieve
            
        Returns:
            Dict: Workflow details
        """
        self._ensure_authenticated()
        
        workflow_endpoint = '/core/getautomationworkflow'
        
        params = {
            'id': workflow_id
        }
        
        try:
            # Use GET method
            response = self.session.get(
                f"{self.server_url}{workflow_endpoint}",
                params=params,
                cookies=self.session.cookies
            )
            
            return self._parse_response(response)
        except Exception as e:
            return {"success": False, "message": f"Failed to get workflow details: {str(e)}"}
    
    def update_automation_workflow(self, workflow_id: str, workflow_data: Dict) -> Dict:
        """
        Update an existing automation workflow.
        
        Args:
            workflow_id: ID of the workflow to update
            workflow_data: Dictionary containing the updated workflow configuration
            
        Returns:
            Dict: Response from the server
        """
        self._ensure_authenticated()
        
        update_endpoint = '/core/updateautomationworkflow'
        
        # Prepare the workflow data for submission
        params = {
            'id': workflow_id,
            **self._prepare_workflow_data(workflow_data)
        }
        
        try:
            # Keep using POST for update operations
            response = self.session.post(
                f"{self.server_url}{update_endpoint}",
                data=params,  # Use data instead of params for POST
                cookies=self.session.cookies
            )
            
            return self._parse_response(response)
        except Exception as e:
            return {"success": False, "message": f"Failed to update workflow: {str(e)}"}
    
    def create_automation_workflow(self, workflow_data: Dict) -> Dict:
        """
        Create a new automation workflow.
        
        Args:
            workflow_data: Dictionary containing the workflow configuration
            
        Returns:
            Dict: Response from the server with the new workflow ID
        """
        self._ensure_authenticated()
        
        create_endpoint = '/core/createautomationworkflow'
        
        # Prepare the workflow data for submission
        params = self._prepare_workflow_data(workflow_data)
        
        try:
            # Use POST for create operations
            response = self.session.post(
                f"{self.server_url}{create_endpoint}",
                data=params,  # Use data for POST requests
                cookies=self.session.cookies
            )
            
            return self._parse_response(response)
        except Exception as e:
            return {"success": False, "message": f"Failed to create workflow: {str(e)}"}
    
    def delete_automation_workflow(self, workflow_id: str) -> Dict:
        """
        Delete an automation workflow.
        
        Args:
            workflow_id: ID of the workflow to delete
            
        Returns:
            Dict: Response from the server
        """
        self._ensure_authenticated()
        
        delete_endpoint = '/core/deleteautomationworkflow'
        
        params = {
            'id': workflow_id
        }
        
        try:
            response = self.session.post(
                f"{self.server_url}{delete_endpoint}",
                params=params,
                cookies=self.session.cookies
            )
            
            return self._parse_response(response)
        except Exception as e:
            return {"success": False, "message": f"Failed to delete workflow: {str(e)}"}
    
    def start_automation_workflow(self, workflow_id: str, path: str, file_names: Optional[List[str]] = None) -> Dict:
        """
        Start an automation workflow on specific files.
        
        Args:
            workflow_id: ID of the workflow to start
            path: Path where the files are located
            file_names: List of file names to process (if None, processes all files in the path)
            
        Returns:
            Dict: Response from the server
        """
        self._ensure_authenticated()
        
        start_endpoint = '/core/startautomationworkflow'
        
        params = {
            'id': workflow_id,
            'path': path
        }
        
        # If specific files are provided, add them to the request
        if file_names and len(file_names) > 0:
            params['count'] = len(file_names)
            for i, name in enumerate(file_names, 1):
                params[f'fn{i}'] = name
        
        try:
            response = self.session.post(
                f"{self.server_url}{start_endpoint}",
                params=params,
                cookies=self.session.cookies
            )
            
            return self._parse_response(response)
        except Exception as e:
            return {"success": False, "message": f"Failed to start workflow: {str(e)}"}
    
    def get_automation_workflow_runs(self, workflow_id: Optional[str] = None, 
                                    start: int = 0, limit: int = 100) -> Dict:
        """
        Get the history of automation workflow runs.
        
        Args:
            workflow_id: Optional ID to filter by specific workflow
            start: Starting index for pagination
            limit: Maximum number of results to return
            
        Returns:
            Dict: List of workflow runs
        """
        self._ensure_authenticated()
        
        runs_endpoint = '/core/getautomationworkflowruns'
        
        params = {
            'start': start,
            'limit': limit
        }
        
        if workflow_id:
            params['id'] = workflow_id
        
        try:
            # Use GET method
            response = self.session.get(
                f"{self.server_url}{runs_endpoint}",
                params=params,
                cookies=self.session.cookies
            )
            
            return self._parse_response(response)
        except Exception as e:
            return {"success": False, "message": f"Failed to get workflow runs: {str(e)}"}
    
    def get_automation_workflow_run_details(self, run_id: str) -> Dict:
        """
        Get details about a specific workflow run.
        
        Args:
            run_id: ID of the workflow run to get details for
            
        Returns:
            Dict: Detailed information about the workflow run
        """
        self._ensure_authenticated()
        
        details_endpoint = '/core/getautomationworkflowrundetails'
        
        params = {
            'id': run_id
        }
        
        try:
            # Use GET method
            response = self.session.get(
                f"{self.server_url}{details_endpoint}",
                params=params,
                cookies=self.session.cookies
            )
            
            return self._parse_response(response)
        except Exception as e:
            return {"success": False, "message": f"Failed to get workflow run details: {str(e)}"}
    
    def _prepare_workflow_data(self, workflow_data: Dict) -> Dict:
        """
        Helper method to prepare workflow data for API submission.
        
        Args:
            workflow_data: Dictionary containing workflow configuration
            
        Returns:
            Dict: Formatted parameters for API request
        """
        params = {}
        
        # Map common workflow properties
        for key in ['name', 'description', 'enabled', 'trigger_type', 'path_filter', 'user_filter']:
            if key in workflow_data:
                params[key] = workflow_data[key]
        
        # Handle conditions (if present)
        if 'conditions' in workflow_data:
            conditions = workflow_data['conditions']
            params['conditions_total'] = len(conditions)
            
            for i, condition in enumerate(conditions, 1):  # Start index at 1
                for condition_key, value in condition.items():
                    params[f'condition{i}_{condition_key}'] = value
        
        # Handle actions (if present)
        if 'actions' in workflow_data:
            actions = workflow_data['actions']
            params['actions_total'] = len(actions)
            
            for i, action in enumerate(actions, 1):  # Start index at 1
                for action_key, value in action.items():
                    # Special handling for action parameters which may be nested
                    if action_key == 'parameters' and isinstance(value, dict):
                        for param_key, param_value in value.items():
                            # Special handling for attributes array
                            if param_key == 'attributes' and isinstance(param_value, list):
                                params[f'action{i}_parameter_{param_key}_total'] = len(param_value)
                                for j, attr in enumerate(param_value, 1):  # Start index at 1
                                    for attr_key, attr_val in attr.items():
                                        params[f'action{i}_parameter_{param_key}{j}_{attr_key}'] = attr_val
                            # Special handling for users array
                            elif param_key == 'users' and isinstance(param_value, list):
                                params[f'action{i}_parameter_{param_key}'] = ','.join(param_value)
                            else:
                                params[f'action{i}_parameter_{param_key}'] = param_value
                    else:
                        params[f'action{i}_{action_key}'] = value
        
        # Handle schedule (if present)
        if 'schedule' in workflow_data:
            schedule = workflow_data['schedule']
            for schedule_key, value in schedule.items():
                params[f'schedule_{schedule_key}'] = value
        
        return params

    def get_metadata_sets(self) -> Dict:
        """
        Get all available metadata sets.
        
        Returns:
            Dict: Raw response containing metadata sets information
        """
        self._ensure_authenticated_admin()
        
        try:
            endpoint = '/admin/getmetadatasetdefinitions'
            
            # Make API call directly using session instead of _api_request
            response = self.session.post(
                f"{self.server_url}{endpoint}",
                data={'end': '100'},
                cookies=self.session.cookies
            )
            
            # Return the parsed response without additional processing
            result = self._parse_response(response)
            
            # Log successful retrieval
            if result.get('success'):
                logger.debug(f"Successfully retrieved metadata sets")
            else:
                logger.warning(f"Failed to retrieve metadata sets: {result.get('message')}")
                
            return result
            
        except Exception as e:
            logger.error(f"Error in get_metadata_sets: {str(e)}")
            return {
                "success": False, 
                "message": f"Failed to get metadata sets: {str(e)}"
            }

    def get_metadata_attributes(self, set_id: str) -> Dict:
        """
        Get attributes for a specific metadata set.
        
        Args:
            set_id: ID of the metadata set
            
        Returns:
            Dict: Raw response containing metadata attributes
        """
        self._ensure_authenticated_admin()

        endpoint = '/admin/getmetadataset'
        
        # Make direct API call
        try:
            response = self.session.post(
                f"{self.server_url}{endpoint}",
                data={'setId': set_id},
                cookies=self.session.cookies
            )
            
            # Return parsed response without additional formatting
            result = self._parse_response(response)
            
            # Log result
            if result.get('success'):
                logger.debug(f"Successfully retrieved metadata attributes for set {set_id}")
            else:
                logger.warning(f"Failed to retrieve metadata attributes: {result.get('message')}")
                
            return result
            
        except Exception as e:
            logger.error(f"Error in get_metadata_attributes: {str(e)}")
            return {
                "success": False, 
                "message": f"Failed to get metadata attributes: {str(e)}"
            }

    def get_metadata_values(self, file_path: str) -> Dict:
        """
        Get metadata values for a specific file.
        
        Args:
            file_path: Path to the file
            
        Returns:
            Dict: Metadata values for the file with support for multiple sets
        """
        self._ensure_authenticated()
        
        metadata_endpoint = '/core/getmetadatavalues'
        
        params = {
            'fullpath': file_path
        }
        
        try:
            response = self.session.post(
                f"{self.server_url}{metadata_endpoint}",
                params=params,
                cookies=self.session.cookies
            )
            
            result = self._parse_response(response)
            
            # Additional logging for metadata values response
            if result.get('success'):
                logger.debug(f"Successfully retrieved metadata values for {file_path}")
            else:
                logger.warning(f"Failed to retrieve metadata values: {result.get('message', 'Unknown error')}")
                
            return result
        except Exception as e:
            logger.error(f"Error in get_metadata_values: {str(e)}")
            return {"success": False, "message": f"Failed to get metadata values: {str(e)}"}

    def save_attribute_values(self, file_path: str, set_id: str, attribute_values: Dict[str, str]) -> Dict:
        """
        Save metadata attribute values for a file or folder.
        
        Args:
            file_path: Full path to the file or folder
            set_id: ID of the metadata set
            attribute_values: Dictionary of attribute IDs and values
            
        Returns:
            Dict: Response from the server
        """
        self._ensure_authenticated()
        
        save_endpoint = '/core/saveattributevalues'
        
        params = {
            'fullpath': file_path,  # Use the correct parameter name
            'setid': set_id,
            'attributes_total': len(attribute_values)
        }
        
        # Add attribute values to params in the format expected by the API
        for i, (attr_id, value) in enumerate(attribute_values.items()):
            params[f'attribute{i}_attributeid'] = attr_id
            params[f'attribute{i}_value'] = value

        
        
        try:
            response = self.session.post(
                f"{self.server_url}{save_endpoint}",
                data=params,
                cookies=self.session.cookies
            )
            
            return self._parse_response(response)
        except Exception as e:
            return {"success": False, "message": f"Failed to save attributes: {str(e)}"}

    def add_set_to_file_object(self, file_path: str, set_id: str) -> Dict:
        """
        Add a metadata set to a file or folder.
        
        Args:
            file_path: Full path to the file or folder
            set_id: ID of the metadata set to add
            
        Returns:
            Dict: Response from the server
        """
        self._ensure_authenticated()
        
        endpoint = '/core/addsettofileobject'
        
        params = {
            'fullpath': file_path,
            'setid': set_id
        }
        
        try:
            response = self.session.post(
                f"{self.server_url}{endpoint}",
                data=params,
                cookies=self.session.cookies
            )
            
            return self._parse_response(response)
        except Exception as e:
            return {"success": False, "message": f"Failed to add metadata set to file object: {str(e)}"}

    # ==========================================
    # Controlled Document System Extensions
    # ==========================================
    
    def create_document_folder(self, folder_path: str, create_parents: bool = True) -> Dict[str, Any]:
        """
        Create a folder for document storage.
        
        Args:
            folder_path: Folder path to create
            create_parents: Whether to create parent folders if they don't exist
            
        Returns:
            Response dictionary with status information
        """
        if create_parents:
            # Split path and create parent folders recursively
            parts = folder_path.strip('/').split('/')
            current_path = ""
            
            for part in parts:
                if part:
                    current_path += f"/{part}"
                    # Check if folder exists
                    folder_exists = self.check_folder_exists(current_path)
                    
                    if not folder_exists:
                        # Create folder
                        self.create_folder(current_path)
            
            return {"success": True, "path": folder_path}
        else:
            # Just create the specified folder
            return self.create_folder(folder_path)
    
    def upload_controlled_document(
        self, 
        file_data: Union[bytes, BinaryIO],
        folder_path: str, 
        filename: str,
        metadata: Dict[str, Any] = None,
        version_comment: str = None
    ) -> Dict[str, Any]:
        """
        Upload a controlled document with special handling.
        
        Args:
            file_data: File content as bytes or file-like object
            folder_path: Target folder path
            filename: Filename to use
            metadata: Document metadata to attach
            version_comment: Comment for version history
            
        Returns:
            Response dictionary with uploaded file information
        """
        # Ensure folder exists
        self.create_document_folder(folder_path)
        
        # Full path for file
        full_path = f"{folder_path}/{filename}"
        
        # Check if file already exists (for versioning)
        file_exists = self.check_file_exists(full_path)
        
        # Upload or update file
        if file_exists:
            result = self.update_file(full_path, file_data, version_comment)
        else:
            result = self.upload_file(folder_path, filename, file_data)
        
        # Add metadata if provided
        if metadata and result.get('success', False):
            self.set_file_metadata(full_path, metadata)
            
            # Add metadata to result
            result['metadata'] = metadata
        
        return result
    
    
    def get_document_with_metadata(self, file_path: str) -> Dict[str, Any]:
        """
        Get document content and metadata in a single call.
        
        Args:
            file_path: Path to the document
            
        Returns:
            Dictionary with file content and metadata
        """
        # Get file content
        content_result = self.download_file(file_path)
        
        # Get metadata
        metadata_result = self.get_file_metadata(file_path)
        
        # Get version history
        version_result = self.get_file_versions(file_path)
        
        # Combine results
        return {
            "success": content_result.get('success', False),
            "file_path": file_path,
            "content": content_result.get('content'),
            "metadata": metadata_result.get('metadata', {}),
            "versions": version_result.get('versions', []),
            "error": content_result.get('error') or metadata_result.get('error')
        }
    
    def set_file_metadata(self, file_path: str, metadata: Dict[str, Any]) -> Dict[str, Any]:
        """
        Set custom metadata for a file.
        
        Args:
            file_path: Path to the file
            metadata: Dictionary of metadata key-value pairs
            
        Returns:
            Response dictionary
        """
        # Format metadata for FileCloud API
        formatted_metadata = self._format_metadata_for_api(metadata)
        
        # API endpoint
        endpoint = "core/setCustomMetadata"
        
        # Request data
        request_data = {
            "filepath": file_path,
            "custommetadata": formatted_metadata
        }
        
        # Make API call
        response = self._api_request(endpoint, params=request_data,method='POST')
        
        return {
            "success": self._is_success(response),
            "file_path": file_path,
            "metadata": metadata,
            "error": self._get_error_message(response)
        }
    
    def get_file_metadata(self, file_path: str) -> Dict[str, Any]:
        """
        Get custom metadata for a file.
        
        Args:
            file_path: Path to the file
            
        Returns:
            Response dictionary with metadata
        """
        # API endpoint
        endpoint = "core/getMetadata"
        
        # Request data
        request_data = {
            "filepath": file_path
        }
        
        # Make API call
        response = self._api_request(endpoint, params=request_data,method='POST')
        
        # Extract metadata from response
        metadata = {}
        if self._is_success(response) and 'metadata' in response:
            raw_metadata = response['metadata']
            # Parse custom metadata
            if 'custommetadata' in raw_metadata:
                metadata = self._parse_api_metadata(raw_metadata['custommetadata'])
        
        return {
            "success": self._is_success(response),
            "file_path": file_path,
            "metadata": metadata,
            "error": self._get_error_message(response)
        }
    
    def get_file_versions(self, file_path: str) -> Dict[str, Any]:
        """
        Get version history for a file.
        
        Args:
            file_path: Path to the file
            
        Returns:
            Response dictionary with version history
        """
        # API endpoint
        endpoint = "core/getVersions"
        
        # Request data
        request_data = {
            "filepath": file_path
        }
        
        # Make API call
        response = self._api_request(endpoint, params=request_data,method='POST')
        
        # Extract versions
        versions = []
        if self._is_success(response) and 'versioninfo' in response:
            versions = response['versioninfo']
        
        return {
            "success": self._is_success(response),
            "file_path": file_path,
            "versions": versions,
            "error": self._get_error_message(response)
        }
    
    def restore_file_version(self, file_path: str, version_id: str) -> Dict[str, Any]:
        """
        Restore a previous version of a file.
        
        Args:
            file_path: Path to the file
            version_id: Version ID to restore
            
        Returns:
            Response dictionary
        """
        # API endpoint
        endpoint = "core/restoreVersion"
        
        # Request data
        request_data = {
            "filepath": file_path,
            "versionid": version_id
        }
        
        # Make API call
        response = self._api_request(endpoint, params=request_data,method='POST')
        
        return {
            "success": self._is_success(response),
            "file_path": file_path,
            "version_id": version_id,
            "error": self._get_error_message(response)
        }
    
    def set_document_permissions(
        self,
        file_path: str,
        users: Dict[str, List[str]] = None,
        groups: Dict[str, List[str]] = None
    ) -> Dict[str, Any]:
        """
        Set document access permissions.
        
        Args:
            file_path: Path to the document
            users: Dictionary mapping usernames to permission lists
            groups: Dictionary mapping groups to permission lists
            
        Returns:
            Response dictionary
        """
        # Create permission structure expected by FileCloud
        permissions = []
        
        # Add user permissions
        if users:
            for username, perms in users.items():
                permissions.append({
                    "type": "user",
                    "name": username,
                    "permissions": perms
                })
        
        # Add group permissions
        if groups:
            for groupname, perms in groups.items():
                permissions.append({
                    "type": "group",
                    "name": groupname,
                    "permissions": perms
                })
        
        # API endpoint
        endpoint = "core/setPermissions"
        
        # Request data
        request_data = {
            "filepath": file_path,
            "permissions": json.dumps(permissions)
        }
        
        # Make API call
        response = self._api_request(endpoint, params=request_data,method='POST')
        
        return {
            "success": self._is_success(response),
            "file_path": file_path,
            "error": self._get_error_message(response)
        }
    
    def create_document_share(
        self,
        file_path: str,
        share_type: str = "view",
        password: Optional[str] = None,
        expiry_days: Optional[int] = None,
        notify_emails: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """
        Create a shareable link for a document.
        
        Args:
            file_path: Path to the document
            share_type: Type of share (view, download, upload, collaborate)
            password: Optional password protection
            expiry_days: Number of days until link expires
            notify_emails: List of emails to notify about the share
            
        Returns:
            Response dictionary with share link
        """
        # API endpoint
        endpoint = "core/createShare"
        
        # Request data
        request_data = {
            "filepath": file_path,
            "type": share_type
        }
        
        # Add optional parameters
        if password:
            request_data["password"] = password
        
        if expiry_days is not None:
            # Convert to seconds
            expiry_time = int(time.time()) + (expiry_days * 86400)
            request_data["expiry"] = expiry_time
        
        if notify_emails:
            request_data["emails"] = ",".join(notify_emails)
        
        # Make API call
        response = self._api_request(endpoint, params=request_data,method='POST')
        
        # Extract share URL
        share_url = None
        if self._is_success(response) and 'shareinfo' in response:
            share_url = response['shareinfo'].get('url')
        
        return {
            "success": self._is_success(response),
            "file_path": file_path,
            "share_url": share_url,
            "share_id": response.get('shareinfo', {}).get('id'),
            "error": self._get_error_message(response)
        }
    
    def search_documents(
        self,
        search_text: str,
        folder_path: Optional[str] = None,
        metadata: Optional[Dict[str, str]] = None,
        doc_type: Optional[str] = None,
        max_results: int = 100
    ) -> Dict[str, Any]:
        """
        Search for documents using text and metadata.
        
        Args:
            search_text: Text to search for
            folder_path: Optional folder path to limit search
            metadata: Optional metadata criteria as key-value pairs
            doc_type: Optional document type to filter by
            max_results: Maximum number of results to return
            
        Returns:
            Response dictionary with search results
        """
        # API endpoint
        endpoint = "core/searchFiles"
        
        # Build query
        query = search_text
        
        # Add metadata constraints if provided
        if metadata:
            for key, value in metadata.items():
                query += f" metadata:{key}:{value}"
        
        # Add document type constraint if provided
        if doc_type:
            query += f" metadata:doc_type:{doc_type}"
        
        # Request data
        request_data = {
            "searchtext": query,
            "maxhits": max_results
        }
        
        # Add folder path constraint if provided
        if folder_path:
            request_data["folderpath"] = folder_path
        
        # Make API call
        response = self._api_request(endpoint, params=request_data,method='POST')
        
        # Extract search results
        results = []
        if self._is_success(response) and 'searchresults' in response:
            results = response['searchresults']
        
        return {
            "success": self._is_success(response),
            "results": results,
            "count": len(results),
            "error": self._get_error_message(response)
        }
    
    def start_document_workflow(
        self,
        file_path: str,
        workflow_name: str,
        workflow_data: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Start a workflow for a document.
        
        Args:
            file_path: Path to the document
            workflow_name: Name of the workflow to start
            workflow_data: Initial workflow data
            
        Returns:
            Response dictionary with workflow information
        """
        # API endpoint
        endpoint = "workflow/startWorkflow"
        
        # Request data
        request_data = {
            "filepath": file_path,
            "workflowname": workflow_name,
            "workflowdata": json.dumps(workflow_data)
        }
        
        # Make API call
        response = self._api_request(endpoint, params=request_data,method='POST')
        
        return {
            "success": self._is_success(response),
            "file_path": file_path,
            "workflow_id": response.get('workflowid'),
            "error": self._get_error_message(response)
        }
    
    def check_folder_exists(self, folder_path: str) -> bool:
        """
        Check if a folder exists.
        
        Args:
            folder_path: Path to check
            
        Returns:
            True if folder exists, False otherwise
        """
        try:
            # Ensure client is authenticated
            self._ensure_authenticated()
            
            # API endpoint
            endpoint = '/core/fileexists'
            url = f"{self.server_url}{endpoint}"
            
            # Make request directly with data parameter
            response = self.session.post(
                url,
                data={"file": folder_path},
                cookies=self.session.cookies
            )
            
            logger.debug(f"Response status: {response.status_code}")
            logger.debug(f"Response text: {response.text[:500]}")
            
            # Parse the response
            if response.status_code == 200:
                # Check for known success patterns
                if response.text.strip() == "1":
                    return True
                elif response.text.strip() == "0":
                    return False
                
                # Try parsing as JSON if possible
                try:
                    data = response.json()
                    if isinstance(data, dict):
                        return data.get('success', False)
                except:
                    pass
                    
                # Try parsing XML response
                if '<result>1</result>' in response.text:
                    return True
                elif '<result>0</result>' in response.text:
                    return False
                
                # Check for success/error message patterns
                if 'success' in response.text.lower():
                    return True
                elif 'not found' in response.text.lower() or 'error' in response.text.lower():
                    return False
            
            # Default to false for any other case
            return False
            
        except Exception as e:
            logger.error(f"Error in check_folder_exists: {e}")
            return False
    
    def check_file_exists(self, file_path: str) -> bool:
        """
        Check if a file exists.
        
        Args:
            file_path: Path to check
            
        Returns:
            True if file exists, False otherwise
        """
        try:
        # Ensure client is authenticated
            self._ensure_authenticated()
            
            # API endpoint
            endpoint = '/core/fileexists'
            url = f"{self.server_url}{endpoint}"
            
            # Make request directly with data parameter
            response = self.session.post(
                url,
                data={"file": file_path},
                cookies=self.session.cookies
            )
            
            logger.debug(f"Response status: {response.status_code}")
            logger.debug(f"Response text: {response.text[:500]}")
            
            # Parse the response
            if response.status_code == 200:
                # Check for known success patterns
                if response.text.strip() == "1":
                    return True
                elif response.text.strip() == "0":
                    return False
                
                # Try parsing as JSON if possible
                try:
                    data = response.json()
                    if isinstance(data, dict):
                        return data.get('success', False)
                except:
                    pass
                    
                # Try parsing XML response
                if '<result>1</result>' in response.text:
                    return True
                elif '<result>0</result>' in response.text:
                    return False
                
                # Check for success/error message patterns
                if 'success' in response.text.lower():
                    return True
                elif 'not found' in response.text.lower() or 'error' in response.text.lower():
                    return False
            
            # Default to false for any other case
            return False
        
        except Exception as e:
            logger.error(f"Error in check_file_exists: {e}")
            return False

    # ==========================================
    # Helper Methods
    # ==========================================
    
    def _format_metadata_for_api(self, metadata: Dict[str, Any]) -> str:
        """
        Format metadata dictionary for FileCloud API.
        
        Args:
            metadata: Dictionary of metadata
            
        Returns:
            Formatted metadata string
        """
        formatted = []
        
        for key, value in metadata.items():
            # Convert non-string values to JSON strings
            if not isinstance(value, str):
                value = json.dumps(value)
            
            formatted.append(f"{key}={value}")
        
        return "|".join(formatted)
    
    def _parse_api_metadata(self, metadata_str: str) -> Dict[str, Any]:
        """
        Parse metadata string from FileCloud API.
        
        Args:
            metadata_str: Metadata string from API
            
        Returns:
            Dictionary of parsed metadata
        """
        metadata = {}
        
        if not metadata_str:
            return metadata
            
        # Split by the separator
        items = metadata_str.split("|")
        
        for item in items:
            if "=" in item:
                key, value = item.split("=", 1)
                
                # Try to parse JSON values
                try:
                    parsed_value = json.loads(value)
                    metadata[key] = parsed_value
                except (json.JSONDecodeError, TypeError):
                    # Keep as string if not valid JSON
                    metadata[key] = value
        
        return metadata

    def _api_request(self, endpoint: str, data: dict = None, params: dict = None, files: dict = None, method: str = "POST") -> dict:
        """
        Make a request to the FileCloud API.
        
        Args:
            endpoint: API endpoint to call (without server URL)
            data: Dictionary of form data to send
            params: Dictionary of URL parameters
            files: Dictionary of files to upload
            method: HTTP method (POST or GET)
        
        Returns:
            Dictionary with parsed response
        """
        self._ensure_authenticated()
        
        # Prepare URL
        url = f"{self.server_url}{endpoint}"
        
        try:
            # Make the request based on method
            if method.upper() == "GET":
                response = self.session.get(
                    url,
                    params=params,
                    cookies=self.session.cookies
                )
            else:  # Default to POST
                response = self.session.post(
                    url,
                    data=data,
                    params=params,
                    files=files,
                    cookies=self.session.cookies
                )
            
            # Parse and return the response
            print(f"API request to {url} returned status code {response.status_code}")
            return self._parse_response(response)
        except Exception as e:
            return {
                "success": False,
                "message": f"API request failed: {str(e)}"
            }

Parameters

Name Type Default Kind
bases - -

Parameter Details

bases: Parameter of type

Return Value

Returns unspecified type

Class Interface

Methods

__init__(self, server_url, username, password)

Purpose: Initialize the FileCloud API client. Args: server_url: The URL of the FileCloud server (e.g., 'https://filecloud.example.com/') username: Username for authentication (optional if authenticating later) password: Password for authentication (optional if authenticating later)

Parameters:

  • server_url: Type: str
  • username: Type: str
  • password: Type: str

Returns: None

login(self, username, password) -> bool

Purpose: Authenticate with the FileCloud server. Args: username: Override the username provided during initialization password: Override the password provided during initialization Returns: bool: True if authentication is successful, False otherwise.

Parameters:

  • username: Type: str
  • password: Type: str

Returns: Returns bool

adminlogin(self, username, password) -> bool

Purpose: Authenticate with the FileCloud server. Args: username: Override the username provided during initialization password: Override the password provided during initialization Returns: bool: True if authentication is successful, False otherwise.

Parameters:

  • username: Type: str
  • password: Type: str

Returns: Returns bool

authenticate(self, username, password) -> bool

Purpose: Alias for login() for backward compatibility

Parameters:

  • username: Type: str
  • password: Type: str

Returns: Returns bool

logout(self) -> Dict

Purpose: Logout from the FileCloud server. Returns: Dict: Response from the server.

Returns: Returns Dict

_ensure_authenticated(self)

Purpose: Ensure that the client is authenticated with the server. Attempts to login if not already authenticated. Raises: Exception: If authentication fails.

Returns: None

_ensure_authenticated_admin(self)

Purpose: Ensure that the client is authenticated with the server. Attempts to login if not already authenticated. Raises: Exception: If authentication fails.

Returns: None

_parse_response(self, response) -> Dict

Purpose: Parse response based on content type. Args: response: The HTTP response object. Returns: Dict: Parsed response as a dictionary.

Parameters:

  • response: Type: requests.Response

Returns: Returns Dict

_xml_to_dict(self, element)

Purpose: Convert an XML element to a dictionary. Args: element: XML element to convert Returns: Dict or str: Converted element

Parameters:

  • element: Parameter

Returns: See docstring for return details

_extract_paths_from_xml(self, xml_content) -> List[str]

Purpose: Extract file paths from XML search results. Args: xml_content: XML content from search results Returns: List[str]: List of file paths

Parameters:

  • xml_content: Type: str

Returns: Returns List[str]

upload_file(self, local_file_path, remote_path, filename, overwrite) -> Dict

Purpose: Upload a file to the FileCloud server. Args: local_file_path: Path to the local file to upload remote_path: Directory path on the server where to upload the file filename: Optional name to use for the file on the server (default: original filename) overwrite: Whether to overwrite existing files with the same name Returns: Dict: Response from the server

Parameters:

  • local_file_path: Type: str
  • remote_path: Type: str
  • filename: Type: Optional[str]
  • overwrite: Type: bool

Returns: Returns Dict

download_file(self, remote_file_path, local_file_path, check_only) -> Union[Dict, bytes]

Purpose: Download a file from the FileCloud server. Args: remote_file_path: Full path to the file on the server local_file_path: Path where to save the downloaded file (if None, returns content) check_only: If True, only checks if the file is downloadable without downloading Returns: Union[Dict, bytes]: Either a response dict or the file content as bytes

Parameters:

  • remote_file_path: Type: str
  • local_file_path: Type: Optional[str]
  • check_only: Type: bool

Returns: Returns Union[Dict, bytes]

get_file_list(self, path, sort_by, sort_dir, start, limit, include_metadata) -> Dict

Purpose: Get a list of files and directories in the specified path. Args: path: Path to list files from sort_by: Field to sort by ("name", "date", or "size") sort_dir: Sort direction (1 for ascending, -1 for descending) start: Index to start from (for pagination) limit: Maximum number of entries to return (-1 for unlimited) include_metadata: Whether to include metadata information Returns: Dict: Response containing the file list

Parameters:

  • path: Type: str
  • sort_by: Type: str
  • sort_dir: Type: int
  • start: Type: int
  • limit: Type: int
  • include_metadata: Type: bool

Returns: Returns Dict

create_folder(self, path, folder_name, subpath) -> Dict

Purpose: Create a new folder on the server. Args: path: Path where to create the folder folder_name: Name of the folder to create subpath: Alternative to folder_name, creates all missing folders in this path Returns: Dict: Response from the server

Parameters:

  • path: Type: str
  • folder_name: Type: Optional[str]
  • subpath: Type: Optional[str]

Returns: Returns Dict

delete_file(self, path, name) -> Dict

Purpose: Delete a file or folder. Args: path: Path where the file/folder is located name: Name of the file/folder to delete Returns: Dict: Response from the server

Parameters:

  • path: Type: str
  • name: Type: str

Returns: Returns Dict

rename_or_move(self, from_path, to_path, overwrite) -> Dict

Purpose: Rename or move a file or folder. Args: from_path: Full path to the source file/folder to_path: Full path to the destination overwrite: Whether to overwrite if destination exists Returns: Dict: Response from the server

Parameters:

  • from_path: Type: str
  • to_path: Type: str
  • overwrite: Type: bool

Returns: Returns Dict

copy_file(self, path, name, copy_to, overwrite) -> Dict

Purpose: Copy a file to a new location. Args: path: Path where the file is located name: Name of the file to copy copy_to: Full path to the destination overwrite: Whether to overwrite if destination exists Returns: Dict: Response from the server

Parameters:

  • path: Type: str
  • name: Type: str
  • copy_to: Type: str
  • overwrite: Type: bool

Returns: Returns Dict

search(self, search_string, search_scope, search_location, min_size, max_size, start, limit) -> Dict

Purpose: Search for files and folders. Args: search_string: String to search for search_scope: Search scope (0=All, 1=Paths, 2=Content, 3=Metadata) search_location: Path to search within (None for all accessible locations) min_size: Minimum file size in KB max_size: Maximum file size in KB start: Start index for pagination limit: Maximum number of results to return Returns: Dict: Search results

Parameters:

  • search_string: Type: str
  • search_scope: Type: str
  • search_location: Type: Optional[str]
  • min_size: Type: Optional[int]
  • max_size: Type: Optional[int]
  • start: Type: int
  • limit: Type: str

Returns: Returns Dict

search_metadata(self, search_string, search_scope, attributes, search_location) -> Dict

Purpose: Search for files based on metadata attributes. Args: search_string: String to search for (use '**' for all files) search_scope: Search scope (0=All, 1=Paths, 2=Content, 3=Metadata) attributes: List of attribute dictionaries, each containing one of these formats: - Format 1: {'id': 'attr_id', 'value': 'attr_value', 'type': type_int} - Format 2: {'setid': 'set_id', 'attributeid': 'attr_id', 'operator': 'equals', 'value': 'attr_value'} search_location: Optional path to limit the search to Returns: Dict: Search results with paths Example: # Search for files with multiple metadata criteria client.search_metadata( attributes=[ {"id": "abc123", "value": "Important", "type": 1}, {"id": "def456", "value": "2023", "type": 2} ], search_location="/user/documents" )

Parameters:

  • search_string: Type: str
  • search_scope: Type: str
  • attributes: Type: Optional[List[Dict[str, Any]]]
  • search_location: Type: Optional[str]

Returns: Returns Dict

search_metadata_single(self, search_string, search_scope, attribute_id, attribute_type, attribute_value) -> Dict

Purpose: Search for files based on a single metadata attribute. This method is kept for backward compatibility. The search_metadata method is preferred. Args: search_string: String to search for (use '**' for all files) search_scope: Search scope (0=All, 1=Paths, 2=Content, 3=Metadata) attribute_id: Metadata attribute ID to search for attribute_type: Metadata attribute type (1=Text, 2=Integer, etc.) attribute_value: Metadata attribute value to match Returns: Dict: Search results with paths

Parameters:

  • search_string: Type: str
  • search_scope: Type: str
  • attribute_id: Type: Optional[str]
  • attribute_type: Type: Optional[int]
  • attribute_value: Type: Optional[str]

Returns: Returns Dict

get_metadata_values(self, file_path) -> Dict

Purpose: Get metadata values for a specific file. Args: file_path: Path to the file Returns: Dict: Metadata values for the file with support for multiple sets

Parameters:

  • file_path: Type: str

Returns: Returns Dict

save_attribute_values(self, file_path, set_id, attribute_values) -> Dict

Purpose: Save metadata attribute values for a file or folder. Args: file_path: Full path to the file or folder set_id: ID of the metadata set attribute_values: Dictionary of attribute IDs and values Returns: Dict: Response from the server

Parameters:

  • file_path: Type: str
  • set_id: Type: str
  • attribute_values: Type: Dict[str, str]

Returns: Returns Dict

get_file_info(self, file_path) -> Dict

Purpose: Get information about a file. Args: file_path: Full path to the file Returns: Dict: File information

Parameters:

  • file_path: Type: str

Returns: Returns Dict

create_directory_tree(self, base_path, target_path) -> Dict

Purpose: Create a directory tree, creating all necessary parent directories. Args: base_path: Base path where to create the directories target_path: Path structure to create (relative to base_path) Returns: Dict: Response from the server

Parameters:

  • base_path: Type: str
  • target_path: Type: str

Returns: Returns Dict

add_acl_entry(self, path, entry_type, value, permissions, flag) -> Dict

Purpose: Add an Access Control List (ACL) entry to a file or folder. Args: path: Path to the file or folder entry_type: Type of entry ('user' or 'group') value: Username or group name permissions: Permission string (e.g., 'R', 'W', 'D', 'S') flag: Optional flag parameter - 'allow', 'deny' Returns: Dict: Response from the server

Parameters:

  • path: Type: str
  • entry_type: Type: str
  • value: Type: str
  • permissions: Type: str
  • flag: Type: Optional[str]

Returns: Returns Dict

delete_acl_entry(self, path, entry_type, value) -> Dict

Purpose: Delete an Access Control List (ACL) entry from a file or folder. Args: path: Path to the file or folder entry_type: Type of entry ('user' or 'group') value: Username or group name Returns: Dict: Response from the server

Parameters:

  • path: Type: str
  • entry_type: Type: str
  • value: Type: str

Returns: Returns Dict

get_acl(self, path, filter_term, list_inherited) -> Dict

Purpose: Get Access Control List (ACL) for a specific path. Args: path: Path to the file or folder filter_term: Optional filter to apply to the results list_inherited: Whether to include inherited permissions Returns: Dict: ACL information

Parameters:

  • path: Type: str
  • filter_term: Type: Optional[str]
  • list_inherited: Type: bool

Returns: Returns Dict

get_effective_acl(self, path, emailid) -> Dict

Purpose: Get effective ACL permissions for a user on a specific path. Used to check for effective permissions for a user for a particular shared path such as read, write, delete, share or manage permissions. Args: path: Path to the file or folder emailid: Optional username (emailid) to check permissions for (if None, checks for current user) Returns: Dict: Effective ACL information showing what permissions the user has

Parameters:

  • path: Type: str
  • emailid: Type: Optional[str]

Returns: Returns Dict

get_acls(self, filter_text, start, limit) -> Dict

Purpose: Get all Access Control Lists (ACLs) in the system. Args: filter_text: Optional text to filter results start: Start index for pagination limit: Maximum number of results to return Returns: Dict: List of ACLs

Parameters:

  • filter_text: Type: Optional[str]
  • start: Type: int
  • limit: Type: int

Returns: Returns Dict

get_all_acls_for_path(self, path) -> Dict

Purpose: Get all Access Control Lists (ACLs) for a specific path. Args: path: Path to the file or folder Returns: Dict: All ACL information for the path

Parameters:

  • path: Type: str

Returns: Returns Dict

add_user_to_share(self, user_id, share_id) -> Dict

Purpose: Add a user to a private share. Args: user_id: ID or username of the user to add share_id: ID of the share Returns: Dict: Response from the server

Parameters:

  • user_id: Type: str
  • share_id: Type: str

Returns: Returns Dict

add_users_to_share(self, share_id, users, send_email, custom_subject, custom_message, send_copy) -> Dict

Purpose: Add multiple users to a share. Args: share_id: ID of the share users: List of usernames or email addresses to add to the share send_email: Whether to send notification emails to the users custom_subject: Custom subject for the notification email custom_message: Custom message for the notification email send_copy: Whether to send a copy of the notification to the sharer Returns: Dict: Response from the server

Parameters:

  • share_id: Type: str
  • users: Type: List[str]
  • send_email: Type: bool
  • custom_subject: Type: Optional[str]
  • custom_message: Type: Optional[str]
  • send_copy: Type: bool

Returns: Returns Dict

add_group_to_share(self, group_id, share_id) -> Dict

Purpose: Add a group to a private share. Args: group_id: ID of the group to add share_id: ID of the share Returns: Dict: Response from the server

Parameters:

  • group_id: Type: str
  • share_id: Type: str

Returns: Returns Dict

delete_user_from_share(self, user_id, share_id) -> Dict

Purpose: Remove a user from a private share. Args: user_id: ID or username of the user to remove share_id: ID of the share Returns: Dict: Response from the server

Parameters:

  • user_id: Type: str
  • share_id: Type: str

Returns: Returns Dict

delete_group_from_share(self, group_id, share_id) -> Dict

Purpose: Remove a group from a private share. Args: group_id: ID of the group to remove share_id: ID of the share Returns: Dict: Response from the server

Parameters:

  • group_id: Type: str
  • share_id: Type: str

Returns: Returns Dict

get_share_for_path(self, path, share_id) -> Dict

Purpose: Get information about a share for a specific path. Args: path: Path to the shared file or folder share_id: Optional share ID to filter by Returns: Dict: Share information

Parameters:

  • path: Type: str
  • share_id: Type: Optional[str]

Returns: Returns Dict

get_share_for_id(self, share_id) -> Dict

Purpose: Get information about a share using its ID. Args: share_id: ID of the share Returns: Dict: Share information

Parameters:

  • share_id: Type: str

Returns: Returns Dict

update_share(self, share_id) -> Dict

Purpose: Update share information and settings. Args: share_id: ID of the share to update **kwargs: Parameters to update, which can include: - sharename: Name of the share - sharelocation: Location of the shared file/folder - validityperiod: Validity period in days - expirytimestamp: Expiry timestamp - sharesizelimit: Share size limit - maxdownloads: Maximum number of downloads - hidenotification: Hide notification (1 or 0) - sharepassword: Password for protected share - allowpublicaccess: Allow public access (1 or 0) - allowpublicupload: Allow public upload (1 or 0) - allowpublicviewonly: Allow public view only (1 or 0) - allowpublicuploadonly: Allow public upload only (1 or 0) - newshareowner: New owner of the share - defaultfile: Default file to open when accessing the share Returns: Dict: Response from the server

Parameters:

  • share_id: Type: str

Returns: Returns Dict

add_share(self, share_location, share_name, share_size_limit, max_downloads, validity_period, expiry_timestamp) -> Dict

Purpose: Create a new share. Args: share_location: Path to the file/folder to share share_name: Name for the share share_size_limit: Size limit for the share in bytes max_downloads: Maximum number of downloads allowed validity_period: Validity period in days expiry_timestamp: Expiry timestamp Returns: Dict: Response from the server with the new share ID

Parameters:

  • share_location: Type: str
  • share_name: Type: str
  • share_size_limit: Type: Optional[int]
  • max_downloads: Type: Optional[int]
  • validity_period: Type: Optional[int]
  • expiry_timestamp: Type: Optional[int]

Returns: Returns Dict

update_share_link(self, share_id, old_share_link, new_share_link, notify_users) -> Dict

Purpose: Update the URL of a share. Args: share_id: ID of the share old_share_link: Current share link new_share_link: New share link to set notify_users: Whether to notify users about the link change Returns: Dict: Response from the server

Parameters:

  • share_id: Type: str
  • old_share_link: Type: str
  • new_share_link: Type: str
  • notify_users: Type: bool

Returns: Returns Dict

get_shared_link(self, share_location) -> Dict

Purpose: Get a share link for a specific location. Args: share_location: Path to the shared file or folder Returns: Dict: Share link information

Parameters:

  • share_location: Type: str

Returns: Returns Dict

set_user_access_for_share(self, share_id, user_id, download, write, share, sync, allow_manage, disallow_delete) -> Dict

Purpose: Set permissions for a user on a shared file/folder. Args: share_id: ID of the share user_id: ID or username of the user download: Whether the user can download write: Whether the user can write/edit share: Whether the user can share sync: Whether the user can sync allow_manage: Whether the user can manage the share disallow_delete: Whether to disallow deletion Returns: Dict: Response from the server

Parameters:

  • share_id: Type: str
  • user_id: Type: str
  • download: Type: Optional[bool]
  • write: Type: Optional[bool]
  • share: Type: Optional[bool]
  • sync: Type: Optional[bool]
  • allow_manage: Type: Optional[bool]
  • disallow_delete: Type: Optional[bool]

Returns: Returns Dict

set_group_access_for_share(self, share_id, group_id, download, write, share, sync, allow_manage, disallow_delete) -> Dict

Purpose: Set permissions for a group on a shared file/folder. Args: share_id: ID of the share group_id: ID of the group download: Whether the group can download write: Whether the group can write/edit share: Whether the group can share sync: Whether the group can sync allow_manage: Whether the group can manage the share disallow_delete: Whether to disallow deletion Returns: Dict: Response from the server

Parameters:

  • share_id: Type: str
  • group_id: Type: str
  • download: Type: Optional[bool]
  • write: Type: Optional[bool]
  • share: Type: Optional[bool]
  • sync: Type: Optional[bool]
  • allow_manage: Type: Optional[bool]
  • disallow_delete: Type: Optional[bool]

Returns: Returns Dict

delete_share(self, share_id) -> Dict

Purpose: Delete a share. Args: share_id: ID of the share to delete Returns: Dict: Response from the server

Parameters:

  • share_id: Type: str

Returns: Returns Dict

get_users_for_share(self, share_id) -> Dict

Purpose: Get a list of users that are added explicitly to a share. Args: share_id: ID of the share Returns: Dict: Response containing the list of users with their access permissions

Parameters:

  • share_id: Type: str

Returns: Returns Dict

get_automation_workflows(self, include_disabled) -> Dict

Purpose: Retrieve existing automation workflows. Args: include_disabled: Whether to include disabled workflows in the results Returns: Dict: List of available workflows

Parameters:

  • include_disabled: Type: bool

Returns: Returns Dict

get_automation_workflow_details(self, workflow_id) -> Dict

Purpose: Retrieve details of a specific automation workflow. Args: workflow_id: ID of the workflow to retrieve Returns: Dict: Workflow details

Parameters:

  • workflow_id: Type: str

Returns: Returns Dict

update_automation_workflow(self, workflow_id, workflow_data) -> Dict

Purpose: Update an existing automation workflow. Args: workflow_id: ID of the workflow to update workflow_data: Dictionary containing the updated workflow configuration Returns: Dict: Response from the server

Parameters:

  • workflow_id: Type: str
  • workflow_data: Type: Dict

Returns: Returns Dict

create_automation_workflow(self, workflow_data) -> Dict

Purpose: Create a new automation workflow. Args: workflow_data: Dictionary containing the workflow configuration Returns: Dict: Response from the server with the new workflow ID

Parameters:

  • workflow_data: Type: Dict

Returns: Returns Dict

delete_automation_workflow(self, workflow_id) -> Dict

Purpose: Delete an automation workflow. Args: workflow_id: ID of the workflow to delete Returns: Dict: Response from the server

Parameters:

  • workflow_id: Type: str

Returns: Returns Dict

start_automation_workflow(self, workflow_id, path, file_names) -> Dict

Purpose: Start an automation workflow on specific files. Args: workflow_id: ID of the workflow to start path: Path where the files are located file_names: List of file names to process (if None, processes all files in the path) Returns: Dict: Response from the server

Parameters:

  • workflow_id: Type: str
  • path: Type: str
  • file_names: Type: Optional[List[str]]

Returns: Returns Dict

get_automation_workflow_runs(self, workflow_id, start, limit) -> Dict

Purpose: Get the history of automation workflow runs. Args: workflow_id: Optional ID to filter by specific workflow start: Starting index for pagination limit: Maximum number of results to return Returns: Dict: List of workflow runs

Parameters:

  • workflow_id: Type: Optional[str]
  • start: Type: int
  • limit: Type: int

Returns: Returns Dict

get_automation_workflow_run_details(self, run_id) -> Dict

Purpose: Get details about a specific workflow run. Args: run_id: ID of the workflow run to get details for Returns: Dict: Detailed information about the workflow run

Parameters:

  • run_id: Type: str

Returns: Returns Dict

_prepare_workflow_data(self, workflow_data) -> Dict

Purpose: Helper method to prepare workflow data for API submission. Args: workflow_data: Dictionary containing workflow configuration Returns: Dict: Formatted parameters for API request

Parameters:

  • workflow_data: Type: Dict

Returns: Returns Dict

get_metadata_sets(self) -> Dict

Purpose: Get all available metadata sets. Returns: Dict: Raw response containing metadata sets information

Returns: Returns Dict

get_metadata_attributes(self, set_id) -> Dict

Purpose: Get attributes for a specific metadata set. Args: set_id: ID of the metadata set Returns: Dict: Raw response containing metadata attributes

Parameters:

  • set_id: Type: str

Returns: Returns Dict

get_metadata_values(self, file_path) -> Dict

Purpose: Get metadata values for a specific file. Args: file_path: Path to the file Returns: Dict: Metadata values for the file with support for multiple sets

Parameters:

  • file_path: Type: str

Returns: Returns Dict

save_attribute_values(self, file_path, set_id, attribute_values) -> Dict

Purpose: Save metadata attribute values for a file or folder. Args: file_path: Full path to the file or folder set_id: ID of the metadata set attribute_values: Dictionary of attribute IDs and values Returns: Dict: Response from the server

Parameters:

  • file_path: Type: str
  • set_id: Type: str
  • attribute_values: Type: Dict[str, str]

Returns: Returns Dict

add_set_to_file_object(self, file_path, set_id) -> Dict

Purpose: Add a metadata set to a file or folder. Args: file_path: Full path to the file or folder set_id: ID of the metadata set to add Returns: Dict: Response from the server

Parameters:

  • file_path: Type: str
  • set_id: Type: str

Returns: Returns Dict

create_document_folder(self, folder_path, create_parents) -> Dict[str, Any]

Purpose: Create a folder for document storage. Args: folder_path: Folder path to create create_parents: Whether to create parent folders if they don't exist Returns: Response dictionary with status information

Parameters:

  • folder_path: Type: str
  • create_parents: Type: bool

Returns: Returns Dict[str, Any]

upload_controlled_document(self, file_data, folder_path, filename, metadata, version_comment) -> Dict[str, Any]

Purpose: Upload a controlled document with special handling. Args: file_data: File content as bytes or file-like object folder_path: Target folder path filename: Filename to use metadata: Document metadata to attach version_comment: Comment for version history Returns: Response dictionary with uploaded file information

Parameters:

  • file_data: Type: Union[bytes, BinaryIO]
  • folder_path: Type: str
  • filename: Type: str
  • metadata: Type: Dict[str, Any]
  • version_comment: Type: str

Returns: Returns Dict[str, Any]

get_document_with_metadata(self, file_path) -> Dict[str, Any]

Purpose: Get document content and metadata in a single call. Args: file_path: Path to the document Returns: Dictionary with file content and metadata

Parameters:

  • file_path: Type: str

Returns: Returns Dict[str, Any]

set_file_metadata(self, file_path, metadata) -> Dict[str, Any]

Purpose: Set custom metadata for a file. Args: file_path: Path to the file metadata: Dictionary of metadata key-value pairs Returns: Response dictionary

Parameters:

  • file_path: Type: str
  • metadata: Type: Dict[str, Any]

Returns: Returns Dict[str, Any]

get_file_metadata(self, file_path) -> Dict[str, Any]

Purpose: Get custom metadata for a file. Args: file_path: Path to the file Returns: Response dictionary with metadata

Parameters:

  • file_path: Type: str

Returns: Returns Dict[str, Any]

get_file_versions(self, file_path) -> Dict[str, Any]

Purpose: Get version history for a file. Args: file_path: Path to the file Returns: Response dictionary with version history

Parameters:

  • file_path: Type: str

Returns: Returns Dict[str, Any]

restore_file_version(self, file_path, version_id) -> Dict[str, Any]

Purpose: Restore a previous version of a file. Args: file_path: Path to the file version_id: Version ID to restore Returns: Response dictionary

Parameters:

  • file_path: Type: str
  • version_id: Type: str

Returns: Returns Dict[str, Any]

set_document_permissions(self, file_path, users, groups) -> Dict[str, Any]

Purpose: Set document access permissions. Args: file_path: Path to the document users: Dictionary mapping usernames to permission lists groups: Dictionary mapping groups to permission lists Returns: Response dictionary

Parameters:

  • file_path: Type: str
  • users: Type: Dict[str, List[str]]
  • groups: Type: Dict[str, List[str]]

Returns: Returns Dict[str, Any]

create_document_share(self, file_path, share_type, password, expiry_days, notify_emails) -> Dict[str, Any]

Purpose: Create a shareable link for a document. Args: file_path: Path to the document share_type: Type of share (view, download, upload, collaborate) password: Optional password protection expiry_days: Number of days until link expires notify_emails: List of emails to notify about the share Returns: Response dictionary with share link

Parameters:

  • file_path: Type: str
  • share_type: Type: str
  • password: Type: Optional[str]
  • expiry_days: Type: Optional[int]
  • notify_emails: Type: Optional[List[str]]

Returns: Returns Dict[str, Any]

search_documents(self, search_text, folder_path, metadata, doc_type, max_results) -> Dict[str, Any]

Purpose: Search for documents using text and metadata. Args: search_text: Text to search for folder_path: Optional folder path to limit search metadata: Optional metadata criteria as key-value pairs doc_type: Optional document type to filter by max_results: Maximum number of results to return Returns: Response dictionary with search results

Parameters:

  • search_text: Type: str
  • folder_path: Type: Optional[str]
  • metadata: Type: Optional[Dict[str, str]]
  • doc_type: Type: Optional[str]
  • max_results: Type: int

Returns: Returns Dict[str, Any]

start_document_workflow(self, file_path, workflow_name, workflow_data) -> Dict[str, Any]

Purpose: Start a workflow for a document. Args: file_path: Path to the document workflow_name: Name of the workflow to start workflow_data: Initial workflow data Returns: Response dictionary with workflow information

Parameters:

  • file_path: Type: str
  • workflow_name: Type: str
  • workflow_data: Type: Dict[str, Any]

Returns: Returns Dict[str, Any]

check_folder_exists(self, folder_path) -> bool

Purpose: Check if a folder exists. Args: folder_path: Path to check Returns: True if folder exists, False otherwise

Parameters:

  • folder_path: Type: str

Returns: Returns bool

check_file_exists(self, file_path) -> bool

Purpose: Check if a file exists. Args: file_path: Path to check Returns: True if file exists, False otherwise

Parameters:

  • file_path: Type: str

Returns: Returns bool

_format_metadata_for_api(self, metadata) -> str

Purpose: Format metadata dictionary for FileCloud API. Args: metadata: Dictionary of metadata Returns: Formatted metadata string

Parameters:

  • metadata: Type: Dict[str, Any]

Returns: Returns str

_parse_api_metadata(self, metadata_str) -> Dict[str, Any]

Purpose: Parse metadata string from FileCloud API. Args: metadata_str: Metadata string from API Returns: Dictionary of parsed metadata

Parameters:

  • metadata_str: Type: str

Returns: Returns Dict[str, Any]

_api_request(self, endpoint, data, params, files, method) -> dict

Purpose: Make a request to the FileCloud API. Args: endpoint: API endpoint to call (without server URL) data: Dictionary of form data to send params: Dictionary of URL parameters files: Dictionary of files to upload method: HTTP method (POST or GET) Returns: Dictionary with parsed response

Parameters:

  • endpoint: Type: str
  • data: Type: dict
  • params: Type: dict
  • files: Type: dict
  • method: Type: str

Returns: Returns dict

Required Imports

import requests
import os
import xmltodict
import time
import io

Usage Example

# Example usage:
# result = FileCloudAPI(bases)

Similar Components

AI-powered semantic similarity - components with related functionality:

  • class FileCloudAPI 98.7% similar

    Python wrapper for the FileCloud REST API. This class provides methods to interact with FileCloud server APIs, handling authentication, session management, and various file operations.

    From: /tf/active/vicechatdev/FC_api copy.py
  • class FileCloudClient 70.0% similar

    A client class for interacting with FileCloud server API, providing authentication, file management, folder creation, and file upload capabilities.

    From: /tf/active/vicechatdev/SPFCsync/filecloud_client.py
  • class MetadataCatalog 51.9% similar

    Helper class to manage FileCloud metadata sets and attributes. This class provides methods to work with FileCloud metadata by providing a more user-friendly interface on top of the raw API.

    From: /tf/active/vicechatdev/metadata_catalog.py
  • function test_acl_functions 51.8% similar

    Comprehensive test function that validates ACL (Access Control List) management operations in FileCloudAPI, including creating, reading, updating, and deleting ACL entries for users and groups.

    From: /tf/active/vicechatdev/test_acl_functions.py
  • function test_filecloud_connection 46.2% similar

    Tests the connection to a FileCloud server by attempting to instantiate a FileCloudClient with credentials from configuration.

    From: /tf/active/vicechatdev/SPFCsync/test_connections.py
← Back to Browse