You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

163 lines
5.1 KiB

1 year ago
#include "kafka_handler.h"
#include <QVector>
bool KafkaHandler::_run = false;
RdKafka::Producer* KafkaHandler::_producer = nullptr;
void KafkaHandler::init() {
auto conf = KafkaConfig::getIns();
auto consumerServers = conf->getConsumerServers().join(",").toStdString();
auto consumerTopics = conf->getConsumerTopics();
auto producer = conf->getProducerServers().join(",").toStdString();
if(consumerTopics.empty()){
qFatal("kafka consumer topic can`t empty ");
}
// 初始化 consumer
_run = true;
_initConsumer(consumerServers, consumerTopics);
// 实例化 producer
_initProducer(producer);
}
void KafkaHandler::_initConsumer(const std::string& servers, const QStringList& topics) {
auto conf=RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
std::string err;
if(conf->set("bootstrap.servers", servers, err) != RdKafka::Conf::CONF_OK){
qFatal("kafka consumer set bootstrap servers error: %s", &err);
}
if(conf->set("group.id",topics.join(",").toStdString(),err) != RdKafka::Conf::CONF_OK){
qFatal("kafka consumer set group id error: %s", &err);
}
//创建消费者
auto consumer= RdKafka::KafkaConsumer::create(conf,err);
if (!consumer) {
qFatal("create consumer error: %s", &err);
}
QVector<std::string> vector;
for (const auto &item: topics.toVector()){
vector.push_back(item.toStdString());
}
consumer->subscribe(vector.toStdVector());
QtConcurrent::run(_consumerStart, consumer);
}
void KafkaHandler::_consumerStart(RdKafka::KafkaConsumer * consumer) {
while (_run){
RdKafka::Message *msg = consumer->consume(100);
_receive(msg);
}
consumer->close();
delete consumer;
}
void KafkaHandler::_receive(RdKafka::Message *message) {
switch (message->err())
{
case RdKafka::ERR__TIMED_OUT:
break;
case RdKafka::ERR_NO_ERROR:
{
// TODO key 校验 message->key()
QString key;
if(message->key()){
key = QString::fromStdString(*message->key());
qDebug() << "receive message key:" << key;
}
// TODO header 校验 message->headers()
auto headers = message->headers();
if (headers) {
std::vector<RdKafka::Headers::Header> hdrs = headers->get_all();
for (size_t i = 0; i < hdrs.size(); i++) {
const RdKafka::Headers::Header hdr = hdrs[i];
if (hdr.value() != nullptr){
std::string value (static_cast<const char*>(hdr.value()), hdr.value_size());
qDebug() << "receive message headers: " << QString::fromStdString(hdr.key())
<< "-" << QString::fromStdString(value);
}
}
}
std::string payload (static_cast<const char*>(message->payload()), message->len());
auto msg = QString::fromStdString(payload);
qDebug() << "receive message payload:" << msg;
emit getIns()->handle(key, msg);
}
break;
case RdKafka::ERR__PARTITION_EOF:
break;
case RdKafka::ERR__UNKNOWN_TOPIC:
case RdKafka::ERR__UNKNOWN_PARTITION:
qCritical("consume failed: %s", qPrintable(QString::fromStdString(message->errstr())));
_run = false;
break;
default:
qWarning("consume failed: %s" , qPrintable(QString::fromStdString(message->errstr())));
_run = false;
break;
}
}
void KafkaHandler::stop() {
_run = false;
}
void KafkaHandler::_initProducer(const std::string &servers) {
auto conf= RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
std::string err;
if(conf->set("bootstrap.servers", servers, err) != RdKafka::Conf::CONF_OK){
qFatal("kafka consumer set bootstrap servers error: %s", &err);
}
//创建生产者
_producer = RdKafka::Producer::create(conf,err);
if(!_producer)
{
qFatal("create producer error: %s", &err);
}
}
bool KafkaHandler::message(const QString& topic, const QString& message,const QString& key, RdKafka::Headers *headers, int partition, void * que) {
auto payload = message.toStdString();
std::string k;
if(nullptr != key){
k = key.toStdString();
}
auto code = _producer->produce(topic.toStdString(),
partition,
RdKafka::Producer::RK_MSG_COPY,
const_cast<char *>(payload.c_str()),
payload.size(),
const_cast<char *>(k.c_str()), k.size(),
0,
headers,
que
);
return RdKafka::ERR_NO_ERROR == code;
}
KafkaHandler *KafkaHandler::getIns() {
static KafkaHandler kafkaHandler;
return &kafkaHandler;
}
KafkaHandler::KafkaHandler() = default;
KafkaHandler::~KafkaHandler() = default;