287 lines
9.0 KiB
Python
287 lines
9.0 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Import all cocktails from TheCocktailDB into Bar Assistant
|
|
|
|
Requirements:
|
|
pip install requests
|
|
|
|
Usage:
|
|
python import-cocktaildb-to-bar-assistant-v2.py
|
|
"""
|
|
|
|
import json
import os
import re
import time
from typing import Dict, List, Optional, Tuple

import requests
|
|
|
|
# Configuration
# Every setting below can be overridden through an environment variable; the
# literal is only the fallback used when that variable is not set.
BAR_ASSISTANT_URL = os.environ.get(
    "BAR_ASSISTANT_URL", "https://cocktails.nianticbooks.com"
)
BAR_ASSISTANT_API = f"{BAR_ASSISTANT_URL}/bar/api"
COCKTAILDB_API = os.environ.get(
    "COCKTAILDB_API", "https://www.thecocktaildb.com/api/json/v1/1"
)

# SECURITY(review): an API token is committed in source. The fallback is kept
# only so existing invocations keep working; rotate this token, supply the new
# one via the BAR_ASSISTANT_TOKEN environment variable, and delete the literal.
BAR_ASSISTANT_TOKEN = os.environ.get(
    "BAR_ASSISTANT_TOKEN",
    "3|dqtZPWZoKdU59fFRLkGrL83I9HKm9EqmLCgO4kkI96199a82",
)
# Sent as the "Bar-Assistant-Bar-Id" request header by main().
BAR_ID = int(os.environ.get("BAR_ASSISTANT_BAR_ID", "1"))

# Cache for ingredients: lower-cased, stripped ingredient name -> ingredient ID.
# Filled by get_or_create_ingredient() so each name is looked up at most once
# per successful resolution.
ingredient_cache: Dict[str, int] = {}
|
|
|
|
|
|
def parse_amount(measure_str: str) -> Tuple[Optional[float], str]:
    """Parse a measure string such as '2 oz' into an ``(amount, unit)`` pair.

    The leading run of digits, dots, slashes and whitespace is taken as the
    quantity. Its whitespace-separated parts are summed, so mixed fractions
    like ``"1 1/2"`` become ``1.5``. Whatever follows is the unit; when a
    quantity is present but no unit is given the unit defaults to ``"oz"``.

    Args:
        measure_str: Raw measure text; may be empty or ``None``.

    Returns:
        ``(amount, unit)`` when a quantity could be parsed, otherwise
        ``(None, "")`` -- for blank input, input that does not start with a
        number, or a malformed number (e.g. ``"1/0"`` or ``"1/2/3"``).
    """
    if not measure_str or not measure_str.strip():
        return (None, "")

    # Handles: "2 oz", "1/2 oz", "1.5 oz", "1 1/2 oz", "2", etc.
    match = re.match(r'^([\d./\s]+)\s*(.*)$', measure_str.strip())
    if not match:
        return (None, "")

    unit = match.group(2).strip()
    total = 0.0
    try:
        # Split on whitespace so "1 1/2" is summed as 1 + 1/2.
        for part in match.group(1).split():
            if '/' in part:
                num, denom = part.split('/')
                total += float(num) / float(denom)
            else:
                total += float(part)
    except (ValueError, ZeroDivisionError):
        # Malformed number ("1.2.3", "1/2/3", "/") or a zero denominator.
        # Only these are caught, so unrelated errors and Ctrl-C still surface.
        return (None, "")

    return (total, unit if unit else "oz")
|
|
|
|
|
|
def get_or_create_ingredient(name: str, headers: Dict) -> Optional[int]:
    """Return the Bar Assistant ingredient ID for ``name``, creating it if needed.

    Lookup order:
      1. the module-level ``ingredient_cache`` (keyed by lower-cased,
         stripped name);
      2. a GET on ``{BAR_ASSISTANT_API}/ingredients`` filtered by name, taking
         the first result whose name matches case-insensitively;
      3. a POST to the same URL that creates the ingredient with strength 0.

    Successful lookups and creations are stored in the cache.

    Args:
        name: Ingredient name as it appears in the source data.
        headers: HTTP headers (including authorization) sent with each request.

    Returns:
        The ingredient ID, or ``None`` if a request failed or the response did
        not have the expected shape. Failures are printed, not raised, and are
        not cached.
    """
    # Check cache first
    name_lower = name.lower().strip()
    if name_lower in ingredient_cache:
        return ingredient_cache[name_lower]

    url = f"{BAR_ASSISTANT_API}/ingredients"

    try:
        # Search for ingredient
        response = requests.get(
            url, params={"filter[name]": name}, headers=headers, timeout=10
        )
        response.raise_for_status()

        # A missing, null or empty 'data' entry simply yields no candidates.
        for ingredient in response.json().get('data') or []:
            # Require an exact case-insensitive name match among the results.
            if ingredient['name'].lower() == name_lower:
                ingredient_id = ingredient['id']
                ingredient_cache[name_lower] = ingredient_id
                return ingredient_id

        # Ingredient not found, create it
        print(f" Creating new ingredient: {name}")
        create_data = {
            "name": name,
            "strength": 0,
            "description": "Imported from TheCocktailDB",
        }

        response = requests.post(url, json=create_data, headers=headers, timeout=10)
        response.raise_for_status()
        ingredient_id = response.json()['data']['id']
        ingredient_cache[name_lower] = ingredient_id
        return ingredient_id

    except (
        requests.RequestException,  # network failure, HTTP error status, bad JSON
        AttributeError,             # response payload is not shaped like a dict
        KeyError,
        TypeError,
        ValueError,
    ) as e:
        # Best-effort: report and let the caller skip this ingredient.
        print(f" [!] Error with ingredient '{name}': {e}")
        return None
|
|
|
|
|
|
def get_all_cocktails_from_cocktaildb() -> List[Dict]:
    """Fetch all cocktails from TheCocktailDB, one first-character query at a time.

    Issues one ``search.php?f=<char>`` request for each of ``a``-``z`` and
    ``0``-``9`` and concatenates the ``drinks`` lists from the responses.
    A failed request is reported and skipped; it does not abort the run.

    Returns:
        The combined list of cocktail dicts exactly as returned by the API
        (possibly empty).
    """
    print("Fetching all cocktails from TheCocktailDB...")
    all_cocktails = []

    url = f"{COCKTAILDB_API}/search.php"

    for letter in 'abcdefghijklmnopqrstuvwxyz0123456789':
        print(f" Fetching cocktails starting with '{letter}'...")

        try:
            response = requests.get(url, params={"f": letter}, timeout=10)
            response.raise_for_status()
            drinks = response.json().get('drinks')
        except (requests.RequestException, ValueError, AttributeError) as e:
            print(f" Error fetching '{letter}': {e}")
        else:
            # Only extend with a real, non-empty list; a null or any other
            # non-list value is treated as "no results" for this character.
            if isinstance(drinks, list) and drinks:
                all_cocktails.extend(drinks)
                print(f" Found {len(drinks)} cocktails")

        # Pause after every request, including failed ones, so an error
        # response is not followed by an immediate burst of further requests.
        time.sleep(0.5)

    print(f"\n[OK] Total cocktails found: {len(all_cocktails)}")
    return all_cocktails
|
|
|
|
|
|
def convert_cocktail_ingredients(cocktail: Dict, headers: Dict) -> List[Dict]:
    """Build the ingredient entries for one cocktail record.

    Walks the numbered ``strIngredient1``..``strIngredient15`` /
    ``strMeasure1``..``strMeasure15`` fields of ``cocktail``. Each non-blank
    ingredient name is resolved to an ID with ``get_or_create_ingredient`` and
    its measure is parsed with ``parse_amount``.

    Args:
        cocktail: A cocktail dict carrying the numbered ingredient/measure keys.
        headers: HTTP headers forwarded to ``get_or_create_ingredient``.

    Returns:
        A list of dicts with keys ``sort``, ``ingredient_id``, ``amount``,
        ``units`` and ``optional``. Slots whose name is blank, or whose ID
        could not be resolved, are left out.
    """
    converted = []

    for slot in range(1, 16):
        raw_name = cocktail.get(f"strIngredient{slot}")
        raw_measure = cocktail.get(f"strMeasure{slot}", "")

        # Skip empty slots.
        if not raw_name or not raw_name.strip():
            continue

        resolved_id = get_or_create_ingredient(raw_name.strip(), headers)
        if not resolved_id:
            # Lookup/creation failed; drop this ingredient.
            continue

        quantity, unit = parse_amount(raw_measure)
        converted.append({
            "sort": slot,  # keeps the original slot number as the ordering key
            "ingredient_id": resolved_id,
            "amount": 0 if quantity is None else quantity,
            "units": unit or "oz",  # fall back to "oz" when no unit was parsed
            "optional": False,
        })

    return converted
|
|
|
|
|
|
def import_cocktail(cocktail: Dict, headers: Dict) -> bool:
    """Import a single cocktail into Bar Assistant.

    Resolves the cocktail's ingredients first, then builds the payload and
    POSTs it to ``{BAR_ASSISTANT_API}/cocktails``.

    Args:
        cocktail: A cocktail dict using TheCocktailDB field names
            (``strDrink``, ``strInstructions``, ``strCategory``, ...).
        headers: HTTP headers (including authorization) for Bar Assistant.

    Returns:
        ``True`` when the POST succeeded. ``False`` when no ingredient could
        be resolved, or when the request failed; the reason is printed.
    """
    # Convert ingredients first
    ingredients = convert_cocktail_ingredients(cocktail, headers)
    if not ingredients:
        print(" [!] No valid ingredients found")
        return False

    # ``dict.get(key, default)`` only applies the default when the key is
    # absent; a key that is present with a None value would leak "None" into
    # the payload. ``or`` covers both the missing and the None/empty case.
    category = cocktail.get('strCategory') or 'N/A'
    alcoholic = cocktail.get('strAlcoholic') or ''

    # Build cocktail data
    # Note: Images are not imported - Bar Assistant expects image IDs, not URLs
    cocktail_data = {
        "name": cocktail.get('strDrink') or 'Unknown',
        "instructions": cocktail.get('strInstructions') or '',
        "description": f"Imported from TheCocktailDB. Category: {category}. {alcoholic}",
        "source": f"TheCocktailDB (ID: {cocktail.get('idDrink')})",
        "garnish": cocktail.get('strGarnish') or '',
        "glass": cocktail.get('strGlass') or '',
        "ingredients": ingredients,
    }

    # Add tags: category, alcoholic flag and IBA classification, when set.
    tags = [
        cocktail[key]
        for key in ('strCategory', 'strAlcoholic', 'strIBA')
        if cocktail.get(key)
    ]
    if tags:
        cocktail_data['tags'] = tags

    # Import
    url = f"{BAR_ASSISTANT_API}/cocktails"
    try:
        response = requests.post(url, json=cocktail_data, headers=headers, timeout=30)
        response.raise_for_status()
    except requests.exceptions.HTTPError as e:
        # Guard against an HTTPError that carries no response object.
        status = e.response.status_code if e.response is not None else None
        if status == 409:
            print(" [!] Already exists")
        elif status == 422:
            print(f" [X] Validation error: {e.response.text[:200]}")
        else:
            print(f" [X] HTTP Error {status}")
        return False
    except requests.RequestException as e:
        # Connection errors, timeouts and other transport-level failures.
        print(f" [X] Error: {e}")
        return False

    return True
|
|
|
|
|
|
def main():
    """Run the bulk import: fetch every cocktail, then import them one by one.

    Builds the Bar Assistant request headers from ``BAR_ASSISTANT_TOKEN`` and
    ``BAR_ID``, fetches the cocktail list with
    ``get_all_cocktails_from_cocktaildb``, calls ``import_cocktail`` for each
    entry with a 0.5 s pause between entries, and prints a summary. Returns
    early (after printing an error) when no cocktails were fetched.
    """
    banner = "=" * 60
    print(banner)
    print("TheCocktailDB -> Bar Assistant Bulk Import")
    print(banner)

    # Set up headers
    headers = {
        "Authorization": f"Bearer {BAR_ASSISTANT_TOKEN}",
        "Content-Type": "application/json",
        "Accept": "application/json",
        "Bar-Assistant-Bar-Id": str(BAR_ID),
    }

    # Step 1: Fetch all cocktails from TheCocktailDB
    cocktails = get_all_cocktails_from_cocktaildb()
    if not cocktails:
        print("\n[ERROR] No cocktails found from TheCocktailDB")
        return

    # Step 2: Import
    total = len(cocktails)
    print(f"\nImporting {total} cocktails to Bar Assistant...")
    print("This may take a while...\n")

    success_count = 0
    # import_cocktail() returns False for duplicates and for errors alike, so
    # both are counted together here.
    skip_count = 0

    for idx, cocktail in enumerate(cocktails, 1):
        cocktail_name = cocktail.get('strDrink', 'Unknown')
        print(f"[{idx}/{total}] {cocktail_name}...")

        if import_cocktail(cocktail, headers):
            success_count += 1
            print(" [OK] Imported successfully")
        else:
            skip_count += 1

        # Pause between cocktails to avoid hammering the Bar Assistant API.
        time.sleep(0.5)

    # Summary
    print("\n" + banner)
    print("Import Complete!")
    print(banner)
    print(f"[OK] Successfully imported: {success_count}")
    print(f"[!] Skipped (duplicates/errors): {skip_count}")
    print(f"\nTotal processed: {total}")
    print(f"\nView your cocktails at: {BAR_ASSISTANT_URL}")
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point. Report cancellation and unexpected failures on
    # stdout as before, but also exit with a non-zero status so shells, cron
    # jobs and CI can tell that the import did not complete.
    try:
        main()
    except KeyboardInterrupt:
        print("\n\n[!] Import cancelled by user")
        raise SystemExit(130)  # conventional status for termination by Ctrl-C
    except Exception as e:
        # Top-level boundary: print the error instead of a traceback.
        print(f"\n\n[ERROR] Error: {e}")
        raise SystemExit(1)
|