Note
Access to this page requires authorization. You can try signing in or changing directories.
Access to this page requires authorization. You can try changing directories.
The Kafka extension for Azure Functions lets you write values out to Apache Kafka topics by using an output binding. You can also use a trigger to invoke your functions in response to messages in Kafka topics.
Important
Kafka bindings are only available for Functions on the Elastic Premium Plan and Dedicated (App Service) plan. They are only supported on version 3.x and later version of the Functions runtime.
Action | Type |
---|---|
Run a function based on a new Kafka event. | Trigger |
Write to the Kafka event stream. | Output binding |
Install extension
The extension NuGet package you install depends on the C# mode you're using in your function app:
Functions execute in an isolated C# worker process. To learn more, see Guide for running C# Azure Functions in an isolated worker process.
Add the extension to your project by installing this NuGet package.
Install bundle
To be able to use this binding extension in your app, make sure that the host.json file in the root of your project contains this extensionBundle
reference:
{
"version": "2.0",
"extensionBundle": {
"id": "Microsoft.Azure.Functions.ExtensionBundle",
"version": "[4.0.0, 5.0.0)"
}
}
In this example, the version
value of [4.0.0, 5.0.0)
instructs the Functions host to use a bundle version that is at least 4.0.0
but less than 5.0.0
, which includes all potential versions of 4.x. This notation effectively maintains your app on the latest available minor version of the v4.x extension bundle.
When possible, you should use the latest extension bundle major version and allow the runtime to automatically maintain the latest minor version. You can view the contents of the latest bundle on the extension bundles release page.
If your app requires you to use a previous extension version, you might need to instead specify a previous bundle version. You can review the bundle releases to locate a bundle that contains a version of this extension that can be used by your app. For more information, see Azure Functions extension bundles. ::: zone-end
Enable runtime scaling
To allow your functions to scale properly on the Premium plan when using Kafka triggers and bindings, you need to enable runtime scale monitoring.
In the Azure portal, in your function app, select Configuration.
On the Function runtime settings tab, for Runtime Scale Monitoring, select On.
host.json settings
This section describes the configuration settings available for this binding in versions 3.x and higher. Settings in the host.json file apply to all functions in a function app instance. For more information about function app configuration settings in versions 3.x and later versions, see the host.json reference for Azure Functions.
{
"version": "2.0",
"extensions": {
"kafka": {
"maxBatchSize": 64,
"SubscriberIntervalInSeconds": 1,
"ExecutorChannelCapacity": 1,
"ChannelFullRetryIntervalInMs": 50
}
}
}
Property | Default | Type | Description |
---|---|---|---|
ChannelFullRetryIntervalInMs | 50 | Trigger | Defines the subscriber retry interval, in milliseconds, used when attempting to add items to an at-capacity channel. |
ExecutorChannelCapacity | 1 | Both | Defines the channel message capacity. Once capacity is reached, the Kafka subscriber pauses until the function catches up. |
MaxBatchSize | 64 | Trigger | Maximum batch size when calling a Kafka triggered function. |
SubscriberIntervalInSeconds | 1 | Trigger | Defines the minimum frequency incoming messages are executed, per function in seconds. Only when the message volume is less than MaxBatchSize / SubscriberIntervalInSeconds |
The following properties, which are inherited from the Apache Kafka C/C++ client library, are also supported in the kafka
section of host.json, for either triggers or both output bindings and triggers:
Property | Applies to | librdkafka equivalent |
---|---|---|
AutoCommitIntervalMs | Trigger | auto.commit.interval.ms |
AutoOffsetReset | Trigger | auto.offset.reset |
FetchMaxBytes | Trigger | fetch.max.bytes |
LibkafkaDebug | Both | debug |
MaxPartitionFetchBytes | Trigger | max.partition.fetch.bytes |
MaxPollIntervalMs | Trigger | max.poll.interval.ms |
MetadataMaxAgeMs | Both | metadata.max.age.ms |
QueuedMinMessages | Trigger | queued.min.messages |
QueuedMaxMessagesKbytes | Trigger | queued.max.messages.kbytes |
ReconnectBackoffMs | Trigger | reconnect.backoff.max.ms |
ReconnectBackoffMaxMs | Trigger | reconnect.backoff.max.ms |
SessionTimeoutMs | Trigger | session.timeout.ms |
SocketKeepaliveEnable | Both | socket.keepalive.enable |
StatisticsIntervalMs | Trigger | statistics.interval.ms |