LoggingUtil provides robust integration with popular cloud services for log management and analysis.
### AWS CloudWatch

Basic setup:

```python
from loggingutil import LogFile
from loggingutil.handlers import CloudWatchHandler

logger = LogFile("app.log")
logger.add_handler(CloudWatchHandler(
    log_group="/myapp/prod",
    log_stream="api-server",
    region="us-west-2"
))
```
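As a quick smoke test, log something and confirm it reaches the log group. The sketch below uses `log_exception`, the logging call that appears in the error-handling examples later in this section, and assumes entries fan out to every registered handler:

```python
# Smoke test: this entry should appear in the /myapp/prod log group.
# log_exception is the call used in the error-handling examples below.
try:
    1 / 0
except ZeroDivisionError as e:
    logger.log_exception(e)
```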
Credentials can be supplied through environment variables:

```bash
export AWS_ACCESS_KEY_ID="your_access_key"
export AWS_SECRET_ACCESS_KEY="your_secret_key"
export AWS_DEFAULT_REGION="us-west-2"
```
Or passed explicitly to the handler:

```python
handler = CloudWatchHandler(
    log_group="/myapp/prod",
    log_stream="api-server",
    aws_access_key="your_access_key",
    aws_secret_key="your_secret_key",
    region="us-west-2"
)
```
On AWS infrastructure, IAM roles remove the need for explicit credentials:

```python
# No credentials needed when using IAM roles
handler = CloudWatchHandler(
    log_group="/myapp/prod",
    log_stream="api-server"
)
```
### Best Practices

1. Use hierarchical log group names:

```python
log_group = f"/myapp/{environment}/{component}"  # e.g., /myapp/prod/api
```

2. Include the host and date in log stream names:

```python
import socket
from datetime import datetime

log_stream = f"{socket.gethostname()}-{datetime.now().strftime('%Y-%m-%d')}"
```
3. Fall back to local logging if the cloud handler cannot be created:

```python
from loggingutil.handlers import FileRotatingHandler

try:
    handler = CloudWatchHandler(...)
    logger.add_handler(handler)
except Exception as e:
    # Fall back to local logging
    logger.log_exception(e)
    logger.add_handler(FileRotatingHandler("logs"))
```
### Elasticsearch

Basic setup:

```python
from loggingutil import LogFile
from loggingutil.handlers import ElasticsearchHandler

logger = LogFile("app.log")
logger.add_handler(ElasticsearchHandler(
    es_url="http://elasticsearch:9200",
    index_prefix="myapp"
))
```
With basic authentication:

```python
handler = ElasticsearchHandler(
    es_url="https://elasticsearch:9200",
    index_prefix="myapp",
    auth=("user", "password")
)
```
Or with an API key:

```python
handler = ElasticsearchHandler(
    es_url="https://elasticsearch:9200",
    index_prefix="myapp",
    auth=("api_key_id", "api_key")
)
```
Indices are created daily from the configured prefix:

```python
handler = ElasticsearchHandler(
    es_url="https://elasticsearch:9200",
    index_prefix="myapp-logs"  # Results in indices like myapp-logs-2024.03.21
)
```
To customize index naming, override `_get_index_name`:

```python
import os
from datetime import datetime

class CustomElasticsearchHandler(ElasticsearchHandler):
    def _get_index_name(self) -> str:
        env = os.getenv("ENVIRONMENT", "dev")
        date = datetime.now().strftime("%Y.%m")
        return f"logs-{env}-{date}"
```
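The customized handler registers like any other; a brief sketch, with constructor arguments carried over from the earlier examples:

```python
# Sketch: register the custom handler; note the override above ignores
# index_prefix and builds names as logs-<env>-<YYYY.MM> instead.
logger.add_handler(CustomElasticsearchHandler(
    es_url="https://elasticsearch:9200",
    index_prefix="myapp"
))
```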
### Best Practices
1. Use index templates so new indices get consistent mappings (see the sketch after this list for applying them):
```json
{
  "index_patterns": ["myapp-*"],
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 1
  },
  "mappings": {
    "properties": {
      "timestamp": { "type": "date" },
      "level": { "type": "keyword" },
      "correlation_id": { "type": "keyword" },
      "message": { "type": "text" }
    }
  }
}
```

2. Use ILM (Index Lifecycle Management) to control rollover and retention:

```json
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {
            "max_size": "50GB",
            "max_age": "30d"
          }
        }
      },
      "delete": {
        "min_age": "90d",
        "actions": {
          "delete": {}
        }
      }
    }
  }
}
```
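Applying these is one-time cluster setup. The template JSON above matches the shape of Elasticsearch's legacy `_template` API (composable `_index_template` bodies instead wrap `settings` and `mappings` in a `template` object). A minimal sketch using `requests`, assuming the endpoint and credentials from the earlier examples and that the JSON documents are saved as local files:

```python
import json
import requests

ES_URL = "https://elasticsearch:9200"  # endpoint assumed from earlier examples
AUTH = ("user", "password")            # illustrative credentials

# Install the index template (legacy _template API, matching the JSON above)
with open("myapp-template.json") as f:
    requests.put(f"{ES_URL}/_template/myapp-logs",
                 json=json.load(f), auth=AUTH).raise_for_status()

# Install the ILM policy (available since Elasticsearch 6.6)
with open("myapp-ilm-policy.json") as f:
    requests.put(f"{ES_URL}/_ilm/policy/myapp-logs",
                 json=json.load(f), auth=AUTH).raise_for_status()
```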
### Multiple Cloud Services

You can send logs to multiple cloud services simultaneously:
```python
from loggingutil import LogFile
from loggingutil.handlers import (
    CloudWatchHandler,
    ElasticsearchHandler,
    ConsoleHandler
)

logger = LogFile("app.log")

# Local console output
logger.add_handler(ConsoleHandler(color=True))

# CloudWatch for metrics and alerts
logger.add_handler(CloudWatchHandler(
    log_group="/myapp/prod",
    log_stream="api-server"
))

# Elasticsearch for search and analysis
logger.add_handler(ElasticsearchHandler(
    es_url="http://elasticsearch:9200",
    index_prefix="myapp"
))
```
### Error Handling

Implement robust error handling for cloud services:
```python
from loggingutil.handlers import FileRotatingHandler

class ResilientCloudWatchHandler(CloudWatchHandler):
    def __init__(self, *args, **kwargs):
        self.fallback = FileRotatingHandler("logs/cloudwatch-fallback")
        super().__init__(*args, **kwargs)

    async def handle(self, log_entry: dict):
        try:
            await super().handle(log_entry)
        except Exception as e:
            print(f"CloudWatch error: {e}")
            await self.fallback.handle(log_entry)
```
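Usage is a drop-in swap for the stock handler; the arguments below mirror the earlier CloudWatch examples:

```python
# Sketch: drop-in replacement for CloudWatchHandler
logger = LogFile("app.log")
logger.add_handler(ResilientCloudWatchHandler(
    log_group="/myapp/prod",
    log_stream="api-server",
    region="us-west-2"
))
```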
### Performance

Several options keep cloud logging fast and affordable; a combined sketch follows the list.

1. Sample logs in high-volume environments:

```python
logger = LogFile(
    "app.log",
    sampling_rate=0.1  # Only send 10% of logs
)
```

2. Match the log level to the environment:

```python
from loggingutil import LogLevel

# Development: verbose output
logger.setLevel(LogLevel.DEBUG)

# Production: reduce noise and ingest cost
logger.setLevel(LogLevel.INFO)
```
3. Use batching for efficiency:
```python
handler = CloudWatchHandler(
    log_group="/myapp/prod",
    log_stream="api-server",
    batch_size=100  # Send logs in batches
)
```
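These three options compose. A hedged end-to-end sketch, using only the parameters shown above and assuming they can be combined on one logger:

```python
from loggingutil import LogFile, LogLevel
from loggingutil.handlers import CloudWatchHandler

# Sample 10% of entries, suppress DEBUG noise, and ship the rest in batches.
logger = LogFile("app.log", sampling_rate=0.1)
logger.setLevel(LogLevel.INFO)
logger.add_handler(CloudWatchHandler(
    log_group="/myapp/prod",
    log_stream="api-server",
    batch_size=100
))
```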