Examples
Real-world examples of using pbar.io in common scenarios.
Machine Learning Training
Track model training progress across epochs, batches, and validation steps.
import torch
from pbar_io import tqdm, ProgressBar

# Create parent bar for overall training
training_bar = ProgressBar(
    title=f"Training {model_name}",
    total=num_epochs,
    metadata={"model": model_name, "dataset": dataset_name}
)
print(f"📊 Track training: {training_bar.url}")
for epoch in range(num_epochs):
    # Create child bar for this epoch
    epoch_bar = ProgressBar(
        title=f"Epoch {epoch+1}/{num_epochs}",
        total=len(train_loader),
        parent_slug=training_bar.slug
    )

    # Training loop with batch progress
    for batch_idx, (data, target) in enumerate(tqdm(
        train_loader,
        desc=f"Epoch {epoch+1}",
        leave=False
    )):
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()

        # Update epoch progress
        epoch_bar.increment()

        # Log metrics every N batches
        if batch_idx % log_interval == 0:
            epoch_bar.metadata = {
                "loss": loss.item(),
                "accuracy": calculate_accuracy(output, target),
                "learning_rate": optimizer.param_groups[0]['lr']
            }

    # Validation phase
    val_loss, val_acc = validate(model, val_loader)
    epoch_bar.metadata = {
        "val_loss": val_loss,
        "val_accuracy": val_acc,
        "status": "completed"
    }
    epoch_bar.complete()

    # Update overall training progress
    training_bar.increment()
    training_bar.metadata = {
        "best_val_acc": best_val_acc,
        "current_epoch": epoch + 1
    }

training_bar.complete()
print(f"✅ Training complete! Results: {training_bar.url}")
Data Processing Pipeline
Monitor multi-stage ETL pipelines with hierarchical progress bars.
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor

import numpy as np
import pandas as pd
from pbar_io import ProgressBar

# Main pipeline progress
pipeline = ProgressBar(
    title="Data Pipeline - Q4 2024",
    total=4,  # 4 stages
    metadata={"start_time": datetime.now().isoformat()}
)
print(f"Pipeline progress: {pipeline.url}")
# Stage 1: Extract
extract_bar = ProgressBar(
    title="Extracting data from sources",
    total=len(data_sources),
    parent_slug=pipeline.slug
)

extracted_data = []
for source in data_sources:
    data = extract_from_source(source)
    extracted_data.append(data)
    extract_bar.increment()

extract_bar.complete()
pipeline.increment()
# Stage 2: Transform with parallel processing
transform_bar = ProgressBar(
    title="Transforming records",
    total=sum(len(df) for df in extracted_data),
    parent_slug=pipeline.slug
)

def transform_batch(records):
    transformed = []
    for record in records:
        transformed.append(apply_transformations(record))
        transform_bar.increment()
    return transformed

with ThreadPoolExecutor(max_workers=4) as executor:
    all_records = pd.concat(extracted_data)
    batches = np.array_split(all_records, 4)
    futures = [executor.submit(transform_batch, batch) for batch in batches]

    transformed_data = []
    for future in futures:
        transformed_data.extend(future.result())

transform_bar.complete()
pipeline.increment()
# Stage 3: Validate
validate_bar = ProgressBar(
    title="Validating data quality",
    total=len(validation_rules),
    parent_slug=pipeline.slug
)

validation_errors = []
for rule in validation_rules:
    errors = rule.validate(transformed_data)
    validation_errors.extend(errors)
    validate_bar.increment()
    validate_bar.metadata = {
        "errors_found": len(validation_errors),
        "current_rule": rule.name
    }

validate_bar.complete()
pipeline.increment()
# Stage 4: Load
load_bar = ProgressBar(
    title="Loading to data warehouse",
    total=len(transformed_data),
    parent_slug=pipeline.slug,
    unit="records"
)

# Batch insert with progress
batch_size = 1000
for i in range(0, len(transformed_data), batch_size):
    batch = transformed_data[i:i+batch_size]
    warehouse.insert_batch(batch)
    load_bar.update(current=min(i+batch_size, len(transformed_data)))
# load_bar auto-completes when current >= total
pipeline.increment()
pipeline.metadata = {
    "total_records": len(transformed_data),
    "validation_errors": len(validation_errors),
    "completion_time": datetime.now().isoformat()
}
# Pipeline auto-completes when current >= total
Tips and Tricks
💡 Hierarchical Progress
Use parent-child relationships to automatically aggregate progress across subtasks. Parents show combined progress of all children.
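A minimal sketch of the pattern, mirroring the parent/child usage in the examples above (the titles, totals, and subtask names here are placeholders):

from pbar_io import ProgressBar

# Parent bar spanning three subtasks
parent = ProgressBar(title="Nightly backup", total=3)

for name in ["database", "media", "logs"]:
    # Each child reports to the parent via parent_slug
    child = ProgressBar(title=f"Back up {name}", total=100, parent_slug=parent.slug)
    for _ in range(100):
        child.increment()
    child.complete()
    parent.increment()  # mark one subtask done on the parent

parent.complete()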
📊 Rich Metadata
Store any JSON-serializable data in metadata. Great for logging metrics, errors, or status information.
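For instance, any dict of JSON-serializable values can be attached (the keys below are arbitrary examples):

bar = ProgressBar(title="Nightly export", total=500)
bar.metadata = {
    "stage": "export",
    "errors_so_far": 0,
    "last_checkpoint": "2024-10-01T02:00:00Z",
}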
🔄 Resume on Failure
Store progress in metadata to resume from where you left off after failures or interruptions.
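One possible shape of this, assuming the job is restarted with the slug of the original bar and that your code has some way to re-attach to it; ProgressBar.get(), saved_slug, items, and process() below are hypothetical placeholders, not documented API:

# Hypothetical: re-attach to the bar created by the previous run.
# Substitute whatever lookup-by-slug your pbar_io client actually provides.
bar = ProgressBar.get(slug=saved_slug)

# Pick up from the last index recorded in metadata (0 on a fresh start)
start = bar.metadata.get("last_index", 0)

for i, item in enumerate(items):
    if i < start:
        continue  # already processed before the interruption
    process(item)
    bar.update(current=i + 1)
    bar.metadata = {"last_index": i + 1}  # checkpoint for the next restart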
⚡ Batch Operations
Group multiple small updates into batches to reduce API calls and improve performance.
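For example, report once per chunk with update(current=...) (as in the load stage above) rather than calling increment() for every item; documents, index(), and the chunk size are placeholders:

CHUNK = 250  # arbitrary; tune to how often you want the bar to refresh

bar = ProgressBar(title="Indexing documents", total=len(documents))

for i, doc in enumerate(documents, start=1):
    index(doc)
    # One API call per CHUNK items (plus a final one), not one per item
    if i % CHUNK == 0 or i == len(documents):
        bar.update(current=i)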