
Kafka: sending messages with a producer and consuming them with a consumer

2025/5/7 5:27:44  Source: https://blog.csdn.net/m0_65152767/article/details/139260541

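Contents

  • 1. Producer sends messages & consumer consumes messages
    • 1.1 Getting the help text for kafka-console-producer.sh
    • 1.2 Sending messages to a topic with the producer
    • 1.3 Consuming data from a topic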

1. Producer sends messages & consumer consumes messages

1.1 Getting the help text for kafka-console-producer.sh

[root@localhost ~]# kafka-console-producer.sh --help
This tool helps to read data from standard input and publish it to Kafka.
Option                                   Description
------                                   -----------
--batch-size <Integer: size>             Number of messages to send in a single
                                           batch if they are not being sent
                                           synchronously. (default: 200)
--bootstrap-server <String: server to    REQUIRED unless --broker-list
  connect to>                              (deprecated) is specified. The
                                           server(s) to connect to. The broker
                                           list string in the form
                                           HOST1:PORT1,HOST2:PORT2.
--broker-list <String: broker-list>      DEPRECATED, use --bootstrap-server
                                           instead; ignored if
                                           --bootstrap-server is specified. The
                                           broker list string in the form
                                           HOST1:PORT1,HOST2:PORT2.
--compression-codec [String:             The compression codec: either 'none',
  compression-codec]                       'gzip', 'snappy', 'lz4', or 'zstd'.
                                           If specified without value, then it
                                           defaults to 'gzip'
--help                                   Print usage information.
--line-reader <String: reader_class>     The class name of the class to use for
                                           reading lines from standard in. By
                                           default each line is read as a
                                           separate message. (default:
                                           kafka.tools.ConsoleProducer$LineMessageReader)
--max-block-ms <Long: max block on       The max time that the producer will
  send>                                    block for during a send request
                                           (default: 60000)
--max-memory-bytes <Long: total memory   The total memory used by the producer
  in bytes>                                to buffer records waiting to be sent
                                           to the server. (default: 33554432)
--max-partition-memory-bytes <Long:      The buffer size allocated for a
  memory in bytes per partition>           partition. When records are received
                                           which are smaller than this size the
                                           producer will attempt to
                                           optimistically group them together
                                           until this size is reached.
                                           (default: 16384)
--message-send-max-retries <Integer>     Brokers can fail receiving the message
                                           for multiple reasons, and being
                                           unavailable transiently is just one
                                           of them. This property specifies the
                                           number of retries before the
                                           producer give up and drop this
                                           message. (default: 3)
--metadata-expiry-ms <Long: metadata     The period of time in milliseconds
  expiration interval>                     after which we force a refresh of
                                           metadata even if we haven't seen any
                                           leadership changes. (default: 300000)
--producer-property <String:             A mechanism to pass user-defined
  producer_prop>                           properties in the form key=value to
                                           the producer.
--producer.config <String: config file>  Producer config properties file. Note
                                           that [producer-property] takes
                                           precedence over this config.
--property <String: prop>                A mechanism to pass user-defined
                                           properties in the form key=value to
                                           the message reader. This allows
                                           custom configuration for a
                                           user-defined message reader. Default
                                           properties include:
                                            parse.key=true|false
                                            key.separator=<key.separator>
                                            ignore.error=true|false
--request-required-acks <String:         The required acks of the producer
  request required acks>                   requests (default: 1)
--request-timeout-ms <Integer: request   The ack timeout of the producer
  timeout ms>                              requests. Value must be non-negative
                                           and non-zero (default: 1500)
--retry-backoff-ms <Integer>             Before each retry, the producer
                                           refreshes the metadata of relevant
                                           topics. Since leader election takes
                                           a bit of time, this property
                                           specifies the amount of time that
                                           the producer waits before refreshing
                                           the metadata. (default: 100)
--socket-buffer-size <Integer: size>     The size of the tcp RECV size.
                                           (default: 102400)
--sync                                   If set message send requests to the
                                           brokers are synchronously, one at a
                                           time as they arrive.
--timeout <Integer: timeout_ms>          If set and the producer is running in
                                           asynchronous mode, this gives the
                                           maximum amount of time a message
                                           will queue awaiting sufficient batch
                                           size. The value is given in ms.
                                           (default: 1000)
--topic <String: topic>                  REQUIRED: The topic id to produce
                                           messages to.
--version                                Display Kafka version.
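
The --property entries in the help text (parse.key, key.separator, ignore.error) control how the line reader turns each input line into a key and a value. The following Python sketch imitates that splitting logic so the effect of the three properties is easier to see. It is not Kafka's code: the function name parse_line is hypothetical, the tab default for the separator reflects my understanding of the console producer, and the error behaviour is an assumption based on the option names.

```python
from typing import Optional, Tuple


def parse_line(
    line: str,
    parse_key: bool = False,
    key_separator: str = "\t",   # assumed default separator (tab)
    ignore_error: bool = False,
) -> Tuple[Optional[str], str]:
    """Split one input line into (key, value), roughly like a console line reader."""
    if not parse_key:
        # Without parse.key=true the whole line is the message value, with no key.
        return None, line
    idx = line.find(key_separator)
    if idx == -1:
        if ignore_error:
            # Assumed behaviour: fall back to a keyless message.
            return None, line
        raise ValueError(f"No key found on line: {line}")
    return line[:idx], line[idx + len(key_separator):]


print(parse_line("hello"))
print(parse_line("user1:hello", parse_key=True, key_separator=":"))
```

With the real tool, the equivalent settings would be passed as --property parse.key=true --property key.separator=: on the kafka-console-producer.sh command line.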

1.2 Sending messages to a topic with the producer

[root@localhost ~]# kafka-console-producer.sh --bootstrap-server 192.168.74.148:9092 --topic my_topic1
>1
>2
>hello
>3
>4
>5
>a
>b
>c
>

1.3 Consuming data from a topic

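By default the consumer starts at the LEO (log end offset, i.e. the position where the next message will be written), so it only sees messages produced after it starts. To consume from the very beginning of the topic, add --from-beginning.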
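
To make the difference between the two starting positions concrete, here is a toy Python model of a single partition log. It is only a sketch: PartitionLog and start_offset are hypothetical names, and a real broker tracks offsets in a far more involved way.

```python
class PartitionLog:
    """Toy append-only log; a record's offset is its index in the list."""

    def __init__(self):
        self.records = []

    @property
    def log_end_offset(self):
        # LEO: the offset the next appended record will receive.
        return len(self.records)

    def append(self, value):
        self.records.append(value)
        return len(self.records) - 1

    def read_from(self, offset):
        return self.records[offset:]


def start_offset(log, from_beginning):
    """Pick where a new consumer starts reading."""
    return 0 if from_beginning else log.log_end_offset


log = PartitionLog()
for value in ["1", "2", "hello"]:
    log.append(value)

latest = start_offset(log, from_beginning=False)    # starts at the LEO
earliest = start_offset(log, from_beginning=True)   # starts at offset 0

log.append("3")  # produced after both consumers have started

print(log.read_from(latest))    # ['3']
print(log.read_from(earliest))  # ['1', '2', 'hello', '3']
```

The consumer that starts at the LEO misses everything written before it started, which is why --from-beginning is needed to replay older messages.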

[root@localhost ~]# kafka-console-consumer.sh --bootstrap-server 192.168.74.148:9092 --topic my_topic1 --from-beginning
3
5
b
1
hello
a
c
2
4
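
Note that the messages come back in a different order than they were sent in 1.2. The post does not say how many partitions my_topic1 has, but this pattern is what one would expect from a topic with several partitions: Kafka preserves order only within a partition, not across the whole topic. The Python sketch below illustrates the idea. The three-partition layout and the assignment table are hypothetical, chosen only so that the result matches the output above, and reading the partitions strictly one after another is a simplification.

```python
# Messages in the order they were typed into the producer in 1.2.
sent = ["1", "2", "hello", "3", "4", "5", "a", "b", "c"]

# Hypothetical partition assignment (an assumption, not taken from the post).
assignment = {
    "1": 1, "2": 2, "hello": 1, "3": 0, "4": 2,
    "5": 0, "a": 1, "b": 0, "c": 1,
}

# Append each message to its partition; order inside a partition follows send order.
partitions = {0: [], 1: [], 2: []}
for msg in sent:
    partitions[assignment[msg]].append(msg)

# Simplified consumer: drain partition 0, then 1, then 2.
consumed = [msg for p in sorted(partitions) for msg in partitions[p]]
print(consumed)
```

Within each partition the relative send order is kept (for example 1, hello, a, c), while the interleaving across partitions is lost.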
