kafka是吞吐量巨大的一个消息系统,它是用scala写的,和普通的消息的生产消费还有所不同,写了个demo程序供大家参考。kafka的安装请参考官方文档。
首先我们需要新建一个maven项目,然后在pom中引用kafka jar包,引用依赖如下:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.8.0</version> </dependency>
我们用的版本是0.8, 下面我们看下生产消息的代码:
package cn.outofmemory.kafka; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; /** * Hello world! * */ public class KafkaProducer { private final Producer<String, String> producer; public final static String TOPIC = "TEST-TOPIC"; private KafkaProducer(){ Properties props = new Properties(); //此处配置的是kafka的端口 props.put("metadata.broker.list", "192.168.193.148:9092"); //配置value的序列化类 props.put("serializer.class", "kafka.serializer.StringEncoder"); //配置key的序列化类 props.put("key.serializer.class", "kafka.serializer.StringEncoder"); //request.required.acks //0, which means that the producer never waits for an acknowledgement from the broker (the same behavior as 0.7). This option provides the lowest latency but the weakest durability guarantees (some data will be lost when a server fails). //1, which means that the producer gets an acknowledgement after the leader replica has received the data. This option provides better durability as the client waits until the server acknowledges the request as successful (only messages that were written to the now-dead leader but not yet replicated will be lost). //-1, which means that the producer gets an acknowledgement after all in-sync replicas have received the data. This option provides the best durability, we guarantee that no messages will be lost as long as at least one in sync replica remains. props.put("request.required.acks","-1"); producer = new Producer<String, String>(new ProducerConfig(props)); } void produce() { int messageNo = 1000; final int COUNT = 10000; while (messageNo < COUNT) { String key = String.valueOf(messageNo); String data = "hello kafka message " + key; producer.send(new KeyedMessage<String, String>(TOPIC, key ,data)); System.out.println(data); messageNo ++; } } public static void main( String[] args ) { new KafkaProducer().produce(); } }
下面是消费端的代码实现:
package cn.outofmemory.kafka; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.serializer.StringDecoder; import kafka.utils.VerifiableProperties; public class KafkaConsumer { private final ConsumerConnector consumer; private KafkaConsumer() { Properties props = new Properties(); //zookeeper 配置 props.put("zookeeper.connect", "192.168.193.148:2181"); //group 代表一个消费组 props.put("group.id", "jd-group"); //zk连接超时 props.put("zookeeper.session.timeout.ms", "4000"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "1000"); props.put("auto.offset.reset", "smallest"); //序列化类 props.put("serializer.class", "kafka.serializer.StringEncoder"); ConsumerConfig config = new ConsumerConfig(props); consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config); } void consume() { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(KafkaProducer.TOPIC, new Integer(1)); StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties()); StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties()); Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder); KafkaStream<String, String> stream = consumerMap.get(KafkaProducer.TOPIC).get(0); ConsumerIterator<String, String> it = stream.iterator(); while (it.hasNext()) System.out.println(it.next().message()); } public static void main(String[] args) { new KafkaConsumer().consume(); } }
注意消费端需要配置成zk的地址,而生产端配置的是kafka的ip和端口。
来自:http://outofmemory.cn/code-snippet/33051/java-kafka-producer-consumer-example
相关推荐
kafka java 生产消费程序 demo 示例 kafka 是吞吐量巨大的一个消息系统,它是用 scala 写的,和普通的消息的生产消费还有所不同,写了个 demo 程序供大家参考。kafka 的安装请参考官方文档。 首先我们需要新建一个 ...
kafka-java-demo 基于java的kafka生产消费者示例。 mvn
使用springboot整合kafka,并进行基于kafka的发布订阅消息队列模式的消息发布与消费测试。
卡夫卡示例SSL代码,阿里云连接demo示例
关于Kafka生产者/消费者/ Kafka Connect / Kafka Streams和KSQl的文章 创建kafka生产者和消费者API Suriya NUS示例Kafka项目: : 在本地运行Kafka群集: 在本地Mahcine上运行的Kafka Handy命令参考链接: ://...
生成应用程序映像 docker build --no-cache -t sample_kafka:0.0.1 . -f docker\Dockerfile 创建主题: bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic...
本项目为平时使用Spark/HBase/Kafka等大数据组件的Demo示例,后续会逐渐丰富; 支持Kerberos/非Kerberos环境下操作HBase/Kafka/HDFS/Spark; 支持Spark Streaming实时读取Kafka; 支持Kafka/HBase相关可配置; 支持...
博客演示 这是一个演示项目,显示了事件源和... 要启动演示应用程序,zookeper和kafka服务器,请运行: docker-compose up -d 您现在应该能够访问REST端点,例如 要停止并卸下容器,请运行: docker-compose down
### Abstract这是一个示例演示项目,演示了两个微服务通过Apache Kafka消息代理以异步方式彼此通信。 这些项目包括两项服务: 用户服务通知服务这些服务是使用Sprint Boot和Java 11构建的。用户服务用户服务处理用户...
新增kafka elk demo配置以及文档 文档结构整理 分离注册中心和配置中心 添加zipkin服务跟踪 20180905 自定义注解实现aop日志 自定义注解实现实体类参数校验 添加mybatis自定生成映射实体类、mapper等 添加全局异常...
kafka / redis / es / mysql 20210313添加netty的demo 20210308添加HBase操作相关的Demo 20210303添加Flink操作的Demo 20210228添加HBase操作的演示添加JVM GC日志操作Demo 20210119添加java相关实例:java8并发包...
demo mysql : jdbc 示例demo kafka : kafka 示例demo elasticsearch : elasticsearch 示例demo mysql-binlog : mysql-binlog 示例 doing模块划分fiflow-ui web页面 sql的执行 和 任务的创建 管理fiflow-web 与前端...
待办事项清单示例应用程序该示例演示了如何使用Spring Boot,JPA,Apache Kafka,ElasticSearch和Eventuate Tram框架开发微服务。问题:自动更新数据并发布事件/消息原子地更新数据(例如,域驱动的设计聚合)并发布...
Presentation-cep-demo 演示中处理的示例是PREZ CEP.pptx文件第11页上介绍的示例 当某人使用其卡进行交易时,我们希望检测出银行欺诈的情况,而在30分钟后的间隔内,另一个距离超过100公里的人将使用同一张卡的信息...
租赁卡车事件驱动的体系结构示例 设置 $ brew install kafka $ brew install cassandra 使用自制软件服务运行 $ brew services start kafka $ brew services start cassandra 有些人在让Homebrew Cassandra正常工作...
kafka-demo 邮件演示 微服务 多数据库 石英演示https://www.opencodez.com/java/quartz-scheduler-with-spring-boot.htm restful-demo ...
这是一个示例项目,概述了消费者驱动的合同测试。 目录 序言 在为任何项目设置连续的部署管道时,制定有针对性的测试计划至关重要。 有效的测试套件,反馈回路短 有效的测试套件包括多种测试策略,这些策略可导致较...
该库还包含一个工具箱(demo_data目录),该工具箱包含用于配置测试Apache Kafka实例并用一些演示数据填充它的脚本。要求可以使用ant ( )和ivy ( )构建系统。快速开始在alpha_miner目录中使用ant命令构建alpha ...
clnr-demo 使用CLNR数据集的数据流应用程序的初始演示。 该演示将展示一些容器,这些容器处理智能电表读数的数据流。 数据是由项目生成的。 最初,该演示将使用TC1a数据集,该数据集每隔30分钟就会读取一次家用智能...
Apache Flink游乐场 ... 该游乐场由Flink集群,Kafka集群和示例Flink作业组成。 Flink文档的中详细介绍了该游乐场。 交互式SQL游乐场仍在开发中,不久将添加。 关于 Apache Flink是Apache软件基金会(ASF