The output binding allows an Azure Functions app to write messages to a Kafka topic.
Example
The usage of the binding depends on the C# modality used in your function app, which can be one of the following:
An in-process class library: a compiled C# function that runs in the same process as the Functions runtime.
An isolated worker process class library: a compiled C# function that runs in a worker process isolated from the runtime.
The attributes you use depend on the specific event provider.
The following example shows a C# function that sends a single message to a Kafka topic, using data provided in an HTTP GET request. This version connects to Confluent Cloud.
[FunctionName("KafkaOutput")]
public static IActionResult Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequest req,
[Kafka("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)] out string eventData,
ILogger log)
{
log.LogInformation("C# HTTP trigger function processed a request.");
string message = req.Query["message"];
string responseMessage = "Ok";
eventData = message;
return new OkObjectResult(responseMessage);
}
To send events in a batch, use an array of KafkaEventData objects, as shown in the following example:
[FunctionName("KafkaOutputMany")]
public static IActionResult Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequest req,
[Kafka("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)] out KafkaEventData<string>[] eventDataArr,
ILogger log)
{
log.LogInformation("C# HTTP trigger function processed a request.");
eventDataArr = new KafkaEventData<string>[2];
eventDataArr[0] = new KafkaEventData<string>("one");
eventDataArr[1] = new KafkaEventData<string>("two");
return new OkObjectResult("Ok");
}
The following function adds headers to the Kafka output data:
[FunctionName("KafkaOutputWithHeaders")]
public static IActionResult Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequest req,
[Kafka("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)] out KafkaEventData<string> eventData,
ILogger log)
{
log.LogInformation("C# HTTP trigger function processed a request.");
string message = req.Query["message"];
eventData = new KafkaEventData<string>(message);
eventData.Headers.Add("test", System.Text.Encoding.UTF8.GetBytes("dotnet"));
return new OkObjectResult("Ok");
}
For a complete set of working .NET examples, see the Kafka extension repository.
The following example shows the same C# function connecting to Azure Event Hubs instead, sending a single message to a Kafka topic using data provided in an HTTP GET request.
[FunctionName("KafkaOutput")]
public static IActionResult Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequest req,
[Kafka("BrokerList",
"topic",
Username = "$ConnectionString",
Password = "%EventHubConnectionString%",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)] out string eventData,
ILogger log)
{
log.LogInformation("C# HTTP trigger function processed a request.");
string message = req.Query["message"];
string responseMessage = "Ok";
eventData = message;
return new OkObjectResult(responseMessage);
}
To send events in a batch, use an array of KafkaEventData objects, as shown in the following example:
[FunctionName("KafkaOutputMany")]
public static IActionResult Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequest req,
[Kafka("BrokerList",
"topic",
Username = "$ConnectionString",
Password = "%EventHubConnectionString%",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)] out KafkaEventData<string>[] eventDataArr,
ILogger log)
{
log.LogInformation("C# HTTP trigger function processed a request.");
eventDataArr = new KafkaEventData<string>[2];
eventDataArr[0] = new KafkaEventData<string>("one");
eventDataArr[1] = new KafkaEventData<string>("two");
return new OkObjectResult("Ok");
}
The following function adds headers to the Kafka output data:
[FunctionName("KafkaOutputWithHeaders")]
public static IActionResult Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequest req,
[Kafka("BrokerList",
"topic",
Username = "$ConnectionString",
Password = "%EventHubConnectionString%",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)] out KafkaEventData<string> eventData,
ILogger log)
{
log.LogInformation("C# HTTP trigger function processed a request.");
string message = req.Query["message"];
eventData = new KafkaEventData<string>(message);
eventData.Headers.Add("test", System.Text.Encoding.UTF8.GetBytes("dotnet"));
return new OkObjectResult("Ok");
}
For a complete set of working .NET examples, see the Kafka extension repository.
The following example has a custom return type that is MultipleOutputType, which consists of an HTTP response and a Kafka output.
[Function("KafkaOutput")]
public static MultipleOutputType Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
string message = req.FunctionContext
.BindingContext
.BindingData["message"]
.ToString();
var response = req.CreateResponse(HttpStatusCode.OK);
return new MultipleOutputType()
{
Kevent = message,
HttpResponse = response
};
}
In the class MultipleOutputType, Kevent is the output binding variable for the Kafka binding.
public class MultipleOutputType
{
[KafkaOutput("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)]
public string Kevent { get; set; }
public HttpResponseData HttpResponse { get; set; }
}
To send a batch of events, pass a string array to the output type, as shown in the following example:
[Function("KafkaOutputMany")]
public static MultipleOutputTypeForBatch Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
var response = req.CreateResponse(HttpStatusCode.OK);
string[] messages = new string[2];
messages[0] = "one";
messages[1] = "two";
return new MultipleOutputTypeForBatch()
{
Kevents = messages,
HttpResponse = response
};
}
The string array is defined as the Kevents property on the class, on which the output binding is defined:
public class MultipleOutputTypeForBatch
{
[KafkaOutput("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)]
public string[] Kevents { get; set; }
public HttpResponseData HttpResponse { get; set; }
}
The following function adds headers to the Kafka output data:
[Function("KafkaOutputWithHeaders")]
public static MultipleOutputType Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
string message = req.FunctionContext
.BindingContext
.BindingData["message"]
.ToString();
string kevent = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"dotnet-isolated\" }] }";
var response = req.CreateResponse(HttpStatusCode.OK);
return new MultipleOutputType()
{
Kevent = kevent,
HttpResponse = response
};
}
For a complete set of working .NET examples, see the Kafka extension repository.
The following example connects to Event Hubs and has a custom return type that is MultipleOutputType, consisting of an HTTP response and a Kafka output.
[Function("KafkaOutput")]
public static MultipleOutputType Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
string message = req.FunctionContext
.BindingContext
.BindingData["message"]
.ToString();
var response = req.CreateResponse(HttpStatusCode.OK);
return new MultipleOutputType()
{
Kevent = message,
HttpResponse = response
};
}
In the class MultipleOutputType, Kevent is the output binding variable for the Kafka binding.
public class MultipleOutputType
{
[KafkaOutput("BrokerList",
"topic",
Username = "$ConnectionString",
Password = "EventHubConnectionString",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)]
public string Kevent { get; set; }
public HttpResponseData HttpResponse { get; set; }
}
To send a batch of events, pass a string array to the output type, as shown in the following example:
[Function("KafkaOutputMany")]
public static MultipleOutputTypeForBatch Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
var response = req.CreateResponse(HttpStatusCode.OK);
string[] messages = new string[2];
messages[0] = "one";
messages[1] = "two";
return new MultipleOutputTypeForBatch()
{
Kevents = messages,
HttpResponse = response
};
}
The string array is defined as the Kevents property on the class, on which the output binding is defined:
public class MultipleOutputTypeForBatch
{
[KafkaOutput("BrokerList",
"topic",
Username = "$ConnectionString",
Password = "EventHubConnectionString",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)]
public string[] Kevents { get; set; }
public HttpResponseData HttpResponse { get; set; }
}
The following function adds headers to the Kafka output data:
[Function("KafkaOutputWithHeaders")]
public static MultipleOutputType Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
string message = req.FunctionContext
.BindingContext
.BindingData["message"]
.ToString();
string kevent = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"dotnet-isolated\" }] }";
var response = req.CreateResponse(HttpStatusCode.OK);
return new MultipleOutputType()
{
Kevent = kevent,
HttpResponse = response
};
}
For a complete set of working .NET examples, see the Kafka extension repository.
The specific properties of the function.json file depend on your event provider, which in these examples is either Confluent or Azure Event Hubs. The following examples show a Kafka output binding for a function that is triggered by an HTTP request and sends data from the request to the Kafka topic.
The following function.json files define the trigger and the output binding, first for Confluent and then for Event Hubs:
{
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "req",
"methods": [
"get"
]
},
{
"type": "kafka",
"name": "outputKafkaMessage",
"brokerList": "BrokerList",
"topic": "topic",
"username": "ConfluentCloudUsername",
"password": "ConfluentCloudPassword",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN",
"direction": "out"
},
{
"type": "http",
"direction": "out",
"name": "res"
}
]
}
{
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "req",
"methods": [
"get"
]
},
{
"type": "kafka",
"name": "outputKafkaMessage",
"brokerList": "BrokerList",
"topic": "topic",
"username": "$ConnectionString",
"password": "EventHubConnectionString",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN",
"direction": "out"
},
{
"type": "http",
"direction": "out",
"name": "res"
}
]
}
The following code then sends a message to the topic:
// This sample creates the topic "topic" and sends a message to it.
// The KafkaTrigger will be triggered.
module.exports = async function (context, req) {
context.log('JavaScript HTTP trigger function processed a request.');
const message = (req.query.message);
context.bindings.outputKafkaMessage = message;
context.res = {
// status: 200, /* Defaults to 200 */
body: 'Ok'
};
}
The following code sends multiple messages as an array to the same topic:
// This sample creates the topic "topic" and sends a message to it.
// The KafkaTrigger will be triggered.
module.exports = async function (context, req) {
context.log('JavaScript HTTP trigger function processed a request.');
context.bindings.outputKafkaMessage = ["one", "two"];
context.res = {
// status: 200, /* Defaults to 200 */
body: 'Ok'
};
}
The following example shows how to send an event message with headers to the same Kafka topic:
// This sample creates the topic "topic" and sends a message to it.
// The KafkaTrigger will be triggered.
module.exports = async function (context, req) {
context.log('JavaScript HTTP trigger function processed a request.');
const message = (req.query.message || (req.body && req.body.message));
const responseMessage = message
? "Message received: " + message + ". The message was transferred to the Kafka broker."
: "This HTTP triggered function executed successfully. Pass a message in the query string or in the request body for a personalized response.";
context.bindings.outputKafkaMessage = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"javascript\" }] }"
context.res = {
// status: 200, /* Defaults to 200 */
body: responseMessage
};
}
For a complete set of working JavaScript examples, see the Kafka extension repository.
The specific properties of the function.json file depend on your event provider, which in these examples is either Confluent or Azure Event Hubs. The following examples show a Kafka output binding for a function that is triggered by an HTTP request and sends data from the request to the Kafka topic.
The following function.json files define the trigger and the output binding, first for Confluent and then for Event Hubs:
{
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "Request",
"methods": [
"get"
]
},
{
"type": "kafka",
"name": "outputMessage",
"brokerList": "BrokerList",
"topic": "topic",
"username" : "%ConfluentCloudUserName%",
"password" : "%ConfluentCloudPassword%",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN",
"direction": "out"
},
{
"type": "http",
"direction": "out",
"name": "Response"
}
]
}
{
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "Request",
"methods": [
"get"
]
},
{
"type": "kafka",
"name": "outputMessage",
"brokerList": "BrokerList",
"topic": "topic",
"username" : "$ConnectionString",
"password" : "EventHubConnectionString",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN",
"direction": "out"
},
{
"type": "http",
"direction": "out",
"name": "Response"
}
]
}
The following code then sends a message to the topic:
using namespace System.Net
# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)
# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."
# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
Push-OutputBinding -Name outputMessage -Value ($message)
# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
StatusCode = [HttpStatusCode]::OK
})
The following code sends multiple messages as an array to the same topic:
using namespace System.Net
# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)
# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."
$message = @("one", "two")
Push-OutputBinding -Name outputMessage -Value ($message)
# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
StatusCode = [HttpStatusCode]::OK
})
The following example shows how to send an event message with headers to the same Kafka topic:
using namespace System.Net
# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)
# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."
# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
if (-not $message) {
$message = $Request.Body.Message
}
$kevent = @{
Offset = 364
Partition = 0
Topic = "kafkaeventhubtest1"
Timestamp = "2022-04-09T03:20:06.591Z"
Value = $message
Headers= @(@{
Key= "test"
Value= "powershell"
}
)
}
Push-OutputBinding -Name outputMessage -Value $kevent
# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
StatusCode = [HttpStatusCode]::OK
Body = 'ok'
})
For a complete set of working PowerShell examples, see the Kafka extension repository.
The specific properties of the function.json file depend on your event provider, which in these examples is either Confluent or Azure Event Hubs. The following examples show a Kafka output binding for a function that is triggered by an HTTP request and sends data from the request to the Kafka topic.
The following function.json files define the trigger and the output binding, first for Confluent and then for Event Hubs:
{
"scriptFile": "main.py",
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "req",
"methods": [
"get"
]
},
{
"type": "kafka",
"direction": "out",
"name": "outputMessage",
"brokerList": "BrokerList",
"topic": "topic",
"username": "%ConfluentCloudUserName%",
"password": "%ConfluentCloudPassword%",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN"
},
{
"type": "http",
"direction": "out",
"name": "$return"
}
]
}
{
"scriptFile": "main.py",
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "req",
"methods": [
"get"
]
},
{
"type": "kafka",
"direction": "out",
"name": "outputMessage",
"brokerList": "BrokerList",
"topic": "topic",
"username": "$ConnectionString",
"password": "EventHubConnectionString",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN"
},
{
"type": "http",
"direction": "out",
"name": "$return"
}
]
}
The following code then sends a message to the topic:
import logging
import azure.functions as func
def main(req: func.HttpRequest, outputMessage: func.Out[str]) -> func.HttpResponse:
    input_msg = req.params.get('message')
    outputMessage.set(input_msg)
    return 'OK'
The following code sends multiple messages as an array to the same topic:
import logging
import typing
from azure.functions import Out, HttpRequest, HttpResponse
import json
def main(req: HttpRequest, outputMessage: Out[str]) -> HttpResponse:
    outputMessage.set(['one', 'two'])
    return 'OK'
The following example shows how to send an event message with headers to the same Kafka topic:
import logging
import azure.functions as func
import json
def main(req: func.HttpRequest, outputMessage: func.Out[str]) -> func.HttpResponse:
    message = req.params.get('message')
    kevent = { "Offset": 364, "Partition": 0, "Topic": "kafkaeventhubtest1", "Timestamp": "2022-04-09T03:20:06.591Z", "Value": message, "Headers": [{ "Key": "test", "Value": "python" }] }
    outputMessage.set(json.dumps(kevent))
    return 'OK'
For a complete set of working Python examples, see the Kafka extension repository.
The annotations you use to configure the output binding depend on the specific event provider.
The following function sends a message to the Kafka topic.
@FunctionName("KafkaOutput")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for Windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<String> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
// Parse query parameter
String query = request.getQueryParameters().get("message");
String message = request.getBody().orElse(query);
context.getLogger().info("Message:" + message);
output.setValue(message);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}
The following example shows how to send multiple messages to a Kafka topic.
@FunctionName("KafkaOutputMany")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for Windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<String[]> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
String[] messages = new String[2];
messages[0] = "one";
messages[1] = "two";
output.setValue(messages);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}
In this example, the output binding parameter is changed to a string array.
The last example uses the following KafkaEntity and KafkaHeaders classes:
public class KafkaEntity {
public int Offset;
public int Partition;
public String Timestamp;
public String Topic;
public String Value;
public KafkaHeaders Headers[];
public KafkaEntity(int Offset, int Partition, String Topic, String Timestamp, String Value,KafkaHeaders[] headers) {
this.Offset = Offset;
this.Partition = Partition;
this.Topic = Topic;
this.Timestamp = Timestamp;
this.Value = Value;
this.Headers = headers;
}
}
public class KafkaHeaders{
public String Key;
public String Value;
public KafkaHeaders(String key, String value) {
this.Key = key;
this.Value = value;
}
}
The following example function sends a message with headers to a Kafka topic.
@FunctionName("KafkaOutputWithHeaders")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for Windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<KafkaEntity> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
// Parse query parameter
String query = request.getQueryParameters().get("message");
String message = request.getBody().orElse(query);
KafkaHeaders[] headers = new KafkaHeaders[1];
headers[0] = new KafkaHeaders("test", "java");
KafkaEntity kevent = new KafkaEntity(364, 0, "topic", "2022-04-09T03:20:06.591Z", message, headers);
output.setValue(kevent);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}
For a complete set of working Java examples for Confluent, see the Kafka extension repository.
The following function sends a message to the Kafka topic, connecting to Event Hubs.
@FunctionName("KafkaOutput")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "$ConnectionString",
password = "EventHubConnectionString",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for Windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<String> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
// Parse query parameter
String query = request.getQueryParameters().get("message");
String message = request.getBody().orElse(query);
context.getLogger().info("Message:" + message);
output.setValue(message);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}
The following example shows how to send multiple messages to a Kafka topic.
@FunctionName("KafkaOutputMany")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "$ConnectionString",
password = "EventHubConnectionString",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for Windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<String[]> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
String[] messages = new String[2];
messages[0] = "one";
messages[1] = "two";
output.setValue(messages);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}
In this example, the output binding parameter is changed to a string array.
The last example uses the following KafkaEntity and KafkaHeaders classes:
public class KafkaEntity {
int Offset;
int Partition;
String Timestamp;
String Topic;
String Value;
KafkaHeaders Headers[];
public KafkaEntity(int Offset, int Partition, String Topic, String Timestamp, String Value,KafkaHeaders[] headers) {
this.Offset = Offset;
this.Partition = Partition;
this.Topic = Topic;
this.Timestamp = Timestamp;
this.Value = Value;
this.Headers = headers;
}
}
public class KafkaHeaders{
String Key;
String Value;
public KafkaHeaders(String key, String value) {
this.Key = key;
this.Value = value;
}
}
The following example function sends a message with headers to a Kafka topic, connecting to Event Hubs.
@FunctionName("KafkaOutputWithHeaders")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username= "$ConnectionString",
password = "EventHubConnectionString",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for Windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<KafkaEntity> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
// Parse query parameter
String query = request.getQueryParameters().get("message");
String message = request.getBody().orElse(query);
KafkaHeaders[] headers = new KafkaHeaders[1];
headers[0] = new KafkaHeaders("test", "java");
KafkaEntity kevent = new KafkaEntity(364, 0, "topic", "2022-04-09T03:20:06.591Z", message, headers);
output.setValue(kevent);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}
For a complete set of working Java examples for Confluent, see the Kafka extension repository.
Attributes
Both in-process and isolated worker process C# libraries use the Kafka attribute to define the output binding.
The following table explains the properties you can set using this attribute:
BrokerList: (Required) The list of Kafka brokers to which the output is sent. See Connections for more information.
Topic: (Required) The topic to which the output is sent.
AvroSchema: (Optional) Schema of a generic record when using the Avro protocol.
MaxMessageBytes: (Optional) The maximum size of the output message being sent (in MB), with a default value of 1.
BatchSize: (Optional) Maximum number of messages batched in a single message set, with a default value of 10000.
EnableIdempotence: (Optional) When set to true, guarantees that messages are successfully produced exactly once and in the original produce order, with a default value of false.
MessageTimeoutMs: (Optional) The local message timeout, in milliseconds. This value is only enforced locally and limits the time a produced message waits for successful delivery, with a default of 300000. A time of 0 is infinite. This value is the maximum time used to deliver a message (including retries). A delivery error occurs when either the retry count or the message timeout is exceeded.
RequestTimeoutMs: (Optional) The acknowledgment timeout of the output request, in milliseconds, with a default of 5000.
MaxRetries: (Optional) The number of times to retry sending a failing message, with a default of 2. Retrying may cause reordering, unless EnableIdempotence is set to true.
AuthenticationMode: (Optional) The authentication mode when using Simple Authentication and Security Layer (SASL) authentication. The supported values are Gssapi, Plain (default), ScramSha256, and ScramSha512.
Username: (Optional) The username for SASL authentication. Not supported when AuthenticationMode is Gssapi. See Connections for more information.
Password: (Optional) The password for SASL authentication. Not supported when AuthenticationMode is Gssapi. See Connections for more information.
Protocol: (Optional) The security protocol used when communicating with brokers. The supported values are plaintext (default), ssl, sasl_plaintext, and sasl_ssl.
SslCaLocation: (Optional) Path to the CA certificate file for verifying the broker's certificate.
SslCertificateLocation: (Optional) Path to the client's certificate.
SslKeyLocation: (Optional) Path to the client's private key (PEM) used for authentication.
SslKeyPassword: (Optional) Password for the client's certificate.
Annotations
The KafkaOutput annotation allows you to create a function that writes to a specific topic. Supported options include the following elements:
name: The name of the variable that represents the brokered data in function code.
brokerList: (Required) The list of Kafka brokers to which the output is sent. See Connections for more information.
topic: (Required) The topic to which the output is sent.
dataType: (Optional) Defines how Functions runtime should treat the parameter value. When string, the input is treated as just a string. When binary, the message is received as binary data, and Functions tries to deserialize it to an actual parameter type byte[].
avroSchema: (Optional) Schema of a generic record when using the Avro protocol. (Currently not supported for Java.)
maxMessageBytes: (Optional) The maximum size of the output message being sent (in MB), with a default value of 1.
batchSize: (Optional) Maximum number of messages batched in a single message set, with a default value of 10000.
enableIdempotence: (Optional) When set to true, guarantees that messages are successfully produced exactly once and in the original produce order, with a default value of false.
messageTimeoutMs: (Optional) The local message timeout, in milliseconds. This value is only enforced locally and limits the time a produced message waits for successful delivery, with a default of 300000. A time of 0 is infinite. This is the maximum time used to deliver a message (including retries). A delivery error occurs when either the retry count or the message timeout is exceeded.
requestTimeoutMs: (Optional) The acknowledgment timeout of the output request, in milliseconds, with a default of 5000.
maxRetries: (Optional) The number of times to retry sending a failing message, with a default of 2. Retrying may cause reordering, unless EnableIdempotence is set to true.
authenticationMode: (Optional) The authentication mode when using Simple Authentication and Security Layer (SASL) authentication. The supported values are Gssapi, Plain (default), ScramSha256, and ScramSha512.
username: (Optional) The username for SASL authentication. Not supported when AuthenticationMode is Gssapi. See Connections for more information.
password: (Optional) The password for SASL authentication. Not supported when AuthenticationMode is Gssapi. See Connections for more information.
protocol: (Optional) The security protocol used when communicating with brokers. The supported values are plaintext (default), ssl, sasl_plaintext, and sasl_ssl.
sslCaLocation: (Optional) Path to the CA certificate file for verifying the broker's certificate.
sslCertificateLocation: (Optional) Path to the client's certificate.
sslKeyLocation: (Optional) Path to the client's private key (PEM) used for authentication.
sslKeyPassword: (Optional) Password for the client's certificate.
Configuration
The following table explains the binding configuration properties that you set in the function.json file.
type: Must be set to kafka.
direction: Must be set to out.
name: The name of the variable that represents the brokered data in function code.
brokerList: (Required) The list of Kafka brokers to which the output is sent. See Connections for more information.
topic: (Required) The topic to which the output is sent.
avroSchema: (Optional) Schema of a generic record when using the Avro protocol.
maxMessageBytes: (Optional) The maximum size of the output message being sent (in MB), with a default value of 1.
batchSize: (Optional) Maximum number of messages batched in a single message set, with a default value of 10000.
enableIdempotence: (Optional) When set to true, guarantees that messages are successfully produced exactly once and in the original produce order, with a default value of false.
messageTimeoutMs: (Optional) The local message timeout, in milliseconds. This value is only enforced locally and limits the time a produced message waits for successful delivery, with a default of 300000. A time of 0 is infinite. This is the maximum time used to deliver a message (including retries). A delivery error occurs when either the retry count or the message timeout is exceeded.
requestTimeoutMs: (Optional) The acknowledgment timeout of the output request, in milliseconds, with a default of 5000.
maxRetries: (Optional) The number of times to retry sending a failing message, with a default of 2. Retrying may cause reordering, unless EnableIdempotence is set to true.
authenticationMode: (Optional) The authentication mode when using Simple Authentication and Security Layer (SASL) authentication. The supported values are Gssapi, Plain (default), ScramSha256, and ScramSha512.
username: (Optional) The username for SASL authentication. Not supported when AuthenticationMode is Gssapi. See Connections for more information.
password: (Optional) The password for SASL authentication. Not supported when AuthenticationMode is Gssapi. See Connections for more information.
protocol: (Optional) The security protocol used when communicating with brokers. The supported values are plaintext (default), ssl, sasl_plaintext, and sasl_ssl.
sslCaLocation: (Optional) Path to the CA certificate file for verifying the broker's certificate.
sslCertificateLocation: (Optional) Path to the client's certificate.
sslKeyLocation: (Optional) Path to the client's private key (PEM) used for authentication.
sslKeyPassword: (Optional) Password for the client's certificate.
Usage
Both key and value types are supported with built-in Avro and Protobuf serialization.
The offset, partition, and timestamp for the event are generated at runtime. Only the value and headers can be set inside the function. The topic is set in the function.json file.
Make sure you have access to the Kafka topic to which you're trying to write. You configure the binding with access and connection credentials for the Kafka topic.
In a Premium plan, you must enable runtime scale monitoring for the Kafka output to be able to scale out to multiple instances. To learn more, see Enable runtime scaling.
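One way to enable runtime scale monitoring is with the Azure CLI, as in the following sketch; the resource group and function app names are placeholders:
az resource update --resource-group <RESOURCE_GROUP> --name <FUNCTION_APP_NAME>/config/web --set properties.functionsRuntimeScaleMonitoringEnabled=1 --resource-type Microsoft.Web/sites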
For a complete set of supported host.json settings for the Kafka trigger, see host.json settings.
Connections
All connection information required by your triggers and bindings should be maintained in application settings and not in the binding definitions in your code. This is true for credentials, which should never be stored in your code.
Important
Credential settings must reference an application setting. Don't hard-code credentials in your code or configuration files. When running locally, use the local.settings.json file for your credentials, and don't publish the local.settings.json file.
When connecting to a managed Kafka cluster provided by Confluent in Azure, make sure that the following authentication credentials for your Confluent Cloud environment are set in your trigger or binding:
BrokerList: set to BootstrapServer. An app setting named BootstrapServer contains the value of the bootstrap server found on the Confluent Cloud settings page. The value resembles xyz-xyzxzy.westeurope.azure.confluent.cloud:9092.
Username: set to ConfluentCloudUsername. An app setting named ConfluentCloudUsername contains the API access key from the Confluent Cloud web site.
Password: set to ConfluentCloudPassword. An app setting named ConfluentCloudPassword contains the API secret obtained from the Confluent Cloud web site.
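During local development, these values belong in the Values collection of the local.settings.json file. The following is a minimal sketch; the server address and credential values are placeholders to replace with your own:
{
  "IsEncrypted": false,
  "Values": {
    "FUNCTIONS_WORKER_RUNTIME": "dotnet",
    "BootstrapServer": "xyz-xyzxzy.westeurope.azure.confluent.cloud:9092",
    "ConfluentCloudUsername": "<API_KEY>",
    "ConfluentCloudPassword": "<API_SECRET>"
  }
}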
When connecting to Event Hubs, make sure that the following authentication credentials for your Event Hubs instance are set in your trigger or binding:
BrokerList: set to BootstrapServer. An app setting named BootstrapServer contains the fully qualified domain name of your Event Hubs instance. The value resembles <MY_NAMESPACE_NAME>.servicebus.windows.net:9093.
Username: set to $ConnectionString. The actual value is obtained from the connection string.
Password: set to %EventHubConnectionString%. An app setting named EventHubConnectionString contains the connection string for your Event Hubs namespace. To learn more, see Get an Event Hubs connection string.
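The corresponding local.settings.json sketch for Event Hubs might look like the following; the namespace and key values are placeholders:
{
  "IsEncrypted": false,
  "Values": {
    "FUNCTIONS_WORKER_RUNTIME": "dotnet",
    "BootstrapServer": "<MY_NAMESPACE_NAME>.servicebus.windows.net:9093",
    "EventHubConnectionString": "Endpoint=sb://<MY_NAMESPACE_NAME>.servicebus.windows.net/;SharedAccessKeyName=<KEY_NAME>;SharedAccessKey=<KEY_VALUE>"
  }
}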
The string values you use for these settings must be present as application settings in Azure or in the Values collection in the local.settings.json file during local development.
You should also set the Protocol, AuthenticationMode, and SslCaLocation in your binding definitions.
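For example, an in-process C# binding that sets these properties might look like the following sketch, modeled on the Confluent examples above; the CA certificate path is an assumption for illustration:
[Kafka("BootstrapServer",
"topic",
Username = "ConfluentCloudUsername",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
SslCaLocation = "confluent_cloud_cacert.pem" // hypothetical path to the CA certificate file
)] out string eventData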