|
Kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),有如下特性:
相關(guān)術(shù)語介紹
在這里我們用了一個(gè)第三方庫叫Confluent.kafka,在nuget上搜索一下就出來了,感謝原作者。 新建一個(gè) .net core類庫項(xiàng)目安裝第三方依賴庫,如下圖所示: 新建一個(gè)SUPKafkaTopicConsumer類這是用來創(chuàng)建并初始化消費(fèi)者,接下來看看這個(gè)類里面包含了什么。
public delegate void OnReceivedHandle(object data); 初始化消費(fèi)者,構(gòu)造函數(shù)中傳入kafka地址,以及要訂閱的組groupId,另外注入了log4net記錄日志信息。 public class SUPKafkaTopicConsumer<TKey, TValue>
{
private IConsumer<TKey, TValue> consumer;
private SUPLogger logger_;
private string BootStrapServer;
private string GroupId;
public SUPKafkaTopicConsumer(string bootStrapServer, string groupId, SUPLogger logger = null)
{
BootStrapServer = bootStrapServer;
GroupId = groupId;
logger_ = logger;
}
public bool Init()
{
try
{
var conf = new ConsumerConfig
{
GroupId = GroupId,
BootstrapServers = BootStrapServer,
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = false // 設(shè)置非自動(dòng)偏移,業(yè)務(wù)邏輯完成后手動(dòng)處理偏移,防止數(shù)據(jù)丟失
};
consumer = new ConsumerBuilder<TKey, TValue>(conf)
.SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
.Build();
return true;
}
catch (Exception ex)
{
throw;
}
}
public event OnReceivedHandle onReceivedHandle;
public void Subscribe(string topic, bool isCommit)
{
try
{
if (consumer != null)
{
consumer.Subscribe(topic);
while (true)
{
var consume = consumer.Consume();
if (onReceivedHandle != null)
{
onReceivedHandle(consume);
if (isCommit)
{
consumer.Commit(consume);
}
}
}
}
}
catch (Exception ex)
{
//consumer.Close();
throw ex;
}
}
public void UnSubscribe()
{
if (consumer != null)
{
consumer.Unsubscribe();
}
}新建生產(chǎn)者類
public interface ISUPKafkaProducer<Tkey,TValue>
{
ISendResult Send(Tkey key, TValue value, string topic,Action<DeliveryReport<Tkey, TValue>> sendCallBack = null);
ISendResult Send(Tkey key, TValue value, TopicPartition topicPartition, Action<DeliveryReport<Tkey, TValue>> sendCallBack = null);
ISendResult AsyncSend(Tkey key, TValue value,string topic);
ISendResult AsyncSend(Tkey key, TValue value, TopicPartition topicPartition);
}
internal class SUPKafkaTopicProducer<Tkey, TValue> : ISUPKafkaProducer<Tkey, TValue>
{
private IProducer<Tkey, TValue> producer;
private SUPLogger logger_;
private string m_bootStrapServer;
public SUPKafkaTopicProducer(string bootStrapServer,SUPLogger logger = null)
{
m_bootStrapServer = bootStrapServer;
logger_ = logger;
}
public bool Init()
{
try
{
var config = new ProducerConfig
{
BootstrapServers = m_bootStrapServer
};
producer = new ProducerBuilder<Tkey, TValue>(config)
.SetErrorHandler((producer, error) =>
{
logger_.Fatal(string.Format("Kafka Error Handler {0},ErrorCode:{2},Reason:{3}",
m_bootStrapServer, error.Code, error.Reason));
})
.SetLogHandler((producer, msg) =>
{
logger_.Info(string.Format("Kafka Log Handler {0}-{1},Name:{2},Message:{3}",
m_bootStrapServer, msg.Name, msg.Message));
})
.Build();
return true;
}
catch (Exception ex)
{
throw ex;
}
}實(shí)現(xiàn)繼承至ISUPKafkaProducer<Tkey, TValue>的方法 public ISendResult Send(Tkey key, TValue value,string topic, Action<DeliveryReport<Tkey, TValue>> sendCallBack = null)
{
try
{
if (producer != null)
{
var message = new Message<Tkey, TValue>
{
Value = value,
Key = key
};
producer.Produce(topic, message, sendCallBack);
return new SendResult(true);
}
else
{
return new SendResult(true, "沒有初始化生產(chǎn)者");
}
}
catch (Exception ex)
{
throw ex;
}
}
public ISendResult Send(Tkey key, TValue value, TopicPartition topicPartition, Action<DeliveryReport<Tkey, TValue>> sendCallBack = null)
{
try
{
if (producer != null)
{
var message = new Message<Tkey, TValue>
{
Value = value,
Key = key
};
producer.Produce(topicPartition, message, sendCallBack);
return new SendResult(true);
}
else
{
return new SendResult(true, "沒有初始化生產(chǎn)者");
}
}
catch (Exception ex)
{
throw ex;
}
}
public ISendResult AsyncSend(Tkey key, TValue value,string topic)
{
try
{
if (producer != null)
{
var message = new Message<Tkey, TValue>
{
Value = value,
Key = key
};
var deliveryReport = producer.ProduceAsync(topic, message);
deliveryReport.ContinueWith(task =>
{
Console.WriteLine("Producer: " + producer.Name + "\r\nTopic: " + topic + "\r\nPartition: " + task.Result.Partition + "\r\nOffset: " + task.Result.Offset);
});
producer.Flush(TimeSpan.FromSeconds(10));
return new SendResult(true);
}
else
{
return new SendResult(true, "沒有初始化生產(chǎn)者");
}
}
catch (Exception ex)
{
throw ex;
}
}
public ISendResult AsyncSend(Tkey key, TValue value, TopicPartition topicPartition)
{
try
{
if (producer != null)
{
var message = new Message<Tkey, TValue>
{
Value = value,
Key = key
};
var deliveryReport = producer.ProduceAsync(topicPartition, message);
deliveryReport.ContinueWith(task =>
{
Console.WriteLine("Producer: " + producer.Name + "\r\nTopic: " + topicPartition.Topic + "\r\nPartition: " + task.Result.Partition + "\r\nOffset: " + task.Result.Offset);
});
producer.Flush(TimeSpan.FromSeconds(10));
return new SendResult(true);
}
else
{
return new SendResult(true, "沒有初始化生產(chǎn)者");
}
}
catch (Exception ex)
{
throw ex;
}
}新建一個(gè)SUPKafkaMessageCenter類這個(gè)類是對外開放的,我們利用這個(gè)類來管理生產(chǎn)者和消費(fèi)者,看下代碼非常簡單。 public static class SUPKafkaMessageCenter<Tkey, TValue>
{
private static SUPLogger logger = null;
static SUPKafkaMessageCenter()
{
SUPLoggerManager.Configure();
logger = new SUPLogger("KafkaCenter");
}
/// <summary>
/// 創(chuàng)建生產(chǎn)者
/// </summary>
/// <param name="bootstrapServer"></param>
/// <param name="topicName"></param>
/// <returns></returns>
public static ISUPKafkaProducer<Tkey, TValue> CreateTopicProducer(string bootstrapServer)
{
if (string.IsNullOrEmpty(bootstrapServer))
{
return null;
}
var producer = new SUPKafkaTopicProducer<Tkey, TValue>(bootstrapServer, logger);
if (!producer.Init())
{
return null;
}
return producer;
}
/// <summary>
/// 創(chuàng)建消費(fèi)者
/// </summary>
/// <param name="bootstrapServer"></param>
/// <param name="groupId"></param>
/// <returns></returns>
public static SUPKafkaTopicConsumer<Tkey, TValue> CreateTopicConsumer(string bootstrapServer, string groupId= "default-consumer-group")
{
if (string.IsNullOrEmpty(bootstrapServer))
{
return null;
}
var consumer = new SUPKafkaTopicConsumer<Tkey, TValue>(bootstrapServer, groupId,logger);
if (!consumer.Init())
{
return null;
}
return consumer;
}測試新建一個(gè)測試的控制臺程序,調(diào)用代碼如下
var consumer = SUPKafkaMessageCenter<string, string>.CreateTopicConsumer("localhost:9092");
//綁定接收信息,回調(diào)函數(shù)
consumer.onReceivedHandle += CallBack;
var topics = new List<string>();
topics.Add("kafka-default-topic");
topics.Add("test");
//訂閱主題
consumer.Subscribe(topics, false);
ISUPKafkaProducer<string, string> kafkaCenter = SUPKafkaMessageCenter<string, string>.CreateTopicProducer("localhost:9092");
kafkaCenter.Send(i.ToString(), "", "kafka-default-topic",deliveryReport =>{...});除了上面寫的這些方法,其實(shí)對于kafka還有很多功能,比如topic的增刪改查,我把它認(rèn)為是管理類的,這里就不貼代碼了。 |
|
|