class PatternBasedExtractor
Extract flocks based on farm-level In-Ovo usage patterns.
/tf/active/vicechatdev/pattern_based_extraction.py
51 - 503
moderate
Purpose
Extract flocks based on farm-level In-Ovo usage patterns.
Source Code
class PatternBasedExtractor:
    """Extract flocks based on farm-level In-Ovo usage patterns."""

    def __init__(self, data_dir: str = '/tf/active/pehestat_data', geocoded_file: str = None):
        """Initialize the pattern-based extractor.

        Args:
            data_dir: Root directory containing the pehestat data files;
                forwarded to the MatchedSampleAnalyzer.
            geocoded_file: Optional path to a CSV of previously geocoded
                establishments; loaded eagerly when the file exists.
        """
        self.data_dir = data_dir
        self.geocoded_file = geocoded_file
        # Project helpers: analyzer supplies the flocks data,
        # extractor supplies enrichment / geocoding / map utilities.
        self.analyzer = MatchedSampleAnalyzer(data_dir=data_dir)
        self.extractor = PehestatDataExtractor()
        # Populated by _load_geocoded_data; stays None when unavailable.
        self.geocoded_data = None
        # Load geocoded data if available
        if geocoded_file and os.path.exists(geocoded_file):
            self._load_geocoded_data(geocoded_file)
def _load_geocoded_data(self, geocoded_file: str):
"""Load geocoded data from previous extraction."""
try:
print(f"Loading geocoded data from: {geocoded_file}")
self.geocoded_data = pd.read_csv(geocoded_file)
# Parse date columns
date_columns = ['DateOfBirth', 'StartDate', 'EndDate']
for col in date_columns:
if col in self.geocoded_data.columns:
self.geocoded_data[col] = pd.to_datetime(self.geocoded_data[col], errors='coerce')
print(f"Loaded geocoded data: {len(self.geocoded_data)} flocks")
print(f"Establishments with coordinates: {self.geocoded_data['EstablishmentNr'].nunique()}")
except Exception as e:
print(f"Warning: Could not load geocoded data: {e}")
self.geocoded_data = None
def load_and_filter_base_data(self, start_date: str = '2020-01-01',
                              country: str = 'BE', flock_type: str = 'VLE',
                              breed: str = 'Ross 308') -> pd.DataFrame:
    """Load and apply base filters for pattern analysis.

    The country/type/breed filters were previously hard-coded; they are now
    keyword parameters whose defaults preserve the original behavior.

    Args:
        start_date: ISO date string; flocks starting (or, as a fallback,
            born) before this date are dropped.
        country: Value matched against the 'Country' column.
        flock_type: Value matched against the 'Type' column.
        breed: Value matched against the 'Breed' column.

    Returns:
        A filtered copy of the analyzer's flocks DataFrame with date
        columns coerced to datetimes.
    """
    print(f"Loading and filtering base data (start date: {start_date})...")
    # Get the flocks data
    flocks_df = self.analyzer.flocks_df.copy()
    # Base filters: country, flock type, breed.
    filtered_df = flocks_df[
        (flocks_df['Country'] == country) &
        (flocks_df['Type'] == flock_type) &
        (flocks_df['Breed'] == breed)
    ].copy()
    # Ensure date columns are datetime (invalid values -> NaT).
    for col in ('DateOfBirth', 'StartDate', 'EndDate'):
        if col in filtered_df.columns:
            filtered_df[col] = pd.to_datetime(filtered_df[col], errors='coerce')
    # Hoisted: the cutoff is the same for either date column.
    cutoff = pd.to_datetime(start_date)
    if 'StartDate' in filtered_df.columns:
        filtered_df = filtered_df[filtered_df['StartDate'] >= cutoff]
        print(f"Applied start date filter (>= {start_date}): {len(filtered_df)} flocks remaining")
    elif 'DateOfBirth' in filtered_df.columns:
        # Fallback to DateOfBirth if StartDate not available
        filtered_df = filtered_df[filtered_df['DateOfBirth'] >= cutoff]
        print(f"Applied date of birth filter (>= {start_date}): {len(filtered_df)} flocks remaining")
    print(f"Base filtered flocks: {len(filtered_df):,}")
    print(f"Establishments: {filtered_df['EstablishmentNr'].nunique():,}")
    return filtered_df
def identify_mixed_farms(self, flocks_df: pd.DataFrame) -> pd.DataFrame:
    """Identify farms that have both In-Ovo and standard flocks.

    Groups flocks per establishment and keeps establishments whose 'InOvo'
    column contains both distinct values (mixed farms).

    Args:
        flocks_df: Flock-level data with 'EstablishmentNr', 'InOvo',
            'DateOfBirth' and 'FlockCD' columns.

    Returns:
        One row per mixed farm with flock counts and derived metrics.
    """
    print("Identifying mixed farms...")
    # Group by establishment to analyze farm-level patterns
    farm_summary = flocks_df.groupby('EstablishmentNr').agg({
        'InOvo': ['count', 'sum', 'nunique'],
        'DateOfBirth': ['min', 'max'],
        'FlockCD': 'nunique'
    }).round(2)
    # Flatten the MultiIndex produced by the multi-function agg.
    farm_summary.columns = [
        'total_flocks', 'inovo_flocks', 'inovo_unique_values',
        'first_flock_date', 'last_flock_date', 'unique_flocks'
    ]
    # Derived metrics.
    farm_summary['standard_flocks'] = farm_summary['total_flocks'] - farm_summary['inovo_flocks']
    farm_summary['inovo_percentage'] = (farm_summary['inovo_flocks'] / farm_summary['total_flocks'] * 100).round(1)
    # Two distinct InOvo values means the farm ran both flock types.
    farm_summary['has_both_types'] = farm_summary['inovo_unique_values'] == 2
    # Filter for farms with both In-Ovo and standard flocks
    mixed_farms = farm_summary[farm_summary['has_both_types']].copy()
    mixed_farms.reset_index(inplace=True)
    total_farms = len(farm_summary)
    print(f"Total farms: {total_farms:,}")
    print(f"Mixed farms (both In-Ovo and standard): {len(mixed_farms):,}")
    # BUG FIX: the original divided by len(farm_summary) unconditionally,
    # raising ZeroDivisionError on an empty input frame.
    if total_farms > 0:
        print(f"Percentage of mixed farms: {len(mixed_farms)/total_farms*100:.1f}%")
    return mixed_farms
def classify_farm_patterns(self, flocks_df: pd.DataFrame, mixed_farms_df: pd.DataFrame) -> pd.DataFrame:
    """Classify farms by their In-Ovo usage patterns.

    Runs the per-farm pattern analysis for every mixed farm that has at
    least two flocks and returns one row per analyzed farm.
    """
    print("Classifying farm patterns...")
    # Restrict to flocks belonging to the mixed farms only.
    mixed_ids = mixed_farms_df['EstablishmentNr']
    mixed_farm_flocks = flocks_df[flocks_df['EstablishmentNr'].isin(mixed_ids)].copy()
    pattern_results = []
    for establishment_nr in mixed_ids:
        farm_flocks = mixed_farm_flocks[
            mixed_farm_flocks['EstablishmentNr'] == establishment_nr
        ].copy()
        # A single flock cannot exhibit a usage pattern.
        if len(farm_flocks) < 2:
            continue
        pattern_results.append(self._analyze_farm_pattern(farm_flocks, establishment_nr))
    patterns_df = pd.DataFrame(pattern_results)
    # Report how the detected patterns are distributed.
    if len(patterns_df) > 0:
        print("\nPattern Distribution:")
        for pattern, count in patterns_df['usage_pattern'].value_counts().items():
            print(f" {pattern}: {count:,} farms ({count / len(patterns_df) * 100:.1f}%)")
    return patterns_df
def _analyze_farm_pattern(self, farm_flocks: pd.DataFrame, establishment_nr: str) -> Dict:
"""Analyze the In-Ovo pattern for a single farm."""
farm_flocks = farm_flocks.sort_values('DateOfBirth')
# Basic info
pattern_info = {
'EstablishmentNr': establishment_nr,
'total_flocks': len(farm_flocks),
'inovo_flocks': farm_flocks['InOvo'].sum(),
'standard_flocks': len(farm_flocks) - farm_flocks['InOvo'].sum(),
'first_date': farm_flocks['DateOfBirth'].min(),
'last_date': farm_flocks['DateOfBirth'].max(),
}
# Count transitions between In-Ovo and standard
inovo_sequence = farm_flocks['InOvo'].tolist()
transitions = sum(1 for i in range(1, len(inovo_sequence))
if inovo_sequence[i] != inovo_sequence[i-1])
pattern_info['transitions'] = transitions
# Analyze concurrent usage (overlapping periods)
concurrent_periods = self._find_concurrent_periods(farm_flocks)
pattern_info['concurrent_periods'] = len(concurrent_periods)
pattern_info['has_concurrent_usage'] = len(concurrent_periods) > 0
# Calculate usage pattern classification
if pattern_info['has_concurrent_usage']:
pattern_info['usage_pattern'] = 'concurrent'
elif transitions <= 1:
pattern_info['usage_pattern'] = 'sequential'
else:
pattern_info['usage_pattern'] = 'mixed'
# Calculate In-Ovo percentage
pattern_info['inovo_percentage'] = (pattern_info['inovo_flocks'] / pattern_info['total_flocks'] * 100) if pattern_info['total_flocks'] > 0 else 0
return pattern_info
def _find_concurrent_periods(self, farm_flocks: pd.DataFrame) -> List[Dict]:
"""Find periods where In-Ovo and standard flocks overlap in time."""
concurrent_periods = []
# Create a list of flock periods with their In-Ovo status
flock_periods = []
for _, flock in farm_flocks.iterrows():
if pd.notna(flock.get('StartDate')) and pd.notna(flock.get('EndDate')):
flock_periods.append({
'start': flock['StartDate'],
'end': flock['EndDate'],
'inovo': flock['InOvo'],
'flock_id': flock['FlockCD']
})
# Check for overlaps between In-Ovo and standard flocks
for i, period1 in enumerate(flock_periods):
for j, period2 in enumerate(flock_periods[i+1:], i+1):
# Check if different In-Ovo status and overlapping periods
if (period1['inovo'] != period2['inovo'] and
period1['start'] <= period2['end'] and
period2['start'] <= period1['end']):
overlap_start = max(period1['start'], period2['start'])
overlap_end = min(period1['end'], period2['end'])
overlap_days = (overlap_end - overlap_start).days
if overlap_days > 0:
concurrent_periods.append({
'flock1': period1['flock_id'],
'flock2': period2['flock_id'],
'overlap_start': overlap_start,
'overlap_end': overlap_end,
'overlap_days': overlap_days
})
return concurrent_periods
def extract_flocks_by_pattern(self, pattern: str, flocks_df: pd.DataFrame,
                              patterns_df: pd.DataFrame, sample_size: Optional[int] = None) -> pd.DataFrame:
    """Extract flocks from farms with specified pattern.

    Optionally samples down to ``sample_size`` rows (fixed seed 42) and
    annotates each flock with its farm's pattern metadata.
    """
    print(f"\nExtracting flocks from farms with '{pattern}' pattern...")
    # Farms classified with the requested pattern.
    matching = patterns_df.loc[patterns_df['usage_pattern'] == pattern, 'EstablishmentNr'].tolist()
    if not matching:
        print(f"No farms found with '{pattern}' pattern!")
        return pd.DataFrame()
    print(f"Found {len(matching)} farms with '{pattern}' pattern")
    pattern_flocks = flocks_df[flocks_df['EstablishmentNr'].isin(matching)].copy()
    print(f"Total flocks from {pattern} farms: {len(pattern_flocks):,}")
    # Down-sample deterministically when requested.
    if sample_size and len(pattern_flocks) > sample_size:
        print(f"Sampling {sample_size} flocks from {len(pattern_flocks)} available flocks...")
        pattern_flocks = pattern_flocks.sample(n=sample_size, random_state=42)
        print(f"Sampled flocks: {len(pattern_flocks):,}")
    # Attach per-farm pattern metadata to every flock row.
    lookup = patterns_df.set_index('EstablishmentNr')[
        ['usage_pattern', 'transitions', 'concurrent_periods', 'inovo_percentage']
    ].to_dict('index')
    for out_col in ('usage_pattern', 'transitions', 'concurrent_periods', 'farm_inovo_percentage'):
        src_col = out_col.replace('farm_', '')
        pattern_flocks[out_col] = pattern_flocks['EstablishmentNr'].map(
            lambda est: lookup.get(est, {}).get(src_col, None)
        )
    return pattern_flocks
def enrich_flock_data(self, flocks_df: pd.DataFrame) -> pd.DataFrame:
    """Enrich flock data with owner, veterinarian, and supplier information."""
    print("\nEnriching flock data with additional information...")
    # Establishment + usergroup enrichment is delegated to the shared extractor.
    enriched = self.extractor._enrich_with_establishment_info(flocks_df)
    enriched = self.extractor._enrich_with_usergroup_info(enriched)
    # Hatchery / feed supplier columns.
    enriched = self._add_supplier_info(enriched)
    # Coordinates, only when a geocoded dataset was loaded at init time.
    if self.geocoded_data is not None:
        enriched = self._merge_geocoding_data(enriched)
    print(f"Enrichment completed: {len(enriched)} flocks")
    return enriched
def _add_supplier_info(self, flocks_df: pd.DataFrame) -> pd.DataFrame:
"""Add primary hatchery and feed supplier information."""
print("Adding supplier information...")
# Get supplier data
try:
# Load establishments data for supplier mapping
if hasattr(self.analyzer, 'establishments_df'):
establishments = self.analyzer.establishments_df
# Merge to get establishment details
supplier_cols = []
if 'HatcheryNr' in establishments.columns and 'HatcheryName' in establishments.columns:
supplier_cols.extend(['HatcheryNr', 'HatcheryName'])
if 'FeedSupplierNr' in establishments.columns and 'FeedSupplierName' in establishments.columns:
supplier_cols.extend(['FeedSupplierNr', 'FeedSupplierName'])
if supplier_cols:
flocks_df = flocks_df.merge(
establishments[['EstablishmentNr'] + supplier_cols].drop_duplicates(),
on='EstablishmentNr',
how='left'
)
print(f"Added supplier columns: {supplier_cols}")
# Try to get feed supplier from flocks data directly
if hasattr(self.analyzer, 'flocks_df') and 'FeedSupplierNr' in self.analyzer.flocks_df.columns:
feed_data = self.analyzer.flocks_df[['FlockCD', 'FeedSupplierNr']].drop_duplicates()
if 'FeedSupplierNr' not in flocks_df.columns:
flocks_df = flocks_df.merge(feed_data, on='FlockCD', how='left')
print("Added feed supplier from flocks data")
except Exception as e:
print(f"Warning: Could not add supplier info: {e}")
return flocks_df
def _merge_geocoding_data(self, flocks_df: pd.DataFrame) -> pd.DataFrame:
"""Merge geocoding coordinates from previous extraction."""
print("Merging geocoding coordinates...")
# Select geocoding columns
geo_cols = ['EstablishmentNr', 'Latitude', 'Longitude', 'GeocodingSource', 'GeocodingAccuracy']
available_geo_cols = [col for col in geo_cols if col in self.geocoded_data.columns]
if len(available_geo_cols) > 1: # Need at least EstablishmentNr + one coordinate
geo_data = self.geocoded_data[available_geo_cols].drop_duplicates(subset=['EstablishmentNr'])
# Merge coordinates
before_count = len(flocks_df)
flocks_df = flocks_df.merge(geo_data, on='EstablishmentNr', how='left')
after_count = len(flocks_df)
if before_count == after_count:
coord_count = flocks_df['Latitude'].notna().sum()
print(f"Added coordinates for {coord_count}/{len(flocks_df)} flocks")
else:
print(f"Warning: Row count changed during geocoding merge ({before_count} -> {after_count})")
return flocks_df
def _add_geocoding_coordinates(self, flocks_df: pd.DataFrame, cache_only: bool = False) -> pd.DataFrame:
"""Add geocoding coordinates using the PehestatDataExtractor methods."""
print("Adding geocoding coordinates...")
try:
# Use the PehestatDataExtractor's geocoding methods
geocoded_df = self.extractor._add_geocoding_coordinates(flocks_df, cache_only=cache_only)
return geocoded_df
except Exception as e:
print(f"Warning: Could not add geocoding coordinates: {e}")
return flocks_df
def _force_precise_geocoding(self, flocks_df: pd.DataFrame) -> pd.DataFrame:
"""Improve geocoding precision using the PehestatDataExtractor methods."""
print("Improving geocoding precision...")
try:
# Use the PehestatDataExtractor's precision improvement methods
precise_df = self.extractor._force_precise_geocoding(flocks_df)
return precise_df
except Exception as e:
print(f"Warning: Could not improve geocoding precision: {e}")
return flocks_df
def create_interactive_map(self, flocks_df: pd.DataFrame, output_filename: Optional[str] = None,
                           use_clustering: bool = False) -> str:
    """Create an interactive map using the PehestatDataExtractor methods.

    Delegates to the shared extractor; on failure a warning is printed and
    the sentinel path 'map_creation_failed.html' is returned.
    """
    print("Creating interactive map...")
    try:
        return self.extractor.create_interactive_map(
            flocks_df,
            output_path=output_filename,
            use_clustering=use_clustering,
        )
    except Exception as e:
        print(f"Warning: Could not create interactive map: {e}")
        return "map_creation_failed.html"
def export_results(self, flocks_df: pd.DataFrame, pattern: str, output_file: str = None,
                   skip_geocoding: bool = False, cache_only: bool = False,
                   create_map: bool = True, map_output: str = None,
                   use_clustering: bool = False) -> str:
    """Export enriched flocks data to CSV and optionally create interactive map.

    Args:
        flocks_df: Flocks to export.
        pattern: Pattern label used in the default file name and summary.
        output_file: Target CSV path; auto-generated with a timestamp when None.
        skip_geocoding: Export the data as-is, without adding coordinates.
        cache_only: Restrict geocoding to cached results (no live lookups).
        create_map: Also build an HTML map (only when geocoding ran).
        map_output: Map file path; derived from output_file when None.
        use_clustering: Forwarded to the map builder.

    Returns:
        Path of the written CSV file.
    """
    if output_file is None:
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        output_file = f"pattern_based_extraction_{pattern}_{timestamp}.csv"
    # Ensure output directory exists
    output_dir = os.path.dirname(output_file) if os.path.dirname(output_file) else '.'
    os.makedirs(output_dir, exist_ok=True)
    # Add geocoding if requested and not skipped
    enhanced_flocks = flocks_df.copy()
    if not skip_geocoding:
        print("\nπΊοΈ Adding geocoding coordinates...")
        enhanced_flocks = self._add_geocoding_coordinates(enhanced_flocks, cache_only=cache_only)
        if not cache_only:
            print("π― Improving geocoding precision...")
            enhanced_flocks = self._force_precise_geocoding(enhanced_flocks)
    # Export to CSV
    enhanced_flocks.to_csv(output_file, index=False)
    print(f"\nπ Exported {len(enhanced_flocks)} flocks to: {output_file}")
    # Create interactive map if requested
    if create_map and not skip_geocoding:
        if map_output is None:
            base_name = os.path.splitext(output_file)[0]
            map_output = f"{base_name}_interactive_map.html"
        print("πΊοΈ Creating interactive map...")
        map_file = self.create_interactive_map(
            enhanced_flocks,
            output_filename=map_output,
            use_clustering=use_clustering
        )
        # BUG FIX: this f-string literal was broken across two source lines
        # (a raw newline inside the literal is a syntax error); rejoined here.
        # NOTE(review): leading symbol is mojibake from the original file.
        print(f"β Interactive map created: {map_file}")
    # Print summary statistics
    self._print_export_summary(enhanced_flocks, pattern)
    return output_file
def _print_export_summary(self, flocks_df: pd.DataFrame, pattern: str):
"""Print summary of exported data."""
print(f"\nπ EXPORT SUMMARY - {pattern.upper()} PATTERN")
print("=" * 60)
print(f"Total flocks: {len(flocks_df):,}")
print(f"Establishments: {flocks_df['EstablishmentNr'].nunique():,}")
print(f"In-Ovo flocks: {flocks_df['InOvo'].sum():,} ({flocks_df['InOvo'].sum()/len(flocks_df)*100:.1f}%)")
print(f"Standard flocks: {(~flocks_df['InOvo']).sum():,} ({(~flocks_df['InOvo']).sum()/len(flocks_df)*100:.1f}%)")
# Date range
if 'StartDate' in flocks_df.columns:
start_dates = flocks_df['StartDate'].dropna()
if len(start_dates) > 0:
print(f"Date range (Start): {start_dates.min().strftime('%Y-%m-%d')} to {start_dates.max().strftime('%Y-%m-%d')}")
elif 'DateOfBirth' in flocks_df.columns:
birth_dates = flocks_df['DateOfBirth'].dropna()
if len(birth_dates) > 0:
print(f"Date range (Birth): {birth_dates.min().strftime('%Y-%m-%d')} to {birth_dates.max().strftime('%Y-%m-%d')}")
# Enrichment summary
enrichment_fields = {
'Owner names': 'OwnerName',
'Veterinarian names': 'VeterinarianName',
'Full addresses': 'FullAddress',
'Coordinates': 'Latitude',
'Hatchery info': 'HatcheryName',
'Feed supplier info': 'FeedSupplierName'
}
print("\nEnrichment coverage:")
for field_name, col_name in enrichment_fields.items():
if col_name in flocks_df.columns:
coverage = flocks_df[col_name].notna().sum()
percentage = coverage / len(flocks_df) * 100
print(f" {field_name}: {coverage:,}/{len(flocks_df):,} ({percentage:.1f}%)")
print("=" * 60)
Parameters
| Name | Type | Default | Kind |
|---|---|---|---|
| data_dir | str | '/tf/active/pehestat_data' | keyword |
| geocoded_file | str | None | keyword |
Parameter Details
data_dir: Root directory containing the pehestat data files.
geocoded_file: Optional path to a CSV of previously geocoded establishment coordinates.
Return Value
The constructor returns a PatternBasedExtractor instance.
Class Interface
Methods
__init__(self, data_dir, geocoded_file)
Purpose: Initialize the pattern-based extractor.
Parameters:
data_dir: Type: strgeocoded_file: Type: str
Returns: None
_load_geocoded_data(self, geocoded_file)
Purpose: Load geocoded data from previous extraction.
Parameters:
geocoded_file: Type: str
Returns: None
load_and_filter_base_data(self, start_date) -> pd.DataFrame
Purpose: Load and apply base filters for pattern analysis.
Parameters:
start_date: Type: str
Returns: Returns pd.DataFrame
identify_mixed_farms(self, flocks_df) -> pd.DataFrame
Purpose: Identify farms that have both In-Ovo and standard flocks.
Parameters:
flocks_df: Type: pd.DataFrame
Returns: Returns pd.DataFrame
classify_farm_patterns(self, flocks_df, mixed_farms_df) -> pd.DataFrame
Purpose: Classify farms by their In-Ovo usage patterns.
Parameters:
flocks_df: Type: pd.DataFramemixed_farms_df: Type: pd.DataFrame
Returns: Returns pd.DataFrame
_analyze_farm_pattern(self, farm_flocks, establishment_nr) -> Dict
Purpose: Analyze the In-Ovo pattern for a single farm.
Parameters:
farm_flocks: Type: pd.DataFrameestablishment_nr: Type: str
Returns: Returns Dict
_find_concurrent_periods(self, farm_flocks) -> List[Dict]
Purpose: Find periods where In-Ovo and standard flocks overlap in time.
Parameters:
farm_flocks: Type: pd.DataFrame
Returns: Returns List[Dict]
extract_flocks_by_pattern(self, pattern, flocks_df, patterns_df, sample_size) -> pd.DataFrame
Purpose: Extract flocks from farms with specified pattern.
Parameters:
pattern: Type: strflocks_df: Type: pd.DataFramepatterns_df: Type: pd.DataFramesample_size: Type: Optional[int]
Returns: Returns pd.DataFrame
enrich_flock_data(self, flocks_df) -> pd.DataFrame
Purpose: Enrich flock data with owner, veterinarian, and supplier information.
Parameters:
flocks_df: Type: pd.DataFrame
Returns: Returns pd.DataFrame
_add_supplier_info(self, flocks_df) -> pd.DataFrame
Purpose: Add primary hatchery and feed supplier information.
Parameters:
flocks_df: Type: pd.DataFrame
Returns: Returns pd.DataFrame
_merge_geocoding_data(self, flocks_df) -> pd.DataFrame
Purpose: Merge geocoding coordinates from previous extraction.
Parameters:
flocks_df: Type: pd.DataFrame
Returns: Returns pd.DataFrame
_add_geocoding_coordinates(self, flocks_df, cache_only) -> pd.DataFrame
Purpose: Add geocoding coordinates using the PehestatDataExtractor methods.
Parameters:
flocks_df: Type: pd.DataFramecache_only: Type: bool
Returns: Returns pd.DataFrame
_force_precise_geocoding(self, flocks_df) -> pd.DataFrame
Purpose: Improve geocoding precision using the PehestatDataExtractor methods.
Parameters:
flocks_df: Type: pd.DataFrame
Returns: Returns pd.DataFrame
create_interactive_map(self, flocks_df, output_filename, use_clustering) -> str
Purpose: Create an interactive map using the PehestatDataExtractor methods.
Parameters:
flocks_df: Type: pd.DataFrameoutput_filename: Type: Optional[str]use_clustering: Type: bool
Returns: Returns str
export_results(self, flocks_df, pattern, output_file, skip_geocoding, cache_only, create_map, map_output, use_clustering) -> str
Purpose: Export enriched flocks data to CSV and optionally create interactive map.
Parameters:
flocks_df: Type: pd.DataFramepattern: Type: stroutput_file: Type: strskip_geocoding: Type: boolcache_only: Type: boolcreate_map: Type: boolmap_output: Type: struse_clustering: Type: bool
Returns: Returns str
_print_export_summary(self, flocks_df, pattern)
Purpose: Print summary of exported data.
Parameters:
flocks_df: Type: pd.DataFramepattern: Type: str
Returns: None
Required Imports
import os
import sys
import pandas as pd
import numpy as np
import argparse
Usage Example
# Example usage:
# extractor = PatternBasedExtractor(data_dir='/tf/active/pehestat_data', geocoded_file='geocoded_flocks.csv')
Similar Components
AI-powered semantic similarity - components with related functionality:
-
function main 65.4% similar
-
function analyze_flock_type_patterns 56.0% similar
-
function show_problematic_flocks 51.1% similar
-
class QueryBasedExtractor 49.4% similar
-
class RegulatoryExtractor 45.4% similar