日期:2014-05-16 浏览次数:20967 次
前提:
pom.xml文件如下:
所有kafka依赖的jar包都在com.sksamuel.kafka下面。其中kafka使用的版本是0.8.0-beta1,对应的scala版本是2.10(即artifactId kafka_2.10中的后缀)。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <!-- Coordinates of this test project -->
  <groupId>com.iflytek.cpcloud.kafka</groupId>
  <artifactId>kafkatest</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>kafkatest</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <!-- Logging -->
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.14</version>
    </dependency>
    <!-- Kafka 0.8.0-beta1; the _2.10 suffix of the artifactId is the Scala version -->
    <dependency>
      <groupId>com.sksamuel.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.8.0-beta1</version>
    </dependency>
  </dependencies>
</project>
package com.iflytek.cpcloud.kafka.kafkatest;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Test the Kafka Producer.
 *
 * <p>Sends ten string messages ("test0" through "test9") to the "test" topic
 * and then closes the producer.
 *
 * @author jcsong2
 */
public class ProducerTest {

    /** Topic the messages are published to. */
    private static final String TOPIC = "test";

    /** Number of messages sent by {@link #main(String[])}. */
    private static final int MESSAGE_COUNT = 10;

    /**
     * Builds a producer from hard-coded connection settings, sends
     * {@code MESSAGE_COUNT} messages and releases the producer.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        // NOTE(review): "zk.connect" looks like a 0.7-era setting; the 0.8 producer
        // appears to bootstrap from "metadata.broker.list" only - confirm before removing.
        props.put("zk.connect", "192.168.20.99:2181");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "192.168.20.99:9092");

        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        try {
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                producer.send(new KeyedMessage<String, String>(TOPIC, "test" + i));
            }
        } finally {
            // Always release the producer's broker connections, even if a send fails.
            producer.close();
        }
    }
}
在consumer端可以看到test0到test9十行输出。
package com.iflytek.cpcloud.kafka.kafkatest; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; public class ConsumerTest extends Thread { private final ConsumerConnector consumer; private final String topic; public static void main(String[] args) { ConsumerTest consumerThread = new ConsumerTest("test"); consumerThread.start(); } public ConsumerTest(String topic) { consumer = kafka.consumer.Co