The Kafka Consumer utility transparently handles message deserialization, provides an intuitive developer experience, and integrates seamlessly with the rest of the Powertools for AWS Lambda ecosystem.
flowchart LR
KafkaTopic["Kafka Topic"] --> MSK["Amazon MSK"]
KafkaTopic --> MSKServerless["Amazon MSK Serverless"]
KafkaTopic --> SelfHosted["Self-hosted Kafka"]
MSK --> EventSourceMapping["Event Source Mapping"]
MSKServerless --> EventSourceMapping
SelfHosted --> EventSourceMapping
EventSourceMapping --> Lambda["Lambda Function"]
Lambda --> KafkaConsumer["Kafka Consumer Utility"]
KafkaConsumer --> Deserialization["Deserialization"]
Deserialization --> YourLogic["Your Business Logic"]
Key features
Event Source Mapping (ESM): A Lambda feature that reads from streaming sources (like Kafka) and invokes your Lambda function. It manages polling, batching, and error handling automatically, eliminating the need for consumer management code.
Record Key and Value: Kafka messages contain two important parts: an optional key that determines the partition and a value containing the actual message data. Both are base64-encoded in Lambda events and can be independently deserialized.
Deserialization: The process of converting binary data (base64-encoded in Lambda events) into usable JavaScript objects according to a specific format like JSON, Avro, or Protocol Buffers. Powertools handles this conversion automatically.
SchemaConfig class: Contains parameters that tell Powertools how to interpret message data, including the format type (JSON, Avro, Protocol Buffers) and optional schema definitions needed for binary formats.
Output parsing: A Standard Schema used to parse your data at runtime, allowing you to define how the deserialized data should be structured and validated.
Schema Registry: A centralized service that stores and validates schemas, ensuring producers and consumers maintain compatibility when message formats evolve over time.
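To illustrate the deserialization step conceptually: for a JSON value, the utility performs something roughly equivalent to this hand-rolled sketch. The function name is illustrative only and is not part of the library's API.

// Conceptual sketch only - the Kafka consumer utility does this for you.
// `record.value` in a raw Lambda Kafka event is a base64-encoded string.
const decodeJsonValue = (base64Value: string): unknown => {
  // 1. Decode the base64 payload into a UTF-8 string
  const text = Buffer.from(base64Value, 'base64').toString('utf8');
  // 2. Parse the JSON text into a JavaScript object
  return JSON.parse(text);
};

// Example: the base64 payload used later on this page decodes to a user object
const user = decodeJsonValue('eyJuYW1lIjogIlBvd2VydG9vbHMiLCAiYWdlIjogNX0=');
console.log(user); // { name: 'Powertools', age: 5 }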
Moving from traditional Kafka consumers

Lambda processes Kafka messages as discrete events rather than continuous streams, requiring a different approach to consumer development that Powertools for AWS helps standardize.
Aspect | Traditional Kafka Consumers | Lambda Kafka Consumer
--- | --- | ---
Model | Pull-based (you poll for messages) | Push-based (Lambda invoked with messages)
Scaling | Manual scaling configuration | Automatic scaling to partition count
State | Long-running application with state | Stateless, ephemeral executions
Offsets | Manual offset management | Automatic offset commitment
Schema Validation | Client-side schema validation | Optional Schema Registry integration with Event Source Mapping
Error Handling | Per-message retry control | Batch-level retry policies

Getting started

Installation

Depending on the schema types you want to use, install the library and the corresponding serialization libraries:
JSON:
npm install @aws-lambda-powertools/kafka

Avro:
npm install @aws-lambda-powertools/kafka avro-js

Protobuf:
npm install @aws-lambda-powertools/kafka protobufjs
Additionally, if you want to use output parsing with Standard Schema, you can install any of the supported libraries, for example: Zod, Valibot, or ArkType.
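For example, to add Zod (the library used in the parsing examples later on this page), install it alongside the Kafka utility; the same approach applies to Valibot or ArkType:

npm install zod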
Required resources

To use the Kafka consumer utility, you need an AWS Lambda function configured with a Kafka event source. This can be Amazon MSK, MSK Serverless, or a self-hosted Kafka cluster.
gettingStartedWithMsk.yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Resources:
  KafkaConsumerFunction:
    Type: AWS::Serverless::Function
    Properties:
      Runtime: nodejs22.x
      Handler: index.handler
      Timeout: 30
      Events:
        MSKEvent:
          Type: MSK
          Properties:
            StartingPosition: LATEST
            Stream: !GetAtt MyMSKCluster.Arn
            Topics:
              - my-topic-1
              - my-topic-2
      Policies:
        - AWSLambdaMSKExecutionRole
Using ESM with Schema Registry
Powertools for AWS supports both Schema Registry integration modes in your Event Source Mapping configuration. The Event Source Mapping configuration determines which mode is used: with JSON, Lambda converts all messages to JSON before invoking your function; with SOURCE mode, Lambda preserves the original format, requiring your function to handle the appropriate deserialization.
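As a rough sketch of where this is configured, the MSK event source in a SAM template accepts a schema registry block similar to the following. Treat the property names, the registry resource, and the values as assumptions for illustration and verify them against the current SAM/CloudFormation reference:

MSKEvent:
  Type: MSK
  Properties:
    StartingPosition: LATEST
    Stream: !GetAtt MyMSKCluster.Arn
    Topics:
      - my-topic-1
    # Assumed property names - verify against the SAM/CloudFormation docs
    SchemaRegistryConfig:
      SchemaRegistryURI: !GetAtt MyGlueSchemaRegistry.Arn  # hypothetical registry resource
      EventRecordFormat: SOURCE  # or JSON to let Lambda deserialize for you
      SchemaValidationConfigs:
        - Attribute: VALUE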
For simplicity, we will use a simple schema containing name and age in most of our examples. You can also copy the payload example with the expected Kafka event to test your code.
JSON
{
"name": "...",
"age": "..."
}
Payload JSON
{
"eventSource": "aws:kafka",
"eventSourceArn": "arn:aws:kafka:eu-west-3:123456789012:cluster/powertools-kafka-esm/f138df86-9253-4d2a-b682-19e132396d4f-s3",
"bootstrapServers": "boot-z3majaui.c3.kafka-serverless.eu-west-3.amazonaws.com:9098",
"records": {
"python-with-avro-doc-5": [
{
"topic": "python-with-avro-doc",
"partition": 5,
"offset": 0,
"timestamp": 1750547462087,
"timestampType": "CREATE_TIME",
"key": "MTIz",
"value": "eyJuYW1lIjogIlBvd2VydG9vbHMiLCAiYWdlIjogNX0=",
"headers": []
}
]
}
}
Avro Schema

{
"type": "record",
"name": "User",
"namespace": "com.example",
"fields": [
{"name": "name", "type": "string"},
{"name": "age", "type": "int"}
]
}
Payload Avro
{
"eventSource": "aws:kafka",
"eventSourceArn": "arn:aws:kafka:eu-west-3:123456789012:cluster/powertools-kafka-esm/f138df86-9253-4d2a-b682-19e132396d4f-s3",
"bootstrapServers": "boot-z3majaui.c3.kafka-serverless.eu-west-3.amazonaws.com:9098",
"records": {
"python-with-avro-doc-3": [
{
"topic": "python-with-avro-doc",
"partition": 3,
"offset": 0,
"timestamp": 1750547105187,
"timestampType": "CREATE_TIME",
"key": "MTIz",
"value": "AwBXT2qalUhN6oaj2CwEeaEWFFBvd2VydG9vbHMK",
"headers": []
}
]
}
}
Protobuf Schema

syntax = "proto3";
package com.example;
message User {
string name = 1;
int32 age = 2;
}
Payload Protobuf
{
"eventSource": "aws:kafka",
"eventSourceArn": "arn:aws:kafka:eu-west-3:992382490249:cluster/powertools-kafka-esm/f138df86-9253-4d2a-b682-19e132396d4f-s3",
"bootstrapServers": "boot-z3majaui.c3.kafka-serverless.eu-west-3.amazonaws.com:9098",
"records": {
"python-with-avro-doc-5": [
{
"topic": "python-with-avro-doc",
"partition": 5,
"offset": 1,
"timestamp": 1750624373324,
"timestampType": "CREATE_TIME",
"key": "MTIz",
"value": "Cgpwb3dlcnRvb2xzEAU=",
"headers": []
}
]
}
}
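The Protobuf examples later on this page import a generated class (e.g. ./samples/user.generated.js). One way to produce such a class from the schema above is the protobufjs CLI; a sketch, assuming a recent protobufjs release where the CLI ships separately as protobufjs-cli and the schema is saved as user.proto:

npm install --save-dev protobufjs-cli
npx pbjs -t static-module -w es6 -o user.generated.js user.proto
npx pbts -o user.generated.d.ts user.generated.js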
Processing Kafka events
The Kafka consumer utility transforms raw Kafka events into an intuitive format for processing. To handle messages effectively, you'll need to configure a schema that matches your data format.
Using Avro is recommended

We recommend Avro for production Kafka implementations due to its schema evolution capabilities, compact binary format, and integration with Schema Registry. This offers better type safety and forward/backward compatibility compared to JSON.
Avro Messages
import { readFileSync } from 'node:fs';
import { kafkaConsumer, SchemaType } from '@aws-lambda-powertools/kafka';
import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types';
import { Logger } from '@aws-lambda-powertools/logger';
const logger = new Logger({ serviceName: 'kafka-consumer' });
const schemaConfig = {
value: {
type: SchemaType.AVRO,
schema: readFileSync(new URL('./user.avsc', import.meta.url), 'utf8'),
},
} satisfies SchemaConfig;
export const handler = kafkaConsumer(async (event, _context) => {
for (const { value } of event.records) {
logger.info('received value', { value });
}
}, schemaConfig);
Protocol Buffers
import { kafkaConsumer, SchemaType } from '@aws-lambda-powertools/kafka';
import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types';
import { Logger } from '@aws-lambda-powertools/logger';
import { com } from './samples/user.generated.js'; // protobuf generated class
const logger = new Logger({ serviceName: 'kafka-consumer' });
const schemaConfig = {
value: {
type: SchemaType.PROTOBUF,
schema: com.example.User,
},
} satisfies SchemaConfig;
export const handler = kafkaConsumer(async (event, _context) => {
for (const { value } of event.records) {
logger.info('received value', { value });
}
}, schemaConfig);
JSON Messages
import { kafkaConsumer, SchemaType } from '@aws-lambda-powertools/kafka';
import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types';
import { Logger } from '@aws-lambda-powertools/logger';
const logger = new Logger({ serviceName: 'kafka-consumer' });
const schemaConfig = {
value: {
type: SchemaType.JSON,
},
} satisfies SchemaConfig;
export const handler = kafkaConsumer(async (event, _context) => {
for (const { value } of event.records) {
logger.info('received value', { value });
}
}, schemaConfig);
Deserializing key and value
The kafkaConsumer function can deserialize both key and value independently based on your schema configuration. This flexibility allows you to work with different data formats in the same message.
index.ts
import { readFileSync } from 'node:fs';
import { kafkaConsumer, SchemaType } from '@aws-lambda-powertools/kafka';
import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types';
import { Logger } from '@aws-lambda-powertools/logger';
const logger = new Logger({ serviceName: 'kafka-consumer' });
const schemaConfig = {
key: {
type: SchemaType.AVRO,
schema: readFileSync(new URL('./ProductKey.avsc', import.meta.url), 'utf8'),
},
value: {
type: SchemaType.AVRO,
schema: readFileSync(
new URL('./ProductInfo.avsc', import.meta.url),
'utf8'
),
},
} satisfies SchemaConfig;
export const handler = kafkaConsumer<ProductKey, ProductInfo>(
async (event, _context) => {
for (const { key, value } of event.records) {
logger.info('processing product ID', { productId: key.productId });
logger.info('product', { name: value.name, price: value.price });
}
},
schemaConfig
);
types.ts

type ProductKey = {
productId: string;
};
type ProductInfo = {
name: string;
price: number;
inStock: boolean;
};
ProductKey.avsc

{
  "type": "record",
  "name": "ProductKey",
  "fields": [
    {"name": "productId", "type": "string"}
  ]
}
ProductInfo.avsc

{
  "type": "record",
  "name": "ProductInfo",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "price", "type": "double"},
    {"name": "inStock", "type": "boolean"}
  ]
}
You can configure the kafkaConsumer to handle only the value. This allows you to optimize your Lambda function for the specific data structure of your Kafka messages.

When working with primitive data types (strings, integers, etc.) rather than structured objects, you can simplify your configuration by omitting the schema specification for that component. Powertools for AWS will then always deserialize that component as a string.
Common pattern: Keys with primitive values

Using primitive types (strings, integers) as Kafka message keys is a common pattern for partitioning and identifying messages. The Kafka consumer automatically handles these primitive keys without requiring special configuration, making it easy to implement this popular design pattern.
Primitive key
import { kafkaConsumer } from '@aws-lambda-powertools/kafka';
import { Logger } from '@aws-lambda-powertools/logger';
const logger = new Logger({ serviceName: 'kafka-consumer' });
export const handler = kafkaConsumer<string, { name: string; age: number }>(
async (event, _context) => {
for (const record of event.records) {
// Key is automatically decoded as UTF-8 string
const { key } = record;
// Value is parsed as JSON object
const { value } = record;
logger.info('received value', {
key,
product: {
name: value.name,
age: value.age,
},
});
}
}
);
Message format support and comparison
The Kafka consumer utility supports multiple serialization formats to match your existing Kafka implementation. Choose the format that best suits your needs based on performance, schema evolution requirements, and ecosystem compatibility.
Selecting the right format

For new applications, consider Avro or Protocol Buffers over JSON. Both provide schema validation, evolution support, and significantly better performance with smaller message sizes. Avro is particularly well-suited for Kafka due to its built-in schema evolution capabilities.
Choose the serialization format that best fits your needs:

Supported Formats

Format | Schema Type | Description | Required Parameters
--- | --- | --- | ---
JSON | "JSON" | Human-readable text format | None
Avro | "AVRO" | Compact binary format with schema | value.schema (Avro schema string)
Protocol Buffers | "PROTOBUF" | Efficient binary format | value.schema (Proto message class)

Format Comparison

Feature | JSON | Avro | Protocol Buffers
--- | --- | --- | ---
Schema Definition | Optional | Required JSON schema | Required Protobuf class
Schema Evolution | None | Strong support | Strong support
Size Efficiency | Low | High | Highest
Processing Speed | Slower | Fast | Fastest
Human Readability | High | Low | Low
Implementation Complexity | Low | Medium | Medium
Additional Dependencies | None | avro-js module | protobufjs module
Each Kafka record contains important metadata that you can access alongside the deserialized message content. This metadata helps with message processing, troubleshooting, and implementing advanced patterns like exactly-once processing.
Working with Record Metadata
import { kafkaConsumer, SchemaType } from '@aws-lambda-powertools/kafka';
import { Logger } from '@aws-lambda-powertools/logger';
import { com } from './samples/user.generated.js'; // protobuf generated class
const logger = new Logger({ serviceName: 'kafka-consumer' });
export const handler = kafkaConsumer<unknown, com.example.IUser>(
async (event, _context) => {
for (const record of event.records) {
const { value, topic, partition, offset, timestamp, headers } = record;
logger.info(`processing message from topic ${topic}`, {
partition,
offset,
timestamp,
});
if (headers) {
for (const header of headers) {
logger.debug(`Header: ${header.key}`, {
value: header.value,
});
}
}
// Process the deserialized value
logger.info('User data', {
userName: value.name,
userAge: value.age,
});
}
},
{
value: {
type: SchemaType.PROTOBUF,
schema: com.example.User,
},
}
);
For debugging purposes, you can also access the original key, value, and headers in their base64-encoded form; these are available in the originalValue, originalKey, and originalHeaders properties of the record.
Property | Description | Example use case
--- | --- | ---
topic | Topic name the record was published to | Routing logic in multi-topic consumers
partition | Kafka partition number | Tracking message distribution
offset | Position in the partition | De-duplication, exactly-once processing
timestamp | Unix timestamp when record was created | Event timing analysis
timestampType | Timestamp type (CREATE_TIME or LOG_APPEND_TIME) | Data lineage verification
headers | Key-value pairs attached to the message | Cross-cutting concerns like correlation IDs
key | Deserialized message key | Customer ID or entity identifier
value | Deserialized message content | The actual business data
originalValue | Base64-encoded original message value | Debugging or custom deserialization
originalKey | Base64-encoded original message key | Debugging or custom deserialization
originalHeaders | Base64-encoded original message headers | Debugging or custom deserialization
valueSchemaMetadata | Metadata about the value schema like schemaId and dataFormat | Used by kafkaConsumer to process Protobuf, data format validation
keySchemaMetadata | Metadata about the key schema like schemaId and dataFormat | Used by kafkaConsumer to process Protobuf, data format validation

Additional Parsing
You can parse deserialized data using your preferred parsing library. This can help you integrate Kafka data with your domain schemas and application architecture, providing type hints, runtime parsing and validation, and advanced data transformations.
Zod
import { kafkaConsumer, SchemaType } from '@aws-lambda-powertools/kafka';
import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types';
import { Logger } from '@aws-lambda-powertools/logger';
import { z } from 'zod/v4';
const logger = new Logger({ serviceName: 'kafka-consumer' });
const OrderItemSchema = z.object({
productId: z.string(),
quantity: z.number().int().positive(),
price: z.number().positive(),
});
const OrderSchema = z.object({
id: z.string(),
customerId: z.string(),
items: z.array(OrderItemSchema).min(1, 'Order must have at least one item'),
createdAt: z.iso.datetime(),
totalAmount: z.number().positive(),
});
const schemaConfig = {
value: {
type: SchemaType.JSON,
parserSchema: OrderSchema,
},
} satisfies SchemaConfig;
export const handler = kafkaConsumer<unknown, z.infer<typeof OrderSchema>>(
async (event, _context) => {
for (const record of event.records) {
const {
value: { id, items },
} = record;
logger.setCorrelationId(id);
logger.debug(`order includes ${items.length} items`);
}
},
schemaConfig
);
Valibot
import { kafkaConsumer, SchemaType } from '@aws-lambda-powertools/kafka';
import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types';
import { Logger } from '@aws-lambda-powertools/logger';
import * as v from 'valibot';
const logger = new Logger({ serviceName: 'kafka-consumer' });
const OrderItemSchema = v.object({
productId: v.string(),
quantity: v.pipe(v.number(), v.integer(), v.toMinValue(1)),
price: v.pipe(v.number(), v.integer()),
});
const OrderSchema = v.object({
id: v.string(),
customerId: v.string(),
items: v.pipe(
v.array(OrderItemSchema),
v.minLength(1, 'Order must have at least one item')
),
createdAt: v.pipe(v.string(), v.isoDateTime()),
totalAmount: v.pipe(v.number(), v.toMinValue(0)),
});
const schemaConfig = {
value: {
type: SchemaType.JSON,
parserSchema: OrderSchema,
},
} satisfies SchemaConfig;
export const handler = kafkaConsumer<unknown, v.InferInput<typeof OrderSchema>>(
async (event, _context) => {
for (const record of event.records) {
const {
value: { id, items },
} = record;
logger.setCorrelationId(id);
logger.debug(`order includes ${items.length} items`);
}
},
schemaConfig
);
ArkType
import { kafkaConsumer, SchemaType } from '@aws-lambda-powertools/kafka';
import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types';
import { Logger } from '@aws-lambda-powertools/logger';
import { type } from 'arktype';
const logger = new Logger({ serviceName: 'kafka-consumer' });
const OrderItemSchema = type({
productId: 'string',
quantity: 'number.integer >= 1',
price: 'number.integer',
});
const OrderSchema = type({
id: 'string',
customerId: 'string',
items: OrderItemSchema.array().moreThanLength(0),
createdAt: 'string.date',
totalAmount: 'number.integer >= 0',
});
const schemaConfig = {
value: {
type: SchemaType.JSON,
parserSchema: OrderSchema,
},
} satisfies SchemaConfig;
export const handler = kafkaConsumer<unknown, typeof OrderSchema.infer>(
async (event, _context) => {
for (const record of event.records) {
const {
value: { id, items },
} = record;
logger.setCorrelationId(id);
logger.debug(`order includes ${items.length} items`);
}
},
schemaConfig
);
Error handling
Handle errors gracefully when processing Kafka messages to ensure your application maintains resilience and provides clear diagnostic information. The Kafka consumer utility provides specific exception types to help you identify and handle deserialization issues effectively.
Tip

Fields like value, key, and headers are decoded lazily, meaning they are only deserialized when accessed. This allows you to handle deserialization errors at the point of access rather than when the record is first processed.
Basic Error Handling
import { readFileSync } from 'node:fs';
import { kafkaConsumer, SchemaType } from '@aws-lambda-powertools/kafka';
import { KafkaConsumerDeserializationError } from '@aws-lambda-powertools/kafka/errors';
import type {
ConsumerRecord,
SchemaConfig,
} from '@aws-lambda-powertools/kafka/types';
import { Logger } from '@aws-lambda-powertools/logger';
const logger = new Logger({ serviceName: 'kafka-consumer' });
const schemaConfig = {
value: {
type: SchemaType.AVRO,
schema: readFileSync(new URL('./user.avsc', import.meta.url), 'utf8'),
},
} satisfies SchemaConfig;
export const handler = kafkaConsumer(async (event, _context) => {
const results: {
successful: number;
failed: Array<ConsumerRecord<unknown, unknown>>;
} = {
successful: 0,
failed: [],
};
for (const record of event.records) {
try {
const { value, partition, offset, topic } = record; // (1)!
logger.setCorrelationId(`${topic}-${partition}-${offset}`);
await processRecord(value);
results.successful += 1;
} catch (error) {
if (error instanceof KafkaConsumerDeserializationError) {
results.failed.push(record);
logger.error('Error deserializing message', { error });
} else {
logger.error('Error processing message', { error });
}
}
if (results.failed.length > 0) {
// Handle failed records, e.g., send to a dead-letter queue
}
logger.info('Successfully processed records', {
successful: results.successful,
});
}
}, schemaConfig);
1. Deserialization happens lazily, when you access the value, key, or headers properties of the record within the for...of loop.

Parser Error Handling
import { readFileSync } from 'node:fs';
import { kafkaConsumer, SchemaType } from '@aws-lambda-powertools/kafka';
import { KafkaConsumerParserError } from '@aws-lambda-powertools/kafka/errors';
import type {
ConsumerRecord,
SchemaConfig,
} from '@aws-lambda-powertools/kafka/types';
import { Logger } from '@aws-lambda-powertools/logger';
import { z } from 'zod/v4';
const logger = new Logger({ serviceName: 'kafka-consumer' });
const schemaConfig = {
value: {
type: SchemaType.AVRO,
schema: readFileSync(new URL('./user.avsc', import.meta.url), 'utf8'),
parserSchema: z.object({
id: z.number(),
name: z.string(),
email: z.email(),
}),
},
} satisfies SchemaConfig;
export const handler = kafkaConsumer(async (event, _context) => {
const results: {
successful: number;
failed: Array<ConsumerRecord<unknown, unknown>>;
} = {
successful: 0,
failed: [],
};
for (const record of event.records) {
try {
const { value, partition, offset, topic } = record;
logger.setCorrelationId(`${topic}-${partition}-${offset}`);
await processRecord(value);
results.successful += 1;
} catch (error) {
if (error instanceof KafkaConsumerParserError) {
results.failed.push(record);
logger.error(
`Error deserializing message - ${z.prettifyError({ issues: error.cause } as z.ZodError)}`,
{ error } // (1)!
);
} else {
logger.error('Error processing message', { error });
}
}
if (results.failed.length > 0) {
// Handle failed records, e.g., send to a dead-letter queue
}
logger.info('Successfully processed records', {
successful: results.successful,
});
}
}, schemaConfig);
1. The cause property of the error is populated with the original Standard Schema parsing error, allowing you to access detailed information about the parsing failure.

Error type | Description | Common causes
--- | --- | ---
KafkaConsumerError | Base class for all Kafka consumer errors | General unhandled errors
KafkaConsumerDeserializationError | Thrown when message deserialization fails | Corrupted message data, schema mismatch, or wrong schema type configuration
KafkaConsumerMissingSchemaError | Thrown when a required schema is not provided | Missing schema for AVRO or PROTOBUF formats (required parameter)
KafkaConsumerOutputSerializerError | Thrown when additional schema parsing fails | Parsing failures in Standard Schema models

Integrating with Idempotency
When processing Kafka messages in Lambda, failed batches can result in message reprocessing. The Idempotency utility prevents duplicate processing by tracking which messages have already been handled, ensuring each message is processed exactly once.
The Idempotency utility automatically stores the result of each successful operation, returning the cached result if the same message is processed again, which prevents potentially harmful duplicate operations like double-charging customers or double-counting metrics.
Tip
By using the Kafka record's unique coordinates (topic, partition, offset) as the idempotency key, you ensure that even if a batch fails and Lambda retries the messages, each message will be processed exactly once.
Idempotent Kafka Processing
import {
IdempotencyConfig,
makeIdempotent,
} from '@aws-lambda-powertools/idempotency';
import { DynamoDBPersistenceLayer } from '@aws-lambda-powertools/idempotency/dynamodb';
import { kafkaConsumer, SchemaType } from '@aws-lambda-powertools/kafka';
import type { SchemaConfig } from '@aws-lambda-powertools/kafka/types';
import { Logger } from '@aws-lambda-powertools/logger';
import { com } from './samples/user.generated.js'; // protobuf generated class
const logger = new Logger({ serviceName: 'kafka-consumer' });
const persistenceStore = new DynamoDBPersistenceLayer({
tableName: 'IdempotencyTable',
});
const schemaConfig = {
value: {
type: SchemaType.PROTOBUF,
schema: com.example.User,
},
} satisfies SchemaConfig;
const processRecord = makeIdempotent(
async (user, topic, partition, offset) => {
logger.info('processing user', {
user,
meta: {
topic,
partition,
offset,
},
});
// ...your business logic here
return {
success: true,
user,
};
},
{
persistenceStore,
config: new IdempotencyConfig({
eventKeyJmesPath: `topic & '-' & partition & '-' & offset`,
}),
}
);
export const handler = kafkaConsumer(async (event, _context) => {
for (const { value, topic, partition, offset } of event.records) {
await processRecord(value, topic, partition, offset);
}
}, schemaConfig);
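The example above assumes a DynamoDB table named IdempotencyTable already exists. A minimal sketch of such a table in the same SAM template, assuming the persistence layer's default attribute names (a partition key named id and a TTL attribute named expiration; check the Idempotency utility documentation for your version):

IdempotencyTable:
  Type: AWS::DynamoDB::Table
  Properties:
    BillingMode: PAY_PER_REQUEST
    AttributeDefinitions:
      - AttributeName: id
        AttributeType: S
    KeySchema:
      - AttributeName: id
        KeyType: HASH
    TimeToLiveSpecification:
      AttributeName: expiration
      Enabled: true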
Best practices

Handling large messages
When processing large Kafka messages in Lambda, be mindful of memory limitations. Although the Kafka consumer utility optimizes memory usage, large deserialized messages can still exhaust the function resources.
Handling Large Messages
import { kafkaConsumer } from '@aws-lambda-powertools/kafka';
import { Logger } from '@aws-lambda-powertools/logger';
import { object, safeParse, string } from 'valibot';
const logger = new Logger({ serviceName: 'kafka-consumer' });
const LargeMessage = object({
key: string(),
bucket: string(),
});
export const handler = kafkaConsumer(async (event, _context) => {
for (const record of event.records) {
const { topic, value, originalValue } = record;
const valueSize = Buffer.byteLength(originalValue, 'utf8');
const parsedValue = safeParse(LargeMessage, value);
if (
topic === 'product-catalog' &&
valueSize > 3_000_000 &&
parsedValue.success
) {
logger.info('Large message detected, processing from S3', {
size: valueSize,
});
const { key, bucket } = parsedValue.output;
await processRecordFromS3({ key, bucket });
logger.info('Processed large message from S3', {
key,
bucket,
});
}
// regular processing of the record
}
});
For large messages, consider the claim-check approach shown above: store the payload in Amazon S3, publish only a reference (bucket and key) to Kafka, and fetch the object during processing.

Batch size configuration

The number of Kafka records processed per Lambda invocation is controlled by your Event Source Mapping configuration. Properly sized batches optimize cost and performance.
Batch size configuration
Resources:
  OrderProcessingFunction:
    Type: AWS::Serverless::Function
    Properties:
      Runtime: nodejs22.x
      Handler: index.handler
      Events:
        KafkaEvent:
          Type: MSK
          Properties:
            Stream: !GetAtt OrdersMSKCluster.Arn
            Topics:
              - order-events
              - payment-events
            # Configuration for optimal throughput/latency balance
            BatchSize: 100
            MaximumBatchingWindowInSeconds: 5
            StartingPosition: LATEST
            # Enable partial batch success reporting
            FunctionResponseTypes:
              - ReportBatchItemFailures
Different workloads benefit from different batch configurations: larger batches improve throughput for small, quick-to-process messages, while smaller batches reduce the risk of timeouts and memory pressure when individual records are large or expensive to process; see the sketch below for a latency-sensitive variant.
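This fragment replaces the batching settings of the MSK event Properties shown above; the values are illustrative assumptions, not recommendations:

# Fragment of the MSK event Properties shown above - illustrative values only
BatchSize: 10
MaximumBatchingWindowInSeconds: 1
StartingPosition: LATEST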
Cross-language compatibility

When using binary serialization formats across multiple programming languages, ensure consistent schema handling to prevent deserialization failures.

Common cross-language challenges to address include field naming conventions (camelCase vs. snake_case), how each language's generated code represents optional fields and default values, and keeping schema versions aligned between producers and consumers.
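For example, Avro's schema resolution supports field aliases, which can bridge naming-convention differences between a producer written in another language and your TypeScript consumer. A sketch only; whether aliases are applied depends on the Avro library and how it resolves reader vs. writer schemas, so verify the behavior with avro-js before relying on it:

{
  "type": "record",
  "name": "ProductInfo",
  "fields": [
    {"name": "productId", "type": "string", "aliases": ["product_id"]},
    {"name": "inStock", "type": "boolean", "aliases": ["in_stock"]}
  ]
}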
Troubleshooting

Deserialization failures

When encountering deserialization errors with your Kafka messages, follow this systematic troubleshooting approach to identify and resolve the root cause.
First, check that your schema definition exactly matches the message format. Even minor discrepancies can cause deserialization failures, especially with binary formats like Avro and Protocol Buffers.
For binary messages that fail to deserialize, examine the raw encoded data:
// DO NOT include this code in production handlers
// For troubleshooting purposes only
const rawBytes = Buffer.from(record.originalValue, 'base64');
console.log(`Message size: ${rawBytes.length} bytes`);
console.log(`First 50 bytes (hex): ${rawBytes.slice(0, 50).toString('hex')}`);
Schema compatibility issues

Schema compatibility issues often manifest as successful connections but failed deserialization. Common causes include a producer publishing with a newer schema version than your consumer expects, incompatible type changes or renamed fields between versions, and different schemas (or registries) being used across environments.
When using Schema Registry, verify schema compatibility rules are properly configured for your topics and that all applications use the same registry.
Memory and timeout optimization

Lambda functions processing Kafka messages may encounter resource constraints, particularly with large batches or complex processing logic.
For memory errors:

- Increase the function's memory allocation
- Reduce the BatchSize parameter in your event source mapping so each invocation handles fewer records

For timeout issues:

- Increase the function timeout and review how long each record takes to process
- Reduce the BatchSize parameter, or move slow per-record work (for example, downstream API calls) out of the critical path

A sketch of the relevant SAM properties follows below.
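A minimal sketch of where these knobs live in the SAM template, reusing the function from the batch size example; the numbers are illustrative assumptions, not recommendations:

OrderProcessingFunction:
  Type: AWS::Serverless::Function
  Properties:
    Runtime: nodejs22.x
    Handler: index.handler
    MemorySize: 512   # raise this if you see out-of-memory errors
    Timeout: 60       # raise this if large batches exceed the current timeout
    Events:
      KafkaEvent:
        Type: MSK
        Properties:
          Stream: !GetAtt OrdersMSKCluster.Arn
          Topics:
            - order-events
          BatchSize: 50   # smaller batches also reduce per-invocation memory and duration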
Monitoring memory usage
Use CloudWatch metrics to track your function's memory utilization. If it consistently exceeds 80% of allocated memory, consider increasing the memory allocation or optimizing your code.
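One way to check this is a CloudWatch Logs Insights query over the function's log group; the @maxMemoryUsed and @memorySize fields come from Lambda's REPORT log lines. The query below is a sketch to adapt to your needs:

filter @type = "REPORT"
| stats max(@maxMemoryUsed / 1000 / 1000) as maxMemoryUsedMB,
        max(@memorySize / 1000 / 1000) as allocatedMemoryMB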
Kafka consumer workflow

Using ESM with Schema Registry validation (SOURCE)

sequenceDiagram
participant Kafka
participant ESM as Event Source Mapping
participant SchemaRegistry as Schema Registry
participant Lambda
participant KafkaConsumer
participant YourCode
Kafka->>+ESM: Send batch of records
ESM->>+SchemaRegistry: Validate schema
SchemaRegistry-->>-ESM: Confirm schema is valid
ESM->>+Lambda: Invoke with validated records (still encoded)
Lambda->>+KafkaConsumer: Pass Kafka event
KafkaConsumer->>KafkaConsumer: Parse event structure
loop For each record
KafkaConsumer->>KafkaConsumer: Decode base64 data
KafkaConsumer->>KafkaConsumer: Deserialize based on schema_type
alt Output serializer provided
KafkaConsumer->>KafkaConsumer: Apply output serializer
end
end
KafkaConsumer->>+YourCode: Provide ConsumerRecords
YourCode->>YourCode: Process records
YourCode-->>-KafkaConsumer: Return result
KafkaConsumer-->>-Lambda: Pass result back
Lambda-->>-ESM: Return response
ESM-->>-Kafka: Acknowledge processed batch
Using ESM with Schema Registry deserialization (JSON)
sequenceDiagram
participant Kafka
participant ESM as Event Source Mapping
participant SchemaRegistry as Schema Registry
participant Lambda
participant KafkaConsumer
participant YourCode
Kafka->>+ESM: Send batch of records
ESM->>+SchemaRegistry: Validate and deserialize
SchemaRegistry->>SchemaRegistry: Deserialize records
SchemaRegistry-->>-ESM: Return deserialized data
ESM->>+Lambda: Invoke with pre-deserialized JSON records
Lambda->>+KafkaConsumer: Pass Kafka event
KafkaConsumer->>KafkaConsumer: Parse event structure
loop For each record
KafkaConsumer->>KafkaConsumer: Record is already deserialized
alt Output serializer provided
KafkaConsumer->>KafkaConsumer: Apply output serializer
end
end
KafkaConsumer->>+YourCode: Provide ConsumerRecords
YourCode->>YourCode: Process records
YourCode-->>-KafkaConsumer: Return result
KafkaConsumer-->>-Lambda: Pass result back
Lambda-->>-ESM: Return response
ESM-->>-Kafka: Acknowledge processed batch
Using ESM without Schema Registry integration
sequenceDiagram
participant Kafka
participant Lambda
participant KafkaConsumer
participant YourCode
Kafka->>+Lambda: Invoke with batch of records (direct integration)
Lambda->>+KafkaConsumer: Pass raw Kafka event
KafkaConsumer->>KafkaConsumer: Parse event structure
loop For each record
KafkaConsumer->>KafkaConsumer: Decode base64 data
KafkaConsumer->>KafkaConsumer: Deserialize based on schema_type
alt Output serializer provided
KafkaConsumer->>KafkaConsumer: Apply output serializer
end
end
KafkaConsumer->>+YourCode: Provide ConsumerRecords
YourCode->>YourCode: Process records
YourCode-->>-KafkaConsumer: Return result
KafkaConsumer-->>-Lambda: Pass result back
Lambda-->>-Kafka: Acknowledge processed batch
Testing your code
Testing Kafka consumer code requires simulating Lambda events with Kafka messages. You can create simple test cases using local JSON files without needing a live Kafka cluster. Below is an example of how to simulate a JSON message.
Testing your code
import type { MSKEvent } from '@aws-lambda-powertools/kafka/types';
import type { Context } from 'aws-lambda';
import { expect, it } from 'vitest';
import { handler } from './gettingStartedPrimitiveValues.js';
it('handles JSON values in MSK events', async () => {
// Prepare
const event = {
eventSource: 'aws:kafka',
eventSourceArn:
'arn:aws:kafka:us-east-1:123456789012:cluster/MyCluster/12345678-1234-1234-1234-123456789012-1',
bootstrapServers:
'b-1.mskcluster.abcde12345.us-east-1.kafka.amazonaws.com:9092',
records: {
'orders-topic': [
{
topic: 'orders-topic',
partition: 0,
offset: 15,
timestamp: 1545084650987,
timestampType: 'CREATE_TIME',
headers: [],
key: undefined,
keySchemaMetadata: {
dataFormat: 'JSON',
},
valueSchemaMetadata: {
dataFormat: 'JSON',
schemaId: undefined,
},
value: Buffer.from(
JSON.stringify({ order_id: '12345', amount: 99.95 })
).toString('base64'),
},
],
},
} as MSKEvent;
// Act
const result = await handler(event, {} as Context);
// Assess
expect(result).toBeDefined();
// You can add more specific assertions based on your handler's logic
});
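You can run this test with your usual test runner; for the Vitest setup assumed above:

npx vitest run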