分布式日志系统01-构建kafka模块

启动zk和kafka:

1
2
3
4
5
6
root@SerenaLina:~/kafka_2.13-2.8.0/bin# sh zookeeper-server-start.sh -daemon ../config/zookeeper.properties
root@SerenaLina:~/kafka_2.13-2.8.0/bin# lsof -i:2181
COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
java 3373 root 129u IPv6 39319 0t0 TCP *:2181 (LISTEN)
root@SerenaLina:~/kafka_2.13-2.8.0/bin# sh kafka-server-start.sh -daemon ../config/server.properties
root@SerenaLina:~/kafka_2.13-2.8.0/bin#

创建一个kafka生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// kafkaddr = "localhost:9092"使用给定代理地址和配置创建一个同步生产者
func CreateKafkaProducer(kafkaddr string) (sarama.SyncProducer, error) {
    config := sarama.NewConfig()
    // 等待服务器所有副本都保存成功后的响应

    config.Producer.RequiredAcks = sarama.WaitForAll

    // 随机的分区类型:返回一个分区器,该分区器每次选择一个随机分区

    config.Producer.Partitioner = sarama.NewRandomPartitioner

    config.Producer.Return.Successes = true

    producer, err := sarama.NewSyncProducer([]string{kafkaddr}, config)

    if err != nil {

        fmt.Println("create producer failed, ", err.Error())

        return nil, err

    }

    fmt.Println("create kafka producer success")


    return producer, nil

}

创建一个kafka消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// var kafkaddr = "localhost:9092"

func CreateKafkaConsumer(kafkaddr string) (sarama.Consumer, error) {

    config := sarama.NewConfig()

    config.Consumer.Return.Errors = true


    //创建消费者

    consumer, err := sarama.NewConsumer([]string{kafkaddr}, config)

    return consumer, err

}

构建生产者发送的信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func (p *KafkaProducer) SendMessage(topic string, valstr string) {
    //构建发送的消息,
    msg := &sarama.ProducerMessage{

        Topic: topic, // 指定topic

        Key:   sarama.StringEncoder(topic),

        Value: sarama.StringEncoder(valstr),

    }

    partition, offset, err := p.Producer.SendMessage(msg)

    if err != nil {

        fmt.Println("Send message Fail")

        fmt.Println(err.Error())

    }

    fmt.Printf("SendMessage -> Partition = %d, offset=%d, msgvalue=%s \n", partition, offset, valstr)

}

topic指定为test

使消费者不断从消息队列中读取信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// var kafkaddr = "localhost:9092"


func (c *KafkaConsumer) GetMessage(topic string) {

    //Partitions(topic):该方法返回了该topic的所有分区id

    partitionList, err := c.Consumer.Partitions(topic)

    if err != nil {

        panic(err)

    }

    fmt.Println("topic:", topic, ", partitionList:", partitionList)



    var wg sync.WaitGroup

    for partition := range partitionList {

        //ConsumePartition方法根据主题,分区和给定的偏移量创建创建了相应的分区消费者

        //如果该分区消费者已经消费了该信息将会返回error

        //sarama.OffsetNewest:表明了为最新消息

        pc, err := c.Consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest)

        if err != nil {

            panic(err)

        }

        defer pc.AsyncClose()

        wg.Add(1)

        go func(sarama.PartitionConsumer) {

            defer wg.Done()

            //Messages()该方法返回一个消费消息类型的只读通道,由代理产生

            for msg := range pc.Messages() {

                fmt.Printf("GetMessage -> %s---Partition:%d, Offset:%d, Key:%s, Value:%s\n", msg.Topic, msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))

            }

        }(pc)
    }
    wg.Wait()

}

上次更新 2025-11-06