|
|
|
|
|
#include "kafka_handler.h"
|
|
|
|
|
|
#include <QVector>
|
|
|
|
|
|
// Run flag for the consumer polling loop: init() sets it to true, stop() and
// the error branches of _receive() set it back to false, and _consumerStart()
// keeps polling while it is true.
// NOTE(review): the loop is started through QtConcurrent::run, so this plain
// bool appears to be written and read from different threads — confirm
// whether the declaration should be an atomic type.
bool KafkaHandler::_run = false;
|
|
|
// Shared producer handle: assigned in _initProducer() and used by message().
RdKafka::Producer* KafkaHandler::_producer = nullptr;
|
|
|
|
|
|
void KafkaHandler::init() {
|
|
|
auto conf = KafkaConfig::getIns();
|
|
|
auto consumerServers = conf->getConsumerServers().join(",").toStdString();
|
|
|
auto consumerTopics = conf->getConsumerTopics();
|
|
|
auto producer = conf->getProducerServers().join(",").toStdString();
|
|
|
|
|
|
if(consumerTopics.empty()){
|
|
|
qFatal("kafka consumer topic can`t empty !");
|
|
|
}
|
|
|
|
|
|
// 初始化 consumer
|
|
|
_run = true;
|
|
|
_initConsumer(consumerServers, consumerTopics);
|
|
|
|
|
|
// 实例化 producer
|
|
|
_initProducer(producer);
|
|
|
|
|
|
}
|
|
|
|
|
|
void KafkaHandler::_initConsumer(const std::string& servers, const QStringList& topics) {
|
|
|
auto conf=RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
|
|
|
std::string err;
|
|
|
if(conf->set("bootstrap.servers", servers, err) != RdKafka::Conf::CONF_OK){
|
|
|
qFatal("kafka consumer set bootstrap servers error: %s", &err);
|
|
|
}
|
|
|
|
|
|
if(conf->set("group.id",topics.join(",").toStdString(),err) != RdKafka::Conf::CONF_OK){
|
|
|
qFatal("kafka consumer set group id error: %s", &err);
|
|
|
}
|
|
|
|
|
|
//创建消费者
|
|
|
auto consumer= RdKafka::KafkaConsumer::create(conf,err);
|
|
|
if (!consumer) {
|
|
|
qFatal("create consumer error: %s", &err);
|
|
|
}
|
|
|
|
|
|
QVector<std::string> vector;
|
|
|
|
|
|
for (const auto &item: topics.toVector()){
|
|
|
vector.push_back(item.toStdString());
|
|
|
}
|
|
|
|
|
|
consumer->subscribe(vector.toStdVector());
|
|
|
|
|
|
QtConcurrent::run(_consumerStart, consumer);
|
|
|
}
|
|
|
|
|
|
/**
 * Polling loop of the consumer.
 *
 * While the run flag is set, fetches one message per iteration (consume() is
 * called with a timeout argument of 100) and hands it to _receive(). Once the
 * flag is cleared the consumer is closed and destroyed.
 *
 * @param consumer Consumer to poll; this function closes and deletes it
 *                 before returning.
 */
void KafkaHandler::_consumerStart(RdKafka::KafkaConsumer * consumer) {
    while (_run) {
        // consume() transfers ownership of the returned message to the caller.
        RdKafka::Message *msg = consumer->consume(100);
        _receive(msg);
        // _receive() only reads the message, so it must be released here;
        // otherwise one message object leaks per iteration.
        delete msg;
    }

    consumer->close();
    delete consumer;
}
|
|
|
|
|
|
/**
 * Dispatches the result of one consume() call.
 *
 * - Timeouts and partition-EOF notifications are ignored.
 * - A regular message has its key, headers and payload logged via qDebug and
 *   is then forwarded through the handle(key, payload) signal of the
 *   singleton instance.
 * - Any other error is logged and clears the run flag, which ends the
 *   polling loop.
 *
 * The message is only read here; it is not deleted by this function.
 *
 * @param message Message object returned by the consumer.
 */
void KafkaHandler::_receive(RdKafka::Message *message) {
    switch (message->err()) {
    case RdKafka::ERR__TIMED_OUT:
    case RdKafka::ERR__PARTITION_EOF:
        // Nothing to process.
        break;

    case RdKafka::ERR_NO_ERROR: {
        // TODO: validate the key (message->key())
        QString key;
        if (const auto *rawKey = message->key()) {
            key = QString::fromStdString(*rawKey);
            qDebug() << "receive message key:" << key;
        }

        // TODO: validate the headers (message->headers())
        if (auto headers = message->headers()) {
            const std::vector<RdKafka::Headers::Header> allHeaders = headers->get_all();
            // Iterate by reference: copying a Header per iteration is unnecessary.
            for (const auto &hdr : allHeaders) {
                if (hdr.value() != nullptr) {
                    qDebug() << "receive message headers: " << QString::fromStdString(hdr.key())
                             << "-" << QString::fromUtf8(static_cast<const char *>(hdr.value()),
                                                         static_cast<int>(hdr.value_size()));
                }
            }
        }

        // Decode the payload directly from the message buffer. fromUtf8 accepts
        // a null pointer, so a message without a value yields an empty string.
        const QString msg = QString::fromUtf8(static_cast<const char *>(message->payload()),
                                              static_cast<int>(message->len()));
        qDebug() << "receive message payload:" << msg;

        emit getIns()->handle(key, msg);
        break;
    }

    case RdKafka::ERR__UNKNOWN_TOPIC:
    case RdKafka::ERR__UNKNOWN_PARTITION:
        qCritical("consume failed: %s", qPrintable(QString::fromStdString(message->errstr())));
        _run = false;
        break;

    default:
        qWarning("consume failed: %s" , qPrintable(QString::fromStdString(message->errstr())));
        _run = false;
        break;
    }
}
|
|
|
|
|
|
/**
 * Requests shutdown of the consumer polling loop by clearing the run flag.
 */
void KafkaHandler::stop() {
    KafkaHandler::_run = false;
}
|
|
|
|
|
|
void KafkaHandler::_initProducer(const std::string &servers) {
|
|
|
auto conf= RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
|
|
|
std::string err;
|
|
|
if(conf->set("bootstrap.servers", servers, err) != RdKafka::Conf::CONF_OK){
|
|
|
qFatal("kafka consumer set bootstrap servers error: %s", &err);
|
|
|
}
|
|
|
//创建生产者
|
|
|
_producer = RdKafka::Producer::create(conf,err);
|
|
|
if(!_producer)
|
|
|
{
|
|
|
qFatal("create producer error: %s", &err);
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
bool KafkaHandler::message(const QString& topic, const QString& message,const QString& key, RdKafka::Headers *headers, int partition, void * que) {
|
|
|
auto payload = message.toStdString();
|
|
|
|
|
|
std::string k;
|
|
|
if(nullptr != key){
|
|
|
k = key.toStdString();
|
|
|
}
|
|
|
|
|
|
auto code = _producer->produce(topic.toStdString(),
|
|
|
partition,
|
|
|
RdKafka::Producer::RK_MSG_COPY,
|
|
|
const_cast<char *>(payload.c_str()),
|
|
|
payload.size(),
|
|
|
const_cast<char *>(k.c_str()), k.size(),
|
|
|
0,
|
|
|
headers,
|
|
|
que
|
|
|
);
|
|
|
return RdKafka::ERR_NO_ERROR == code;
|
|
|
}
|
|
|
|
|
|
/**
 * Accessor for the single KafkaHandler instance.
 *
 * The instance is a function-local static, so it is constructed on the first
 * call and the same address is returned on every call.
 *
 * @return Pointer to the shared KafkaHandler.
 */
KafkaHandler *KafkaHandler::getIns() {
    static KafkaHandler instance;
    return &instance;
}
|
|
|
|
|
|
// Defaulted constructor; no work is done here (setup happens in init()).
KafkaHandler::KafkaHandler() = default;
|
|
|
|
|
|
// Defaulted destructor.
// NOTE(review): the static _producer assigned in _initProducer() is not
// deleted anywhere in the visible code — confirm whether that is intentional.
KafkaHandler::~KafkaHandler() = default;
|
|
|
|