You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

163 lines
5.1 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#include "kafka_handler.h"
#include <QVector>
bool KafkaHandler::_run = false;
RdKafka::Producer* KafkaHandler::_producer = nullptr;
void KafkaHandler::init() {
auto conf = KafkaConfig::getIns();
auto consumerServers = conf->getConsumerServers().join(",").toStdString();
auto consumerTopics = conf->getConsumerTopics();
auto producer = conf->getProducerServers().join(",").toStdString();
if(consumerTopics.empty()){
qFatal("kafka consumer topic can`t empty ");
}
// 初始化 consumer
_run = true;
_initConsumer(consumerServers, consumerTopics);
// 实例化 producer
_initProducer(producer);
}
void KafkaHandler::_initConsumer(const std::string& servers, const QStringList& topics) {
auto conf=RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
std::string err;
if(conf->set("bootstrap.servers", servers, err) != RdKafka::Conf::CONF_OK){
qFatal("kafka consumer set bootstrap servers error: %s", &err);
}
if(conf->set("group.id",topics.join(",").toStdString(),err) != RdKafka::Conf::CONF_OK){
qFatal("kafka consumer set group id error: %s", &err);
}
//创建消费者
auto consumer= RdKafka::KafkaConsumer::create(conf,err);
if (!consumer) {
qFatal("create consumer error: %s", &err);
}
QVector<std::string> vector;
for (const auto &item: topics.toVector()){
vector.push_back(item.toStdString());
}
consumer->subscribe(vector.toStdVector());
QtConcurrent::run(_consumerStart, consumer);
}
void KafkaHandler::_consumerStart(RdKafka::KafkaConsumer * consumer) {
while (_run){
RdKafka::Message *msg = consumer->consume(100);
_receive(msg);
}
consumer->close();
delete consumer;
}
void KafkaHandler::_receive(RdKafka::Message *message) {
switch (message->err())
{
case RdKafka::ERR__TIMED_OUT:
break;
case RdKafka::ERR_NO_ERROR:
{
// TODO key 校验 message->key()
QString key;
if(message->key()){
key = QString::fromStdString(*message->key());
// qDebug() << "receive message key:" << key;
}
// TODO header 校验 message->headers()
auto headers = message->headers();
if (headers) {
std::vector<RdKafka::Headers::Header> hdrs = headers->get_all();
for (size_t i = 0; i < hdrs.size(); i++) {
const RdKafka::Headers::Header hdr = hdrs[i];
if (hdr.value() != nullptr){
std::string value (static_cast<const char*>(hdr.value()), hdr.value_size());
// qDebug() << "receive message headers: " << QString::fromStdString(hdr.key())
// << "-" << QString::fromStdString(value);
}
}
}
std::string payload (static_cast<const char*>(message->payload()), message->len());
auto msg = QString::fromStdString(payload);
// qDebug() << "receive message payload:" << msg;
emit getIns()->handle(key, msg);
}
break;
case RdKafka::ERR__PARTITION_EOF:
break;
case RdKafka::ERR__UNKNOWN_TOPIC:
case RdKafka::ERR__UNKNOWN_PARTITION:
qCritical("consume failed: %s", qPrintable(QString::fromStdString(message->errstr())));
_run = false;
break;
default:
qWarning("consume failed: %s" , qPrintable(QString::fromStdString(message->errstr())));
_run = false;
break;
}
}
void KafkaHandler::stop() {
_run = false;
}
void KafkaHandler::_initProducer(const std::string &servers) {
auto conf= RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
std::string err;
if(conf->set("bootstrap.servers", servers, err) != RdKafka::Conf::CONF_OK){
qFatal("kafka consumer set bootstrap servers error: %s", &err);
}
//创建生产者
_producer = RdKafka::Producer::create(conf,err);
if(!_producer)
{
qFatal("create producer error: %s", &err);
}
}
bool KafkaHandler::message(const QString& topic, const QString& message,const QString& key, RdKafka::Headers *headers, int partition, void * que) {
auto payload = message.toStdString();
std::string k;
if(nullptr != key){
k = key.toStdString();
}
auto code = _producer->produce(topic.toStdString(),
partition,
RdKafka::Producer::RK_MSG_COPY,
const_cast<char *>(payload.c_str()),
payload.size(),
const_cast<char *>(k.c_str()), k.size(),
0,
headers,
que
);
return RdKafka::ERR_NO_ERROR == code;
}
KafkaHandler *KafkaHandler::getIns() {
static KafkaHandler kafkaHandler;
return &kafkaHandler;
}
KafkaHandler::KafkaHandler() = default;
KafkaHandler::~KafkaHandler() = default;