#!/usr/bin/env python3 """ Import all cocktails from TheCocktailDB into Bar Assistant Requirements: pip install requests Usage: python import-cocktaildb-to-bar-assistant-v2.py """ import requests import time import json import re from typing import Dict, List, Optional, Tuple # Configuration BAR_ASSISTANT_URL = "https://cocktails.nianticbooks.com" BAR_ASSISTANT_API = f"{BAR_ASSISTANT_URL}/bar/api" COCKTAILDB_API = "https://www.thecocktaildb.com/api/json/v1/1" BAR_ASSISTANT_TOKEN = "3|dqtZPWZoKdU59fFRLkGrL83I9HKm9EqmLCgO4kkI96199a82" BAR_ID = 1 # Cache for ingredients ingredient_cache = {} def parse_amount(measure_str: str) -> Tuple[Optional[float], str]: """Parse amount string like '2 oz' into (2.0, 'oz')""" if not measure_str or not measure_str.strip(): return (None, "") measure_str = measure_str.strip() # Try to extract number and unit # Handles: "2 oz", "1/2 oz", "1.5 oz", "2", etc. match = re.match(r'^([\d./\s]+)\s*(.*)$', measure_str) if match: amount_str = match.group(1).strip() unit = match.group(2).strip() # Handle fractions like "1/2" or "1 1/2" try: # Split on spaces to handle mixed fractions parts = amount_str.split() total = 0.0 for part in parts: if '/' in part: # Handle fraction num, denom = part.split('/') total += float(num) / float(denom) else: # Handle whole number or decimal total += float(part) return (total, unit if unit else "oz") except: pass # If we can't parse, return None return (None, "") def get_or_create_ingredient(name: str, headers: Dict) -> Optional[int]: """Get ingredient ID by name, or create if it doesn't exist""" # Check cache first name_lower = name.lower().strip() if name_lower in ingredient_cache: return ingredient_cache[name_lower] # Search for ingredient url = f"{BAR_ASSISTANT_API}/ingredients" params = {"filter[name]": name} try: response = requests.get(url, params=params, headers=headers, timeout=10) response.raise_for_status() data = response.json() # Check if we found a match if data.get('data') and len(data['data']) > 0: for ingredient in data['data']: if ingredient['name'].lower() == name_lower: ingredient_id = ingredient['id'] ingredient_cache[name_lower] = ingredient_id return ingredient_id # Ingredient not found, create it print(f" Creating new ingredient: {name}") create_data = { "name": name, "strength": 0, "description": f"Imported from TheCocktailDB" } response = requests.post(url, json=create_data, headers=headers, timeout=10) response.raise_for_status() ingredient_id = response.json()['data']['id'] ingredient_cache[name_lower] = ingredient_id return ingredient_id except Exception as e: print(f" [!] Error with ingredient '{name}': {e}") return None def get_all_cocktails_from_cocktaildb() -> List[Dict]: """Fetch all cocktails from TheCocktailDB""" print("Fetching all cocktails from TheCocktailDB...") all_cocktails = [] letters = 'abcdefghijklmnopqrstuvwxyz0123456789' for letter in letters: print(f" Fetching cocktails starting with '{letter}'...") url = f"{COCKTAILDB_API}/search.php?f={letter}" try: response = requests.get(url, timeout=10) response.raise_for_status() data = response.json() if data.get('drinks'): cocktails = data['drinks'] all_cocktails.extend(cocktails) print(f" Found {len(cocktails)} cocktails") time.sleep(0.5) except Exception as e: print(f" Error fetching '{letter}': {e}") continue print(f"\n[OK] Total cocktails found: {len(all_cocktails)}") return all_cocktails def convert_cocktail_ingredients(cocktail: Dict, headers: Dict) -> List[Dict]: """Extract and convert ingredients from TheCocktailDB format""" ingredients = [] for i in range(1, 16): ingredient_key = f"strIngredient{i}" measure_key = f"strMeasure{i}" ingredient_name = cocktail.get(ingredient_key) measure = cocktail.get(measure_key, "") if ingredient_name and ingredient_name.strip(): # Get or create the ingredient ingredient_id = get_or_create_ingredient(ingredient_name.strip(), headers) if ingredient_id: # Parse the amount amount, units = parse_amount(measure) ingredients.append({ "sort": i, # Add sort field for ingredient order "ingredient_id": ingredient_id, "amount": amount if amount is not None else 0, "units": units if units else "oz", # Default to "oz" if no unit specified "optional": False }) return ingredients def import_cocktail(cocktail: Dict, headers: Dict) -> bool: """Import a single cocktail into Bar Assistant""" # Convert ingredients first ingredients = convert_cocktail_ingredients(cocktail, headers) if not ingredients: print(f" [!] No valid ingredients found") return False # Build cocktail data cocktail_data = { "name": cocktail.get('strDrink', 'Unknown'), "instructions": cocktail.get('strInstructions', ''), "description": f"Imported from TheCocktailDB. Category: {cocktail.get('strCategory', 'N/A')}. {cocktail.get('strAlcoholic', '')}", "source": f"TheCocktailDB (ID: {cocktail.get('idDrink')})", "garnish": cocktail.get('strGarnish', ''), "glass": cocktail.get('strGlass', ''), "ingredients": ingredients # Note: Images are not imported - Bar Assistant expects image IDs, not URLs } # Add tags tags = [] if cocktail.get('strCategory'): tags.append(cocktail['strCategory']) if cocktail.get('strAlcoholic'): tags.append(cocktail['strAlcoholic']) if cocktail.get('strIBA'): tags.append(cocktail['strIBA']) if tags: cocktail_data['tags'] = tags # Import url = f"{BAR_ASSISTANT_API}/cocktails" try: response = requests.post(url, json=cocktail_data, headers=headers, timeout=30) response.raise_for_status() return True except requests.exceptions.HTTPError as e: if e.response.status_code == 409: print(f" [!] Already exists") return False elif e.response.status_code == 422: print(f" [X] Validation error: {e.response.text[:200]}") return False else: print(f" [X] HTTP Error {e.response.status_code}") return False except Exception as e: print(f" [X] Error: {e}") return False def main(): """Main execution""" print("="*60) print("TheCocktailDB -> Bar Assistant Bulk Import") print("="*60) # Set up headers headers = { "Authorization": f"Bearer {BAR_ASSISTANT_TOKEN}", "Content-Type": "application/json", "Accept": "application/json", "Bar-Assistant-Bar-Id": str(BAR_ID) } # Step 1: Fetch all cocktails from TheCocktailDB cocktails = get_all_cocktails_from_cocktaildb() if not cocktails: print("\n[ERROR] No cocktails found from TheCocktailDB") return # Step 2: Import print(f"\nImporting {len(cocktails)} cocktails to Bar Assistant...") print("This may take a while...\n") success_count = 0 skip_count = 0 error_count = 0 for idx, cocktail in enumerate(cocktails, 1): cocktail_name = cocktail.get('strDrink', 'Unknown') print(f"[{idx}/{len(cocktails)}] {cocktail_name}...") if import_cocktail(cocktail, headers): success_count += 1 print(f" [OK] Imported successfully") else: skip_count += 1 time.sleep(0.5) # Summary print("\n" + "="*60) print("Import Complete!") print("="*60) print(f"[OK] Successfully imported: {success_count}") print(f"[!] Skipped (duplicates/errors): {skip_count}") print(f"\nTotal processed: {len(cocktails)}") print(f"\nView your cocktails at: {BAR_ASSISTANT_URL}") if __name__ == "__main__": try: main() except KeyboardInterrupt: print("\n\n[!] Import cancelled by user") except Exception as e: print(f"\n\n[ERROR] Error: {e}")