"""
PyArrow Dataset Generator for ML Inference Service

Generates test datasets for academic challenges and model validation.
Creates 100 PyArrow datasets with various image types and test scenarios.
"""
| |
|
| | import base64 |
| | import json |
| | import random |
| | from pathlib import Path |
| | from typing import Dict, List, Any, Tuple |
| | import io |
| |
|
| | import numpy as np |
| | import pyarrow as pa |
| | import pyarrow.parquet as pq |
| | from PIL import Image, ImageDraw, ImageFont |
| |
|
| |
|
class TestDatasetGenerator:
    """Generates PyArrow Parquet test datasets for an ML image-inference service.

    Four categories are produced -- standard, edge_case, performance and
    model_comparison (25 datasets each, 100 total) -- and each dataset is
    saved as a ``.parquet`` file plus a ``*_metadata.json`` sidecar under
    ``output_dir``.
    """

    def __init__(self, output_dir: str = "test_datasets"):
        """Create the generator and ensure the output directory exists.

        Args:
            output_dir: Directory where Parquet files and metadata are written.
        """
        self.output_dir = Path(output_dir)
        # parents=True so nested paths like "out/test_datasets" also work.
        self.output_dir.mkdir(parents=True, exist_ok=True)

        # Subset of ImageNet class names used to fabricate plausible
        # expected model responses.
        self.imagenet_labels = [
            "tench", "goldfish", "great_white_shark", "tiger_shark", "hammerhead",
            "electric_ray", "stingray", "cock", "hen", "ostrich", "brambling",
            "goldfinch", "house_finch", "junco", "indigo_bunting", "robin",
            "bulbul", "jay", "magpie", "chickadee", "water_ouzel", "kite",
            "bald_eagle", "vulture", "great_grey_owl", "European_fire_salamander",
            "common_newt", "eft", "spotted_salamander", "axolotl", "bullfrog",
            "tree_frog", "tailed_frog", "loggerhead", "leatherback_turtle",
            "mud_turtle", "terrapin", "box_turtle", "banded_gecko", "common_iguana",
            "American_chameleon", "whiptail", "agama", "frilled_lizard", "alligator_lizard",
            "Gila_monster", "green_lizard", "African_chameleon", "Komodo_dragon",
            "African_crocodile", "American_alligator", "triceratops", "thunder_snake"
        ]

    def create_synthetic_image(self, width: int = 224, height: int = 224,
                               image_type: str = "random") -> "Image.Image":
        """Create a synthetic RGB test image.

        Args:
            width: Image width in pixels.
            height: Image height in pixels.
            image_type: One of "random", "geometric", "gradient", "text";
                any other value yields a solid random-color image.

        Returns:
            A PIL RGB image of the requested size.
        """
        if image_type == "random":
            # Uniform random noise.
            array = np.random.randint(0, 256, (height, width, 3), dtype=np.uint8)
            return Image.fromarray(array)

        elif image_type == "geometric":
            # White canvas with a few randomly placed colored shapes.
            img = Image.new('RGB', (width, height), color='white')
            draw = ImageDraw.Draw(img)

            for _ in range(random.randint(3, 8)):
                color = tuple(random.randint(0, 255) for _ in range(3))
                shape_type = random.choice(['rectangle', 'ellipse'])
                x1, y1 = random.randint(0, width // 2), random.randint(0, height // 2)
                x2, y2 = x1 + random.randint(20, width // 2), y1 + random.randint(20, height // 2)

                if shape_type == 'rectangle':
                    draw.rectangle([x1, y1, x2, y2], fill=color)
                else:
                    draw.ellipse([x1, y1, x2, y2], fill=color)

            return img

        elif image_type == "gradient":
            # Vectorized gradient -- same integer arithmetic as the naive
            # per-pixel loop (R ramps down rows, G across columns, B
            # diagonally), but in three numpy broadcasts instead of an
            # O(height*width) Python double loop.
            rows = np.arange(height)
            cols = np.arange(width)
            array = np.empty((height, width, 3), dtype=np.uint8)
            array[..., 0] = (rows * 255 // height).astype(np.uint8)[:, None]
            array[..., 1] = (cols * 255 // width).astype(np.uint8)[None, :]
            array[..., 2] = ((rows[:, None] + cols[None, :]) * 255 // (height + width)).astype(np.uint8)
            return Image.fromarray(array)

        elif image_type == "text":
            img = Image.new('RGB', (width, height), color='white')
            draw = ImageDraw.Draw(img)

            try:
                font = ImageFont.load_default()
            except Exception:
                # Narrowed from a bare except; Pillow raises OSError family
                # errors when no default font is available. draw.text()
                # accepts font=None and falls back internally.
                font = None

            text = f"Test Image {random.randint(1, 1000)}"
            draw.text((width // 4, height // 2), text, fill='black', font=font)
            return img

        else:
            # Fallback ("solid" and anything unrecognized): one random color.
            color = tuple(random.randint(0, 255) for _ in range(3))
            return Image.new('RGB', (width, height), color=color)

    def image_to_base64(self, image: "Image.Image", format: str = "JPEG") -> str:
        """Encode a PIL image as a base64 string in the given format.

        Args:
            image: Source PIL image.
            format: Pillow format name, e.g. "JPEG" or "PNG".

        Returns:
            ASCII base64 string of the encoded image bytes.
        """
        buffer = io.BytesIO()
        image.save(buffer, format=format)
        return base64.b64encode(buffer.getvalue()).decode('utf-8')

    def create_api_request(self, image_b64: str, media_type: str = "image/jpeg") -> Dict[str, Any]:
        """Create the API request payload the inference service expects."""
        return {
            "image": {
                "mediaType": media_type,
                "data": image_b64
            }
        }

    def create_expected_response(self, model_name: str = "microsoft/resnet-18",
                                 media_type: str = "image/jpeg") -> Dict[str, Any]:
        """Create a fabricated expected-response payload.

        The prediction/label/confidence are random plausible values drawn
        from ``self.imagenet_labels`` -- they are test fixtures, not real
        model output.
        """
        prediction = random.choice(self.imagenet_labels)
        return {
            "prediction": prediction,
            "confidence": round(random.uniform(0.3, 0.99), 4),
            "predicted_label": random.randint(0, len(self.imagenet_labels) - 1),
            "model": model_name,
            "mediaType": media_type
        }

    def generate_standard_datasets(self, count: int = 25) -> List[Dict[str, Any]]:
        """Generate `count` standard datasets of 5-20 ordinary images each."""
        datasets = []

        # Loop-invariant choice pools, hoisted out of the per-dataset loop.
        image_types = ["random", "geometric", "gradient", "text", "solid"]
        sizes = [(224, 224), (256, 256), (299, 299), (384, 384)]
        formats = [("JPEG", "image/jpeg"), ("PNG", "image/png")]

        for i in range(count):
            records = []
            for j in range(random.randint(5, 20)):
                img_type = random.choice(image_types)
                size = random.choice(sizes)
                format_info = random.choice(formats)

                image = self.create_synthetic_image(size[0], size[1], img_type)
                image_b64 = self.image_to_base64(image, format_info[0])

                api_request = self.create_api_request(image_b64, format_info[1])
                expected_response = self.create_expected_response()

                records.append({
                    "dataset_id": f"standard_{i:03d}",
                    "image_id": f"img_{j:03d}",
                    "image_type": img_type,
                    "image_size": f"{size[0]}x{size[1]}",
                    "format": format_info[0],
                    "media_type": format_info[1],
                    "api_request": json.dumps(api_request),
                    "expected_response": json.dumps(expected_response),
                    "test_category": "standard",
                    "difficulty": "normal"
                })

            datasets.append({
                "name": f"standard_test_{i:03d}",
                "category": "standard",
                "description": f"Standard test dataset {i+1} with {len(records)} images",
                "records": records
            })

        return datasets

    def generate_edge_case_datasets(self, count: int = 25) -> List[Dict[str, Any]]:
        """Generate datasets for edge-case scenarios (tiny/huge/corrupt images)."""
        datasets = []

        for i in range(count):
            records = []
            edge_cases = [
                {"type": "tiny", "size": (32, 32), "difficulty": "high"},
                {"type": "huge", "size": (2048, 2048), "difficulty": "high"},
                {"type": "extreme_aspect", "size": (1000, 50), "difficulty": "medium"},
                {"type": "single_pixel", "size": (1, 1), "difficulty": "extreme"},
                {"type": "corrupted_base64", "size": (224, 224), "difficulty": "extreme"}
            ]

            for j, edge_case in enumerate(edge_cases):
                if edge_case["type"] == "corrupted_base64":
                    image = self.create_synthetic_image(224, 224, "random")
                    image_b64 = self.image_to_base64(image, "JPEG")
                    # Truncate and append junk so base64 decoding fails downstream.
                    corrupted_b64 = image_b64[:-20] + "CORRUPTED_DATA"
                    api_request = self.create_api_request(corrupted_b64)
                    expected_response = {
                        "error": "Invalid image data",
                        "status": "failed"
                    }
                    # BUG FIX: this record was previously labelled PNG even
                    # though the payload is JPEG-encoded with media type
                    # image/jpeg in the request.
                    fmt, media_type = "JPEG", "image/jpeg"
                else:
                    image = self.create_synthetic_image(
                        edge_case["size"][0], edge_case["size"][1], "random"
                    )
                    image_b64 = self.image_to_base64(image, "PNG")
                    api_request = self.create_api_request(image_b64, "image/png")
                    expected_response = self.create_expected_response()
                    fmt, media_type = "PNG", "image/png"

                records.append({
                    "dataset_id": f"edge_{i:03d}",
                    "image_id": f"edge_{j:03d}",
                    "image_type": edge_case["type"],
                    "image_size": f"{edge_case['size'][0]}x{edge_case['size'][1]}",
                    "format": fmt,
                    "media_type": media_type,
                    "api_request": json.dumps(api_request),
                    "expected_response": json.dumps(expected_response),
                    "test_category": "edge_case",
                    "difficulty": edge_case["difficulty"]
                })

            datasets.append({
                "name": f"edge_case_{i:03d}",
                "category": "edge_case",
                "description": f"Edge case dataset {i+1} with challenging scenarios",
                "records": records
            })

        return datasets

    def generate_performance_datasets(self, count: int = 25) -> List[Dict[str, Any]]:
        """Generate performance benchmark datasets with varying batch sizes."""
        datasets = []

        batch_sizes = [1, 5, 10, 25, 50, 100]  # hoisted: loop-invariant

        for i in range(count):
            batch_size = random.choice(batch_sizes)

            records = []
            for j in range(batch_size):
                image = self.create_synthetic_image(224, 224, "random")
                image_b64 = self.image_to_base64(image, "JPEG")
                api_request = self.create_api_request(image_b64)
                expected_response = self.create_expected_response()

                records.append({
                    "dataset_id": f"perf_{i:03d}",
                    "image_id": f"batch_{j:03d}",
                    "image_type": "performance_test",
                    "image_size": "224x224",
                    "format": "JPEG",
                    "media_type": "image/jpeg",
                    "api_request": json.dumps(api_request),
                    "expected_response": json.dumps(expected_response),
                    "test_category": "performance",
                    "difficulty": "normal",
                    "batch_size": batch_size,
                    # Latency budget scales linearly with the batch size.
                    "expected_max_latency_ms": batch_size * 100
                })

            datasets.append({
                "name": f"performance_test_{i:03d}",
                "category": "performance",
                "description": f"Performance dataset {i+1} with batch size {batch_size}",
                "records": records
            })

        return datasets

    def generate_model_comparison_datasets(self, count: int = 25) -> List[Dict[str, Any]]:
        """Generate datasets that send the same images to several models.

        Each dataset shares 10 base images across all models so responses
        can be compared per ``comparison_group``.
        """
        datasets = []

        model_types = [
            "microsoft/resnet-18", "microsoft/resnet-50", "google/vit-base-patch16-224",
            "facebook/convnext-tiny-224", "microsoft/swin-tiny-patch4-window7-224"
        ]

        for i in range(count):
            # One shared pool of base images per dataset.
            base_images = []
            for _ in range(10):
                image = self.create_synthetic_image(224, 224, "geometric")
                base_images.append(self.image_to_base64(image, "JPEG"))

            records = []
            for j, model in enumerate(model_types):
                for k, image_b64 in enumerate(base_images):
                    api_request = self.create_api_request(image_b64)
                    expected_response = self.create_expected_response(model)

                    records.append({
                        "dataset_id": f"comparison_{i:03d}",
                        "image_id": f"img_{k:03d}_model_{j}",
                        "image_type": "comparison_base",
                        "image_size": "224x224",
                        "format": "JPEG",
                        "media_type": "image/jpeg",
                        "api_request": json.dumps(api_request),
                        "expected_response": json.dumps(expected_response),
                        "test_category": "model_comparison",
                        "difficulty": "normal",
                        "model_type": model,
                        "comparison_group": k
                    })

            datasets.append({
                "name": f"model_comparison_{i:03d}",
                "category": "model_comparison",
                "description": f"Model comparison dataset {i+1} testing {len(model_types)} models",
                "records": records
            })

        return datasets

    def save_dataset_to_parquet(self, dataset: Dict[str, Any]):
        """Save one dataset to Parquet plus a JSON metadata sidecar.

        All categories share one schema; category-specific columns
        (batch_size, latency, model_type, comparison_group) fall back to
        neutral defaults via ``dict.get`` for records that lack them.
        """
        records = dataset["records"]

        table = pa.table({
            "dataset_id": [r["dataset_id"] for r in records],
            "image_id": [r["image_id"] for r in records],
            "image_type": [r["image_type"] for r in records],
            "image_size": [r["image_size"] for r in records],
            "format": [r["format"] for r in records],
            "media_type": [r["media_type"] for r in records],
            "api_request": [r["api_request"] for r in records],
            "expected_response": [r["expected_response"] for r in records],
            "test_category": [r["test_category"] for r in records],
            "difficulty": [r["difficulty"] for r in records],
            "batch_size": [r.get("batch_size", 1) for r in records],
            "expected_max_latency_ms": [r.get("expected_max_latency_ms", 1000) for r in records],
            "model_type": [r.get("model_type", "microsoft/resnet-18") for r in records],
            "comparison_group": [r.get("comparison_group", 0) for r in records]
        })

        output_path = self.output_dir / f"{dataset['name']}.parquet"
        pq.write_table(table, output_path)

        metadata = {
            "name": dataset["name"],
            "category": dataset["category"],
            "description": dataset["description"],
            "record_count": len(records),
            "file_size_mb": round(output_path.stat().st_size / (1024 * 1024), 2),
            "schema": [field.name for field in table.schema]
        }

        metadata_path = self.output_dir / f"{dataset['name']}_metadata.json"
        with open(metadata_path, 'w') as f:
            json.dump(metadata, f, indent=2)

    def generate_all_datasets(self):
        """Generate and save all four dataset categories (25 each, 100 total)."""
        print(" Starting dataset generation...")

        stages = [
            ("📊 Generating standard test datasets (25)...", self.generate_standard_datasets),
            ("⚡ Generating edge case datasets (25)...", self.generate_edge_case_datasets),
            ("🏁 Generating performance datasets (25)...", self.generate_performance_datasets),
            ("🔄 Generating model comparison datasets (25)...", self.generate_model_comparison_datasets),
        ]
        for message, generate in stages:
            print(message)
            for dataset in generate(25):
                self.save_dataset_to_parquet(dataset)

        print(f"✅ Generated 100 datasets in {self.output_dir}/")

        self.generate_summary()

    def generate_summary(self):
        """Write datasets_summary.json describing every generated dataset.

        Totals and per-category counts are computed from the metadata
        sidecars actually present on disk (previously hard-coded to
        100/25, which could disagree with reality).
        """
        summary = {
            "total_datasets": 0,
            "categories": {},
            "dataset_info": [],
            "usage_instructions": {
                "loading": "Use pyarrow.parquet.read_table('dataset.parquet')",
                "testing": "Run python scripts/test_datasets.py",
                "api_endpoint": "POST /predict/resnet",
                "request_format": "See api_request column in datasets"
            }
        }

        # sorted() makes the summary order deterministic across platforms.
        for parquet_file in sorted(self.output_dir.glob("*.parquet")):
            metadata_file = self.output_dir / f"{parquet_file.stem}_metadata.json"
            if metadata_file.exists():
                with open(metadata_file, 'r') as f:
                    metadata = json.load(f)
                summary["dataset_info"].append(metadata)
                category = metadata.get("category", "unknown")
                summary["categories"][category] = summary["categories"].get(category, 0) + 1

        summary["total_datasets"] = len(summary["dataset_info"])

        summary_path = self.output_dir / "datasets_summary.json"
        with open(summary_path, 'w') as f:
            json.dump(summary, f, indent=2)

        print(f"📋 Summary saved to {summary_path}")
| |
|
| |
|
if __name__ == "__main__":
    # Script entry point: build a generator with the default output
    # directory and produce every dataset category.
    TestDatasetGenerator().generate_all_datasets()