Kafka
End-to-end guide for ingesting logs from Apache Kafka topics
This guide walks through ingesting logs from Apache Kafka into nano. Kafka is ideal for high-volume streaming pipelines where logs are already flowing through a Kafka cluster — application logs, security events from SIEM forwarders, change data capture streams, or any structured data published to Kafka topics.
nano's Vector pipeline acts as a Kafka consumer, pulling messages from one or more topics in a consumer group.
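Under the hood, the connection details you enter in Step 4 become a Vector `kafka` source. A simplified sketch of that configuration (field names follow Vector's `kafka` source; the values are illustrative, and nano generates and deploys this file for you):

```toml
[sources.kafka_security_events]
type              = "kafka"
bootstrap_servers = "kafka-1:9092,kafka-2:9092"  # comma-separated brokers
group_id          = "nanosiem"                   # consumer group ID
topics            = ["security-events"]          # one or more topics
auto_offset_reset = "latest"                     # or "earliest"
```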
Prerequisites
- A running Kafka cluster (self-managed, Confluent Cloud, Amazon MSK, Redpanda, etc.)
- Network connectivity from nano to the Kafka bootstrap servers
- A running nano instance
Step 1: Prepare Your Kafka Topic
You likely already have topics with log data. If you're setting up a new topic for nano:
# Create a topic (adjust replication and partitions for your cluster)
kafka-topics.sh --create \
--bootstrap-server kafka-1:9092 \
--topic security-events \
--partitions 6 \
--replication-factor 3 \
--config retention.ms=86400000
Topic Design Considerations
| Approach | Example | Pros | Cons |
|---|---|---|---|
| One topic per log type | cloudtrail-events, okta-events | Simple routing, independent retention | More topics to manage |
| Shared topic with headers/keys | security-events with key = source type | Fewer topics | Requires routing rules in nano |
nano supports both approaches. With separate topics, each nano log source points at its own topic. With a shared topic, you use routing rules to direct messages to the right parser based on the topic name or message content.
Verify Your Topic Has Data
# Check topic exists and has partitions
kafka-topics.sh --describe \
--bootstrap-server kafka-1:9092 \
--topic security-events
# Peek at recent messages
kafka-console-consumer.sh \
--bootstrap-server kafka-1:9092 \
--topic security-events \
--from-beginning --max-messages 3
Step 2: Create Kafka Credentials (If Required)
If your Kafka cluster uses SASL authentication, create a dedicated user for nano. If your cluster is unauthenticated (common in development or VPC-internal setups), skip to Step 3.
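The examples below use YOUR_SECURE_PASSWORD as a placeholder. One way to generate a strong value (a sketch assuming openssl is installed; any secure random generator works):

```shell
# 24 random bytes, base64-encoded, yields a 32-character password
PASSWORD=$(openssl rand -base64 24)
echo "$PASSWORD"
```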
Confluent Cloud
# Create a service account
confluent iam service-account create nanosiem-reader \
--description "nano log consumer"
# Create an API key for the service account
confluent api-key create \
--service-account sa-xxxxx \
--resource lkc-xxxxx
Grant the service account read access to your topics:
# Grant consumer group and topic read ACLs
confluent kafka acl create --allow \
--service-account sa-xxxxx \
--operations read \
--topic security-events
confluent kafka acl create --allow \
--service-account sa-xxxxx \
--operations read \
--consumer-group nanosiem
Amazon MSK
For MSK with SASL/SCRAM authentication, store the credentials in AWS Secrets Manager:
# Create a secret for SCRAM credentials
aws secretsmanager create-secret \
--name AmazonMSK_nanosiem \
--secret-string '{"username": "nanosiem-reader", "password": "YOUR_SECURE_PASSWORD"}'
# Associate the secret with your MSK cluster
aws kafka batch-associate-scram-secret \
--cluster-arn arn:aws:kafka:us-east-1:ACCOUNT_ID:cluster/my-cluster/abc123 \
--secret-arn-list arn:aws:secretsmanager:us-east-1:ACCOUNT_ID:secret:AmazonMSK_nanosiem-xxxxxx
Then create Kafka ACLs for the user:
kafka-acls.sh --bootstrap-server your-msk-broker:9096 \
--command-config client.properties \
--add --allow-principal User:nanosiem-reader \
--operation Read --topic security-events
kafka-acls.sh --bootstrap-server your-msk-broker:9096 \
--command-config client.properties \
--add --allow-principal User:nanosiem-reader \
--operation Read --group nanosiem
Self-Managed Kafka
Create a SCRAM user:
kafka-configs.sh --bootstrap-server kafka-1:9092 \
--alter --add-config 'SCRAM-SHA-256=[password=YOUR_SECURE_PASSWORD]' \
--entity-type users --entity-name nanosiem-reader
Grant read ACLs:
kafka-acls.sh --bootstrap-server kafka-1:9092 \
--add --allow-principal User:nanosiem-reader \
--operation Read --topic security-events \
--group nanosiem
Required ACLs Summary
nano needs minimal read-only access:
| Resource | Operation | Purpose |
|---|---|---|
| Topic | Read | Consume messages |
| Consumer Group | Read | Join consumer group, commit offsets |
nano does not need Write, Create, Delete, or Alter permissions.
Step 3: Store Credentials in nano
- Navigate to Settings → Cloud Credentials
- Click Add Credential
- Fill in the form:
| Field | Value |
|---|---|
| Provider | Kafka |
| Name | A descriptive name, e.g. Confluent Cloud Production |
- Configure authentication:
For SASL Authentication
| Field | Value |
|---|---|
| SASL Mechanism | PLAIN, SCRAM-SHA-256, or SCRAM-SHA-512 |
| Username | Your Kafka username or API key |
| Password | Your Kafka password or API secret |
| Enable TLS/SSL | Checked (required for most managed Kafka services) |
For Unauthenticated Kafka
| Field | Value |
|---|---|
| SASL Mechanism | None |
| Enable TLS/SSL | Unchecked (unless your cluster requires TLS without SASL) |
- Click Save
When to enable TLS: Enable TLS/SSL for any Kafka cluster that uses encrypted connections. This includes all managed services (Confluent Cloud, Amazon MSK, Aiven) and any self-managed cluster with a TLS-enabled listener. For clusters on a private network without encryption, you can leave TLS disabled.
Common SASL Mechanisms by Provider
| Kafka Provider | SASL Mechanism | TLS |
|---|---|---|
| Confluent Cloud | PLAIN | Yes |
| Amazon MSK (SCRAM) | SCRAM-SHA-512 | Yes |
| Redpanda Cloud | SCRAM-SHA-256 | Yes |
| Aiven | SCRAM-SHA-256 | Yes |
| Self-managed (internal) | Varies or none | Depends |
Step 4: Create a Log Source
- Navigate to Feeds → New Feed (or use the Log Source Wizard)
- Select "I have sample logs" and paste a representative message from your topic (see examples below)
- The AI will detect the format and generate a VRL parser
- Configure the source connection:
| Field | Value |
|---|---|
| Source Type | Kafka |
| Bootstrap Servers | Comma-separated broker addresses, e.g. kafka-1:9092,kafka-2:9092 |
| Topics | One or more topic names, e.g. security-events |
| Consumer Group ID | A group ID for nano, e.g. nanosiem |
| Auto Offset Reset | latest (start from new messages) or earliest (consume all existing messages) |
| Credential | Select the credential from Step 3, or "None" for unauthenticated clusters |
- Set the feed metadata (name, category, vendor, product)
- Publish the parser to create a version and deploy to Vector
Auto Offset Reset: Choose earliest only if you want to ingest historical messages already in the topic. For production use with ongoing ingestion, latest is recommended to avoid reprocessing old data. Once nano has committed offsets for the consumer group, this setting only applies if the group has no existing offsets (first-time consumption or after offset expiration).
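Offset expiration is governed by the broker setting offsets.retention.minutes (10080 by default on Kafka 2.0+), so a group that stays inactive longer than that window falls back to the Auto Offset Reset behavior on its next start:

```shell
# Default offsets.retention.minutes on Kafka 2.0+ is 10080 minutes
echo $((10080 / 60 / 24))   # retention window in days
```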
Bootstrap Server Formats
| Kafka Provider | Bootstrap Server Format |
|---|---|
| Confluent Cloud | pkc-xxxxx.us-east-1.aws.confluent.cloud:9092 |
| Amazon MSK | b-1.mycluster.abc123.c2.kafka.us-east-1.amazonaws.com:9096 |
| Redpanda Cloud | seed-xxxxx.us-east-1.aws.redpanda.com:9092 |
| Self-managed | kafka-1.internal:9092,kafka-2.internal:9092 |
Sample Messages
JSON Application Logs
{
"timestamp": "2025-01-15T14:23:45.678Z",
"level": "ERROR",
"service": "auth-service",
"message": "Failed login attempt",
"user": "admin@example.com",
"source_ip": "203.0.113.50",
"error_code": "INVALID_CREDENTIALS",
"attempt_count": 5
}
Structured Security Events
{
"event_type": "network_connection",
"timestamp": "2025-01-15T14:23:45Z",
"src_ip": "10.0.1.5",
"dst_ip": "198.51.100.10",
"dst_port": 443,
"protocol": "TCP",
"bytes_sent": 1240,
"bytes_recv": 5600,
"command_line": "/usr/bin/curl",
"hostname": "web-server-01"
}
CEF (Common Event Format)
CEF:0|SecurityVendor|SecurityProduct|1.0|100|Suspicious Activity|7|src=10.0.1.5 dst=203.0.113.50 dpt=22 act=blocked msg=Brute force SSH attempt detected
Step 5: Verify Ingestion
After publishing, allow a minute for Vector to join the consumer group and start consuming.
Check Feed Health
- Go to Feeds → select your new log source
- On the Overview tab, check:
- Status: Should show "Healthy"
- Event Volume chart: Should show events arriving
- Last Event: Should show a recent timestamp
Search Your Data
Navigate to Search and query for your source type:
source_type="kafka_security_events"
Check Consumer Group Status
Verify nano is consuming from the Kafka side:
kafka-consumer-groups.sh --describe \
--bootstrap-server kafka-1:9092 \
--group nanosiem
You should see:
- CURRENT-OFFSET advancing as messages are consumed
- LAG near zero (or decreasing if catching up)
- STATE as Stable
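LAG is simply the distance between the partition's newest offset and the group's committed offset. For instance:

```shell
# LOG-END-OFFSET and CURRENT-OFFSET as reported by kafka-consumer-groups.sh
LOG_END_OFFSET=152340
CURRENT_OFFSET=152337
echo $((LOG_END_OFFSET - CURRENT_OFFSET))   # LAG of 3
```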
Check for Errors
If no data appears:
- Check network connectivity — Can nano reach the bootstrap servers?
  # From the nano host
  nc -zv kafka-1 9092
- Check Vector logs for connection errors:
  docker logs nanosiem-vector 2>&1 | grep -i "kafka\|error\|sasl\|tls"
- Check ingestion errors in nano at System → Ingestion Errors
Multiple Topics
One Log Source per Topic
The simplest approach — create a separate nano log source for each Kafka topic, each with its own parser:
- cloudtrail-events topic → aws_cloudtrail log source
- okta-events topic → okta_sso log source
- app-logs topic → my_app log source
All can share the same credential and consumer group.
One Source Configuration with Routing Rules
For a shared topic or when you want centralized management, use a Source Configuration with routing rules:
- Go to Settings → Source Configurations
- Create a Kafka source configuration with your broker details
- Add routing rules that match on topic name:
| Match Field | Match Type | Match Value | Target Source Type |
|---|---|---|---|
| topic | exact | cloudtrail-events | aws_cloudtrail |
| topic | exact | okta-events | okta_sso |
| topic | prefix | app- | application_logs |
| topic | default | — | generic_kafka |
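Rules evaluate top to bottom, first match wins. The behavior is roughly equivalent to this shell sketch (illustrative only; nano applies the rules internally):

```shell
# First-match routing of a Kafka topic name to a nano source type
route_topic() {
  case "$1" in
    cloudtrail-events) echo "aws_cloudtrail" ;;
    okta-events)       echo "okta_sso" ;;
    app-*)             echo "application_logs" ;;   # prefix match
    *)                 echo "generic_kafka" ;;      # default
  esac
}

route_topic "app-payments"   # prints "application_logs"
```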
Troubleshooting
"Connection refused" or timeout
- Verify the bootstrap server addresses and ports are correct
- Check firewall rules / security groups allow traffic from nano to Kafka
- For managed services, ensure the cluster allows connections from nano's IP range
- MSK: Check that public access is enabled if connecting from outside the VPC
"SASL authentication failed"
- Verify the SASL mechanism matches what your Kafka cluster expects
- Check username and password are correct
- Confluent Cloud: Ensure you're using an API key/secret, not a cloud login
- MSK SCRAM: Verify the secret is associated with the cluster
"SSL handshake failed"
- Ensure TLS is enabled in the nano credential
- If using a self-signed CA, provide the CA certificate via the API:
curl -X POST http://localhost:3000/api/credentials \
  -H "Authorization: Bearer $TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "Kafka Production",
    "provider": "kafka",
    "credentials": {
      "sasl_mechanism": "SCRAM-SHA-256",
      "sasl_username": "nanosiem-reader",
      "sasl_password": "your-password",
      "tls_enabled": true,
      "tls_ca_cert": "-----BEGIN CERTIFICATE-----\n...\n-----END CERTIFICATE-----"
    }
  }'
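The tls_ca_cert field expects the PEM with literal \n escapes, because JSON strings cannot contain raw newlines. One way to produce that from a certificate file (a sketch; ca.pem stands in for your real CA certificate):

```shell
# Create a sample PEM (stand-in for your real CA certificate)
printf '%s\n' '-----BEGIN CERTIFICATE-----' 'MIIB...' '-----END CERTIFICATE-----' > ca.pem

# Replace each newline with a literal \n so the PEM fits in a JSON string
escaped=$(awk '{printf "%s\\n", $0}' ca.pem)
echo "$escaped"
```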
Consumer group is not consuming (LAG increasing)
- Check that the consumer group ID in nano doesn't conflict with another consumer
- Verify the topic name is spelled correctly (Kafka topic names are case-sensitive)
- Check Vector resource allocation — high-volume topics may need more CPU/memory
Messages consumed but not appearing in Search
- Check for parse errors in System → Ingestion Errors
- Verify the parser handles your message format — test with sample data in the parser editor
- Ensure the log source is published and deployed
Performance Tuning
Partitions and Parallelism
Vector creates one consumer per log source. For high-throughput topics, increase the number of partitions to allow Kafka to distribute load:
kafka-topics.sh --alter \
--bootstrap-server kafka-1:9092 \
--topic security-events \
--partitions 12
Consumer Group Coordination
If you run multiple nano Vector instances (e.g., in a Kubernetes deployment), they can share the same consumer group ID. Kafka will distribute partitions across the instances automatically, giving you horizontal scalability.
Offset Management
nano commits offsets automatically. If you need to reset offsets (e.g., to reprocess data):
# Reset to earliest (reprocess all data)
kafka-consumer-groups.sh --reset-offsets \
--bootstrap-server kafka-1:9092 \
--group nanosiem \
--topic security-events \
--to-earliest \
--execute
Only reset offsets when the consumer group is inactive (nano stopped); Kafka rejects offset resets for a group that still has active members.
Next Steps
- Create detection rules for your Kafka-sourced logs
- Configure enrichment to add GeoIP and threat intel
- Set up AWS S3/SQS or GCP Pub/Sub for cloud-native log sources