日期:2014-05-16 浏览次数:21096 次
前提:
pom.xml文件如下:
所有kafka依赖的jar包都在com.sksamuel.kafka下面。其中kafka使用的版本是0.8.0-beta1,kafka是2.10。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.iflytek.cpcloud.kafka</groupId>
  <artifactId>kafkatest</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>kafkatest</name>
  <url>http://maven.apache.org</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <dependencies>
		<dependency>
			<groupId>log4j</groupId>
			<artifactId>log4j</artifactId>
			<version>1.2.14</version>
		</dependency>
		<dependency>
			<groupId>com.sksamuel.kafka</groupId>
			<artifactId>kafka_2.10</artifactId>
			<version>0.8.0-beta1</version>
		</dependency>
	</dependencies>
</project>
package com.iflytek.cpcloud.kafka.kafkatest;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
 * Test the Kafka Producer
 * @author jcsong2
 *
 */
public class ProducerTest {
	public static void main(String[] args) {
		Properties props = new Properties();
		props.put("zk.connect", "192.168.20.99:2181");
		props.put("serializer.class", "kafka.serializer.StringEncoder");
		props.put("metadata.broker.list", "192.168.20.99:9092");
		ProducerConfig config = new ProducerConfig(props);
		Producer<String, String> producer = new Producer<String, String>(config);
		for (int i = 0; i < 10; i++)
			producer.send(new KeyedMessage<String, String>("test", "test" + i));
	}
}
在consuemr端可以看到test0到test9十行输出。
package com.iflytek.cpcloud.kafka.kafkatest;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
public class ConsumerTest extends Thread {
	private final ConsumerConnector consumer;
	private final String topic;
	public static void main(String[] args) {
		ConsumerTest consumerThread = new ConsumerTest("test");
		consumerThread.start();
	}
	public ConsumerTest(String topic) {
		consumer = kafka.consumer.Co