#!/bin/bash
# StreamSocial Kafka Cluster Setup - Day 2
# Complete implementation with 3-broker cluster, monitoring, and testing

# Abort immediately if any setup step fails.
set -e

echo "🚀 StreamSocial Kafka Cluster Setup - Day 2"
echo "=============================================="

# Create project structure
echo "📁 Creating project structure..."
for subdir in src tests docker frontend monitoring scripts; do
  mkdir -p "streamsocial-kafka/${subdir}"
done
cd streamsocial-kafka
# Create start.sh
# Fixes vs. original: tests/dashboard are launched with `python -m` so that
# `from src.kafka_client import ...` resolves from the project root; the
# frontend is started with `exec` so $! is the http.server PID (not a wrapper
# subshell that would leave the server orphaned on stop); deps come from
# requirements.txt for consistency with the pinned versions.
cat > start.sh << 'EOF'
#!/bin/bash
set -e
echo "🔧 Setting up StreamSocial Kafka Environment..."

# Create and activate virtual environment
if [ ! -d "venv" ]; then
  python3.11 -m venv venv
fi
source venv/bin/activate

# Install dependencies (pinned in requirements.txt)
pip install --upgrade pip
pip install -r requirements.txt

echo "🐳 Starting Docker Compose services..."
docker-compose -f docker/docker-compose.yml up -d

# Wait for services to be ready
echo "⏳ Waiting for Kafka cluster to be ready..."
sleep 30

# Run tests as a module so 'from src.kafka_client import ...' resolves
echo "🧪 Running cluster tests..."
python -m tests.test_cluster

# Start monitoring dashboard (module form, same import-path reason)
echo "📊 Starting monitoring dashboard..."
python -m src.monitoring_dashboard &
MONITOR_PID=$!

# Start frontend; exec replaces the subshell so $! is the http.server PID
echo "🌐 Starting frontend..."
(cd frontend && exec python -m http.server 3000) &
FRONTEND_PID=$!

echo "✅ StreamSocial Kafka Cluster is running!"
echo "📊 Monitoring Dashboard: http://localhost:5000"
echo "🌐 Frontend Dashboard: http://localhost:3000"
echo "🔗 Kafka Brokers: localhost:9092, localhost:9093, localhost:9094"

# Store PIDs for cleanup
echo "$MONITOR_PID" > .monitor.pid
echo "$FRONTEND_PID" > .frontend.pid

echo "Press Ctrl+C to stop all services"
trap 'bash stop.sh' INT TERM
wait
EOF
# Create stop.sh
# Fixes vs. original: command substitutions are quoted, pid files are removed
# with -f, and the dead `deactivate` call was dropped — `deactivate` is a
# shell function defined only in the shell that sourced venv/bin/activate,
# so it can never exist in this separately-executed script.
cat > stop.sh << 'EOF'
#!/bin/bash
echo "🛑 Stopping StreamSocial Kafka services..."

# Kill monitoring and frontend processes recorded by start.sh
if [ -f .monitor.pid ]; then
  kill "$(cat .monitor.pid)" 2>/dev/null || true
  rm -f .monitor.pid
fi
if [ -f .frontend.pid ]; then
  kill "$(cat .frontend.pid)" 2>/dev/null || true
  rm -f .frontend.pid
fi

# Stop Docker services
docker-compose -f docker/docker-compose.yml down

echo "✅ All services stopped"
EOF
chmod +x start.sh stop.sh
# Create Docker Compose configuration
# Fixes vs. original: the YAML nesting was flattened (invalid compose file)
# and the `command:` value nested unescaped double quotes inside a
# double-quoted YAML scalar, truncating the string. The command string is now
# a single-quoted YAML scalar with escaped inner quotes, and the mounted
# script is invoked via `bash` so it does not need an executable bit.
mkdir -p docker
cat > docker/docker-compose.yml << 'EOF'
version: '3.8'

services:
  # Kafka Broker 1 (combined broker+controller, KRaft mode)
  kafka-1:
    image: confluentinc/cp-kafka:7.5.0
    hostname: kafka-1
    container_name: kafka-1
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka-1:29092,PLAINTEXT_HOST://localhost:9092'
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_NODE_ID: 1
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka-1:29093,2@kafka-2:29093,3@kafka-3:29093'
      KAFKA_LISTENERS: 'PLAINTEXT://kafka-1:29092,CONTROLLER://kafka-1:29093,PLAINTEXT_HOST://0.0.0.0:9092'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://localhost:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka-1:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 3
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: anonymous
    volumes:
      - ./update_run.sh:/tmp/update_run.sh
    command: 'bash -c "if [ ! -f /tmp/update_run.sh ]; then echo \"#!/bin/bash\" > /tmp/update_run.sh; fi; bash /tmp/update_run.sh && /etc/confluent/docker/run"'
    healthcheck:
      test: ["CMD", "kafka-broker-api-versions", "--bootstrap-server", "localhost:9092"]
      interval: 30s
      timeout: 10s
      retries: 5

  # Kafka Broker 2
  kafka-2:
    image: confluentinc/cp-kafka:7.5.0
    hostname: kafka-2
    container_name: kafka-2
    ports:
      - "9093:9093"
      - "9102:9102"
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka-2:29092,PLAINTEXT_HOST://localhost:9093'
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_NODE_ID: 2
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka-1:29093,2@kafka-2:29093,3@kafka-3:29093'
      KAFKA_LISTENERS: 'PLAINTEXT://kafka-2:29092,CONTROLLER://kafka-2:29093,PLAINTEXT_HOST://0.0.0.0:9093'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_JMX_PORT: 9102
      KAFKA_JMX_HOSTNAME: localhost
    volumes:
      - ./update_run.sh:/tmp/update_run.sh
    command: 'bash -c "if [ ! -f /tmp/update_run.sh ]; then echo \"#!/bin/bash\" > /tmp/update_run.sh; fi; bash /tmp/update_run.sh && /etc/confluent/docker/run"'
    healthcheck:
      test: ["CMD", "kafka-broker-api-versions", "--bootstrap-server", "localhost:9093"]
      interval: 30s
      timeout: 10s
      retries: 5

  # Kafka Broker 3
  kafka-3:
    image: confluentinc/cp-kafka:7.5.0
    hostname: kafka-3
    container_name: kafka-3
    ports:
      - "9094:9094"
      - "9103:9103"
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://kafka-3:29092,PLAINTEXT_HOST://localhost:9094'
      KAFKA_PROCESS_ROLES: 'broker,controller'
      KAFKA_NODE_ID: 3
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka-1:29093,2@kafka-2:29093,3@kafka-3:29093'
      KAFKA_LISTENERS: 'PLAINTEXT://kafka-3:29092,CONTROLLER://kafka-3:29093,PLAINTEXT_HOST://0.0.0.0:9094'
      KAFKA_INTER_BROKER_LISTENER_NAME: 'PLAINTEXT'
      KAFKA_CONTROLLER_LISTENER_NAMES: 'CONTROLLER'
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      KAFKA_JMX_PORT: 9103
      KAFKA_JMX_HOSTNAME: localhost
    volumes:
      - ./update_run.sh:/tmp/update_run.sh
    command: 'bash -c "if [ ! -f /tmp/update_run.sh ]; then echo \"#!/bin/bash\" > /tmp/update_run.sh; fi; bash /tmp/update_run.sh && /etc/confluent/docker/run"'
    healthcheck:
      test: ["CMD", "kafka-broker-api-versions", "--bootstrap-server", "localhost:9094"]
      interval: 30s
      timeout: 10s
      retries: 5

networks:
  default:
    name: streamsocial-network
EOF
# Create update_run.sh for Docker containers
# Fix vs. original: a *random* UUID per container means each broker formats
# storage under a different cluster id, so the three brokers can never join
# a single KRaft quorum. Use one fixed (env-overridable) id for all brokers
# and --ignore-formatted so container restarts don't fail on re-format.
mkdir -p docker
cat > docker/update_run.sh << 'EOF'
#!/bin/bash
# Kafka KRaft setup script — all brokers MUST share the same cluster id.
KAFKA_CLUSTER_ID="${KAFKA_CLUSTER_ID:-MkU3OEVBNTcwNTJENDM2Qg}"
kafka-storage format --ignore-formatted -t "$KAFKA_CLUSTER_ID" -c /etc/kafka/kafka.properties
EOF
chmod +x docker/update_run.sh
# Create main Kafka client library
# Fixes vs. original: the Python body had lost all indentation (invalid
# syntax); the bootstrap_servers default was a mutable list; and
# create_topics handled the result with the confluent-kafka futures API
# (`result.items()`) although kafka-python is imported — kafka-python
# returns a CreateTopicsResponse with `topic_errors` instead.
mkdir -p src
cat > src/kafka_client.py << 'EOF'
"""
StreamSocial Kafka Client - Day 2
Handles broker connections, failover, and cluster monitoring
"""
import json
import time
import random
from kafka import KafkaProducer, KafkaConsumer, KafkaClient
from kafka.admin import KafkaAdminClient, ConfigResource, ConfigResourceType
from kafka.errors import KafkaError
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class StreamSocialKafkaClient:
    """Kafka producer/admin wrapper with retry and multi-broker failover."""

    def __init__(self, bootstrap_servers=None):
        # Default to all three brokers so the client survives single failures.
        # (None-default avoids sharing one mutable list across instances.)
        self.bootstrap_servers = bootstrap_servers or [
            'localhost:9092', 'localhost:9093', 'localhost:9094'
        ]
        self.producer = None
        self.admin_client = None
        self._init_clients()

    def _init_clients(self):
        """Initialize Kafka producer and admin client with retry logic."""
        max_retries = 5
        for attempt in range(max_retries):
            try:
                self.producer = KafkaProducer(
                    bootstrap_servers=self.bootstrap_servers,
                    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                    key_serializer=lambda k: str(k).encode('utf-8'),
                    retries=3,
                    acks='all',  # Wait for all in-sync replicas
                    compression_type='gzip'
                )
                self.admin_client = KafkaAdminClient(
                    bootstrap_servers=self.bootstrap_servers,
                    client_id='streamsocial-admin'
                )
                logger.info("✅ Kafka clients initialized successfully")
                return
            except Exception as e:
                logger.warning(f"Attempt {attempt + 1}/{max_retries} failed: {e}")
                if attempt < max_retries - 1:
                    time.sleep(2 ** attempt)  # Exponential backoff
                else:
                    raise

    def send_event(self, topic, event_data, user_id=None):
        """Send an event; key (user_id or random) controls partitioning."""
        try:
            key = user_id if user_id else str(random.randint(1, 1000))
            future = self.producer.send(
                topic=topic,
                key=key,
                value=event_data,
                timestamp_ms=int(time.time() * 1000)
            )
            # Block for broker confirmation so failures surface immediately.
            record_metadata = future.get(timeout=10)
            logger.info(f"📤 Event sent to {topic}[{record_metadata.partition}] offset {record_metadata.offset}")
            return record_metadata
        except KafkaError as e:
            logger.error(f"❌ Failed to send event: {e}")
            raise

    def get_cluster_metadata(self):
        """Return broker/topic/controller info, or None on failure."""
        try:
            # NOTE(review): cluster metadata may be empty until the client
            # has polled at least once — confirm against kafka-python docs.
            client = KafkaClient(bootstrap_servers=self.bootstrap_servers)
            metadata = client.cluster
            brokers = []
            for broker in metadata.brokers():
                brokers.append({
                    'id': broker.nodeId,
                    'host': broker.host,
                    'port': broker.port,
                    'rack': broker.rack
                })
            topics = list(metadata.topics())
            return {
                'brokers': brokers,
                'topics': topics,
                'cluster_id': metadata.cluster_id,
                'controller': metadata.controller.nodeId if metadata.controller else None
            }
        except Exception as e:
            logger.error(f"❌ Failed to get cluster metadata: {e}")
            return None

    def create_streamsocial_topics(self):
        """Create StreamSocial topics with proper replication."""
        from kafka.admin import NewTopic
        topics = [
            NewTopic(
                name='user-actions',
                num_partitions=3,
                replication_factor=3
            ),
            NewTopic(
                name='content-interactions',
                num_partitions=3,
                replication_factor=3
            ),
            NewTopic(
                name='system-events',
                num_partitions=1,
                replication_factor=3
            )
        ]
        try:
            # kafka-python returns a CreateTopicsResponse (not futures);
            # per-topic results live in response.topic_errors.
            response = self.admin_client.create_topics(topics, timeout_ms=10000)
            for topic_name, error_code, error_message in response.topic_errors:
                if error_code == 0:
                    logger.info(f"✅ Topic '{topic_name}' created successfully")
                elif error_code == 36:  # TOPIC_ALREADY_EXISTS
                    logger.info(f"ℹ️ Topic '{topic_name}' already exists")
                else:
                    logger.error(f"❌ Failed to create topic '{topic_name}': {error_message}")
        except Exception as e:
            logger.error(f"❌ Failed to create topics: {e}")

    def simulate_user_activity(self, duration_seconds=60):
        """Simulate StreamSocial user activity; returns the event count."""
        logger.info(f"🎭 Starting user activity simulation for {duration_seconds} seconds")
        user_actions = ['post', 'like', 'comment', 'share', 'follow']
        content_types = ['photo', 'video', 'text', 'story']
        start_time = time.time()
        event_count = 0
        try:
            while time.time() - start_time < duration_seconds:
                # Generate user action event
                user_id = random.randint(1, 10000)
                action = random.choice(user_actions)
                user_event = {
                    'event_type': 'user_action',
                    'user_id': user_id,
                    'action': action,
                    'content_type': random.choice(content_types),
                    'timestamp': int(time.time() * 1000),
                    'session_id': f"session_{user_id}_{int(time.time())}"
                }
                self.send_event('user-actions', user_event, user_id)
                event_count += 1
                # Generate content interaction event
                if random.random() < 0.7:  # 70% chance
                    interaction_event = {
                        'event_type': 'content_interaction',
                        'user_id': user_id,
                        'content_id': random.randint(1, 1000),
                        'interaction_type': random.choice(['view', 'click', 'scroll']),
                        'duration_ms': random.randint(1000, 30000),
                        'timestamp': int(time.time() * 1000)
                    }
                    self.send_event('content-interactions', interaction_event, user_id)
                    event_count += 1
                time.sleep(random.uniform(0.1, 0.5))  # Random delay
        except KeyboardInterrupt:
            logger.info("🛑 Simulation stopped by user")
        logger.info(f"📊 Simulation complete: {event_count} events generated")
        return event_count


if __name__ == "__main__":
    # Demo the client
    client = StreamSocialKafkaClient()
    # Create topics
    client.create_streamsocial_topics()
    # Get cluster info
    metadata = client.get_cluster_metadata()
    if metadata:
        print(f"🏢 Cluster ID: {metadata['cluster_id']}")
        print(f"👑 Controller: Broker {metadata['controller']}")
        print(f"🖥️ Brokers: {len(metadata['brokers'])}")
        for broker in metadata['brokers']:
            print(f" - Broker {broker['id']}: {broker['host']}:{broker['port']}")
    # Run simulation
    client.simulate_user_activity(30)
EOF
# Create monitoring dashboard
# Fixes vs. original: the Python body had lost all indentation (invalid
# syntax), and the event-monitoring thread died after 1 quiet second because
# consumer_timeout_ms ends the `for message in consumer` iterator — the loop
# is now re-entered while the monitor is running.
mkdir -p src
cat > src/monitoring_dashboard.py << 'EOF'
"""
StreamSocial Kafka Monitoring Dashboard - Day 2
Real-time cluster health and metrics visualization
"""
from flask import Flask, render_template, jsonify
from flask_cors import CORS
import json
import time
import threading
from kafka import KafkaConsumer
from kafka.admin import KafkaAdminClient
from src.kafka_client import StreamSocialKafkaClient

app = Flask(__name__)
CORS(app)


class KafkaMonitor:
    """Background cluster-health and throughput collector."""

    def __init__(self):
        self.client = StreamSocialKafkaClient()
        self.metrics = {
            'cluster_health': 'unknown',
            'total_events': 0,
            'events_per_second': 0,
            'broker_count': 0,
            'topics': [],
            'recent_events': []
        }
        self.running = True
        self.start_monitoring()

    def start_monitoring(self):
        """Start background monitoring threads."""
        threading.Thread(target=self._monitor_cluster, daemon=True).start()
        threading.Thread(target=self._monitor_events, daemon=True).start()

    def _monitor_cluster(self):
        """Monitor cluster health and metadata."""
        while self.running:
            try:
                metadata = self.client.get_cluster_metadata()
                if metadata:
                    self.metrics['cluster_health'] = 'healthy'
                    self.metrics['broker_count'] = len(metadata['brokers'])
                    self.metrics['topics'] = metadata['topics']
                else:
                    self.metrics['cluster_health'] = 'unhealthy'
            except Exception as e:
                self.metrics['cluster_health'] = 'error'
                print(f"Monitoring error: {e}")
            time.sleep(10)  # Check every 10 seconds

    def _monitor_events(self):
        """Monitor event throughput."""
        try:
            consumer = KafkaConsumer(
                'user-actions',
                'content-interactions',
                'system-events',
                bootstrap_servers=['localhost:9092', 'localhost:9093', 'localhost:9094'],
                value_deserializer=lambda m: json.loads(m.decode('utf-8')),
                consumer_timeout_ms=1000,
                auto_offset_reset='latest'
            )
            events_in_window = []
            window_size = 60  # 1 minute window
            # consumer_timeout_ms makes the iterator stop after 1s of silence;
            # re-enter it so monitoring survives quiet periods.
            while self.running:
                for message in consumer:
                    current_time = time.time()
                    event_data = {
                        'topic': message.topic,
                        'partition': message.partition,
                        'offset': message.offset,
                        'value': message.value,
                        'timestamp': current_time
                    }
                    # Add to recent events (keep last 10)
                    self.metrics['recent_events'].append(event_data)
                    if len(self.metrics['recent_events']) > 10:
                        self.metrics['recent_events'].pop(0)
                    # Track events for throughput calculation
                    events_in_window.append(current_time)
                    # Remove events older than window
                    events_in_window = [t for t in events_in_window if current_time - t < window_size]
                    # Update metrics
                    self.metrics['total_events'] += 1
                    self.metrics['events_per_second'] = len(events_in_window) / window_size
        except Exception as e:
            print(f"Event monitoring error: {e}")


monitor = KafkaMonitor()


@app.route('/')
def dashboard():
    """Main dashboard page."""
    # NOTE(review): this response looks like it lost its HTML markup (only
    # page text remains) — restore the original template before shipping.
    return """
StreamSocial Kafka Monitor
🚀 StreamSocial Kafka Monitor
Real-time cluster health and event tracking
🔄 Refresh Dashboard
Cluster Health
Loading...
Active Brokers
0
Total Events
0
Events/Second
0.0
📊 Recent Events
Loading events...
"""


@app.route('/api/metrics')
def get_metrics():
    """API endpoint for metrics data."""
    return jsonify(monitor.metrics)


@app.route('/api/cluster/test')
def test_cluster():
    """Test cluster connectivity and send sample events."""
    try:
        # Send test events
        client = StreamSocialKafkaClient()
        test_event = {
            'event_type': 'test',
            'message': 'Dashboard connectivity test',
            'timestamp': int(time.time() * 1000)
        }
        client.send_event('system-events', test_event)
        return jsonify({
            'status': 'success',
            'message': 'Test event sent successfully'
        })
    except Exception as e:
        return jsonify({
            'status': 'error',
            'message': str(e)
        }), 500


if __name__ == '__main__':
    print("🖥️ Starting StreamSocial Kafka Monitor...")
    print("📊 Dashboard: http://localhost:5000")
    app.run(host='0.0.0.0', port=5000, debug=False)
EOF
# Create test suite
# Fixes vs. original: the Python body had lost all indentation (invalid
# syntax), and two prints had lost their backslashes ('print("n" + ...)').
mkdir -p tests
cat > tests/test_cluster.py << 'EOF'
"""
StreamSocial Kafka Cluster Tests - Day 2
Comprehensive testing for 3-broker cluster setup
"""
import time
import json
import unittest
import threading
from kafka import KafkaProducer, KafkaConsumer
from kafka.admin import KafkaAdminClient
from src.kafka_client import StreamSocialKafkaClient


class TestKafkaCluster(unittest.TestCase):
    def setUp(self):
        """Set up test environment."""
        self.client = StreamSocialKafkaClient()
        self.client.create_streamsocial_topics()
        time.sleep(5)  # Wait for topics to be created

    def test_cluster_connectivity(self):
        """Test basic cluster connectivity."""
        print("🔍 Testing cluster connectivity...")
        metadata = self.client.get_cluster_metadata()
        self.assertIsNotNone(metadata, "Should be able to get cluster metadata")
        self.assertEqual(len(metadata['brokers']), 3, "Should have 3 brokers")
        self.assertIsNotNone(metadata['controller'], "Should have a controller")
        print(f"✅ Connected to {len(metadata['brokers'])} brokers")
        print(f"👑 Controller: Broker {metadata['controller']}")

    def test_topic_creation(self):
        """Test topic creation and configuration."""
        print("🔍 Testing topic creation...")
        metadata = self.client.get_cluster_metadata()
        expected_topics = {'user-actions', 'content-interactions', 'system-events'}
        actual_topics = set(metadata['topics'])
        self.assertTrue(expected_topics.issubset(actual_topics),
                        f"Expected topics {expected_topics} not found in {actual_topics}")
        print("✅ All required topics created successfully")

    def test_producer_functionality(self):
        """Test message production to all brokers."""
        print("🔍 Testing producer functionality...")
        test_events = [
            ('user-actions', {'user_id': 123, 'action': 'test'}),
            ('content-interactions', {'content_id': 456, 'interaction': 'test'}),
            ('system-events', {'event': 'test_system_event'})
        ]
        for topic, event_data in test_events:
            metadata = self.client.send_event(topic, event_data)
            self.assertIsNotNone(metadata, f"Should successfully send to {topic}")
            print(f"✅ Sent test event to {topic}[{metadata.partition}]")

    def test_consumer_functionality(self):
        """Test message consumption from cluster."""
        print("🔍 Testing consumer functionality...")
        # First, send a test message
        test_message = {'test': 'consumer_test', 'timestamp': int(time.time())}
        self.client.send_event('user-actions', test_message)
        # Then consume it
        consumer = KafkaConsumer(
            'user-actions',
            bootstrap_servers=['localhost:9092', 'localhost:9093', 'localhost:9094'],
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            consumer_timeout_ms=10000,
            auto_offset_reset='latest'
        )
        # Send another message to trigger consumption
        self.client.send_event('user-actions', test_message)
        message_received = False
        for message in consumer:
            if message.value.get('test') == 'consumer_test':
                message_received = True
                break
        consumer.close()
        self.assertTrue(message_received, "Should receive the test message")
        print("✅ Successfully consumed test message")

    def test_fault_tolerance(self):
        """Test cluster behavior during broker failure simulation."""
        print("🔍 Testing fault tolerance (simulated)...")
        # Test with different bootstrap servers to simulate partial failures
        test_configs = [
            ['localhost:9092', 'localhost:9093'],  # Broker 3 "failed"
            ['localhost:9092', 'localhost:9094'],  # Broker 2 "failed"
            ['localhost:9093', 'localhost:9094']   # Broker 1 "failed"
        ]
        for i, servers in enumerate(test_configs):
            try:
                test_client = StreamSocialKafkaClient(servers)
                metadata = test_client.get_cluster_metadata()
                if metadata and len(metadata['brokers']) >= 2:
                    test_event = {'test': f'failover_test_{i}', 'config': i}
                    test_client.send_event('system-events', test_event)
                    print(f"✅ Failover test {i+1} passed with {len(metadata['brokers'])} brokers")
                else:
                    print(f"⚠️ Failover test {i+1}: Limited broker availability")
            except Exception as e:
                print(f"⚠️ Failover test {i+1} error: {e}")

    def test_throughput_performance(self):
        """Test cluster throughput under load."""
        print("🔍 Testing throughput performance...")
        start_time = time.time()
        event_count = 100
        for i in range(event_count):
            event_data = {
                'event_id': i,
                'user_id': i % 10,
                'action': 'performance_test',
                'timestamp': int(time.time() * 1000)
            }
            self.client.send_event('user-actions', event_data, user_id=i % 10)
        duration = time.time() - start_time
        throughput = event_count / duration
        print(f"✅ Throughput test: {event_count} events in {duration:.2f}s ({throughput:.2f} events/sec)")
        self.assertGreater(throughput, 10, "Should achieve at least 10 events/sec")


def run_continuous_test():
    """Run continuous monitoring test."""
    print("🔄 Starting continuous cluster monitoring...")
    client = StreamSocialKafkaClient()
    test_count = 0
    try:
        while test_count < 10:  # Run 10 iterations
            # Check cluster health
            metadata = client.get_cluster_metadata()
            if metadata:
                broker_count = len(metadata['brokers'])
                print(f"📊 Iteration {test_count + 1}: {broker_count} brokers active")
                # Send test event
                test_event = {
                    'test_iteration': test_count,
                    'timestamp': int(time.time() * 1000),
                    'cluster_health': 'monitoring'
                }
                try:
                    client.send_event('system-events', test_event)
                    print(f"✅ Test event {test_count + 1} sent successfully")
                except Exception as e:
                    print(f"❌ Test event {test_count + 1} failed: {e}")
            else:
                print(f"⚠️ Iteration {test_count + 1}: Cluster metadata unavailable")
            test_count += 1
            time.sleep(3)
    except KeyboardInterrupt:
        print("🛑 Continuous test stopped by user")


if __name__ == '__main__':
    print("🧪 StreamSocial Kafka Cluster Test Suite")
    print("=" * 50)
    # Run unit tests
    unittest.main(argv=[''], exit=False, verbosity=2)
    print("\n" + "=" * 50)
    # Run continuous monitoring
    run_continuous_test()
    print("\n🎉 All tests completed!")
EOF
# Create frontend dashboard
# NOTE(review): the heredoc payload below appears to have lost its HTML
# markup — only the page's text content remains (no <html>/<head>/<body>
# tags). Restore the original template before shipping; left byte-identical
# here because the true markup cannot be reconstructed from this file.
mkdir -p frontend
cat > frontend/index.html << 'EOF'
StreamSocial Kafka Dashboard
🚀 StreamSocial
Features
Status
Monitoring
Kafka Cluster Dashboard
Real-time monitoring and management for StreamSocial's distributed event streaming platform
📊 Open Monitor
🔧 Run Diagnostics
Kafka Mastery Features
🏢
3-Broker Cluster
High-availability cluster with automatic failover and data replication across multiple brokers.
⚡
Real-time Processing
Handle millions of social media events with sub-millisecond latency and guaranteed delivery.
📊
Live Monitoring
Comprehensive dashboard with cluster health, throughput metrics, and event tracking.
🔒
Fault Tolerance
Built-in redundancy ensures your data is safe even when brokers fail unexpectedly.
🔄
Event Replay
Replay historical events for analytics, debugging, and disaster recovery scenarios.
📈
Horizontal Scaling
Seamlessly add more brokers and partitions as your user base grows from thousands to millions.
Cluster Status
Loading...
Active Brokers
Loading...
Health Status
Loading...
Events Processed
Loading...
Events/Second
© 2025 StreamSocial Kafka Mastery Course - Day 2: Cluster Setup
Building the next generation of event-driven systems
EOF
# Create requirements.txt (pinned Python dependencies for the project)
{
  printf '%s\n' 'kafka-python==2.0.2'
  printf '%s\n' 'flask==3.0.0'
  printf '%s\n' 'flask-cors==4.0.0'
  printf '%s\n' 'requests==2.31.0'
  printf '%s\n' 'docker-compose==1.29.2'
} > requirements.txt
# Make scripts executable
chmod +x start.sh stop.sh

# Print the final summary as one heredoc instead of a run of echoes
# (identical output, easier to maintain).
cat << 'EOF'
✅ StreamSocial Kafka Cluster implementation created successfully!

📁 Project Structure:
streamsocial-kafka/
├── start.sh # Start all services
├── stop.sh # Stop all services
├── src/ # Source code
│ ├── kafka_client.py # Kafka client library
│ └── monitoring_dashboard.py # Web dashboard
├── tests/ # Test suite
│ └── test_cluster.py # Cluster tests
├── docker/ # Docker configuration
│ └── docker-compose.yml # 3-broker cluster
├── frontend/ # Dashboard UI
│ └── index.html # Main dashboard
└── requirements.txt # Python dependencies

🚀 To start the cluster:
 cd streamsocial-kafka
 ./start.sh

🛑 To stop the cluster:
 ./stop.sh

📊 Access points:
 - Monitoring Dashboard: http://localhost:5000
 - Frontend Dashboard: http://localhost:3000
 - Kafka Brokers: localhost:9092, 9093, 9094
EOF