Full-featured, high-performance Kafka library for Tarantool, based on librdkafka.
It can produce more than 150k messages per second and consume more than 140k messages per second.
Most functions return errors as strings, while others throw box.error instead. kafka returns non-critical errors as strings, which lets you decide how to handle them yourself.

To install the kafka module with the built-in librdkafka dependency, use the STATIC_BUILD option:

```
tt rocks STATIC_BUILD=ON install kafka
```
Be aware that this approach doesn't bundle a static OpenSSL. Instead, it assumes that Tarantool exports OpenSSL symbols, which means the static kafka build is only usable with a static Tarantool build. For a successful static build, you need to compile kafka against the same OpenSSL version that Tarantool was built with.
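As a quick sanity check after installation, you can verify that the module loads at all. This is a minimal sketch; it only assumes the module is required under the name `kafka`, as in the examples below:

```lua
-- Verify that the freshly installed module can be loaded.
-- pcall() catches a missing or mis-linked module instead of aborting.
local ok, kafka_or_err = pcall(require, 'kafka')
if ok then
    print('kafka module loaded successfully')
else
    print('failed to load kafka module: ' .. tostring(kafka_or_err))
end
```

A load failure here usually points at the OpenSSL mismatch described above.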
Consumer
```lua
local os = require('os')
local log = require('log')
local tnt_kafka = require('kafka')

local consumer, err = tnt_kafka.Consumer.create({ brokers = "localhost:9092" })
if err ~= nil then
    print(err)
    os.exit(1)
end

local err = consumer:subscribe({ "some_topic" })
if err ~= nil then
    print(err)
    os.exit(1)
end

local out, err = consumer:output()
if err ~= nil then
    print(string.format("got fatal error '%s'", err))
    os.exit(1)
end

while true do
    if out:is_closed() then
        os.exit(1)
    end

    local msg = out:get()
    if msg ~= nil then
        print(string.format(
            "got msg with topic='%s' partition='%s' offset='%s' key='%s' value='%s'",
            msg:topic(), msg:partition(), msg:offset(), msg:key(), msg:value()
        ))
    end
end

-- from another fiber on app shutdown
consumer:close()
```
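Since the consume loop blocks its fiber forever, in a real application you would typically run it in a dedicated fiber and close the consumer from elsewhere on shutdown. A minimal sketch using Tarantool's `fiber` module (the consumer setup is assumed to be the same as above):

```lua
local fiber = require('fiber')
local tnt_kafka = require('kafka')

local consumer, err = tnt_kafka.Consumer.create({ brokers = "localhost:9092" })
assert(err == nil, err)
assert(consumer:subscribe({ "some_topic" }) == nil)

-- Run the blocking consume loop in its own fiber, so the main
-- fiber stays free for other work (e.g. handling shutdown).
fiber.create(function()
    local out = consumer:output()
    while not out:is_closed() do
        local msg = out:get()
        if msg ~= nil then
            print(string.format("got value '%s'", msg:value()))
        end
    end
end)

-- ... later, on application shutdown, from the main fiber:
consumer:close()
```

Closing the consumer closes its output channel, which makes the consuming fiber's loop terminate cleanly.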
Producer
```lua
local os = require('os')
local log = require('log')
local tnt_kafka = require('kafka')

local producer, err = tnt_kafka.Producer.create({ brokers = "kafka:9092" })
if err ~= nil then
    print(err)
    os.exit(1)
end

for i = 1, 1000 do
    local message = "test_value " .. tostring(i)
    local err = producer:produce({
        topic = "test_topic",
        key = "test_key",
        value = message,
    })
    if err ~= nil then
        print(string.format("got error '%s' while sending value '%s'", err, message))
    else
        print(string.format("successfully sent value '%s'", message))
    end
end

producer:close()
```
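Because `producer:produce()` waits for delivery confirmation, the loop above sends messages one at a time. One way to increase throughput is to produce from several fibers concurrently; the sketch below assumes the producer object can be shared between fibers of the same Tarantool instance:

```lua
local fiber = require('fiber')
local tnt_kafka = require('kafka')

local producer, err = tnt_kafka.Producer.create({ brokers = "kafka:9092" })
assert(err == nil, err)

-- Spawn several fibers; each one produces its own batch of messages.
-- While one fiber waits for delivery, the others keep producing.
for i = 1, 10 do
    fiber.create(function()
        for j = 1, 100 do
            local err = producer:produce({
                topic = "test_topic",
                value = string.format("value %d-%d", i, j),
            })
            if err ~= nil then
                print(string.format("produce error: %s", err))
            end
        end
    end)
end
```

Remember to call `producer:close()` only after all producing fibers have finished.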
You can pass additional librdkafka configuration parameters (see https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md) in the special `options` table on client creation:

```lua
tnt_kafka.Producer.create({
    options = {
        ["some.key"] = "some_value",
    },
})

tnt_kafka.Consumer.create({
    options = {
        ["some.key"] = "some_value",
    },
})
```

More examples are in the examples folder.
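For example, a consumer might set a few commonly tuned parameters. The keys below are standard librdkafka configuration parameters; check the CONFIGURATION.md reference above for the full list, exact semantics, and defaults:

```lua
local tnt_kafka = require('kafka')

local consumer, err = tnt_kafka.Consumer.create({
    brokers = "localhost:9092",
    options = {
        -- consumer group used for offset management and rebalancing
        ["group.id"] = "my_consumer_group",
        -- where to start reading when no committed offset exists
        ["auto.offset.reset"] = "earliest",
        -- how often offsets are auto-committed, in milliseconds
        ["auto.commit.interval.ms"] = "5000",
    },
})
```

Note that librdkafka expects all option values as strings, even numeric ones.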
Connecting to brokers over SSL is supported by librdkafka itself, so you only need to configure the brokers properly by following this guide: https://github.com/confluentinc/librdkafka/wiki/Using-SSL-with-librdkafka

After that, pass the following configuration parameters on client creation:

```lua
tnt_kafka.Producer.create({
    brokers = "broker_list",
    options = {
        ["security.protocol"] = "ssl",
        -- CA certificate file for verifying the broker's certificate
        ["ssl.ca.location"] = "ca-cert",
        -- Client's certificate
        ["ssl.certificate.location"] = "client_?????_client.pem",
        -- Client's key
        ["ssl.key.location"] = "client_?????_client.key",
        -- Key password, if any
        ["ssl.key.password"] = "abcdefgh",
    },
})

tnt_kafka.Consumer.create({
    brokers = "broker_list",
    options = {
        ["security.protocol"] = "ssl",
        -- CA certificate file for verifying the broker's certificate
        ["ssl.ca.location"] = "ca-cert",
        -- Client's certificate
        ["ssl.certificate.location"] = "client_?????_client.pem",
        -- Client's key
        ["ssl.key.location"] = "client_?????_client.key",
        -- Key password, if any
        ["ssl.key.password"] = "abcdefgh",
    },
})
```
Before running any commands, init and update the git submodule:

```
git submodule init
git submodule update
```
Async producer

Result: over 160000 produced messages per second on MacBook Pro 2016.

Local run in docker:

```
make docker-run-environment
make docker-create-benchmark-async-producer-topic
make docker-run-benchmark-async-producer-interactive
```
Sync producer

Result: over 90000 produced messages per second on MacBook Pro 2016.

Local run in docker:

```
make docker-run-environment
make docker-create-benchmark-sync-producer-topic
make docker-run-benchmark-sync-producer-interactive
```

Auto offset store enabled
Result: over 190000 consumed messages per second on macbook pro 2016
Local run in docker:

```
make docker-run-environment
make docker-create-benchmark-auto-offset-store-consumer-topic
make docker-run-benchmark-auto-offset-store-consumer-interactive
```
Manual commit

Result: over 190000 consumed messages per second on MacBook Pro 2016.

Local run in docker:

```
make docker-run-environment
make docker-create-benchmark-manual-commit-consumer-topic
make docker-run-benchmark-manual-commit-consumer-interactive
```
Before running any test, you should add an entry to /etc/hosts.
You can run docker-based integration tests via the Makefile target:

```
make test-run-with-docker
```