(资料图)
配置完Kafka集群后,下面通过Java API的方式来操作 需要导入的Jar包
kafka_2.10-0.8.1.1.jar log4j-1.2.15.jar metrics-core-2.2.0.jar scala-library-2.10.1.jar slf4j-api-1.7.2.jar
以上jar包均可从Kafka的发布包中找的到,在lib目录下面
生产者(Producers)
代码:
import java.util.*; import kafka.javaapi.producer.Producer;import kafka.producer.KeyedMessage;import kafka.producer.ProducerConfig;public class TestProducer { public static void main(String[] args) { long events = Long.parseLong(args[0]); Random rnd = new Random(); //在以下属性中定义了Producer如何找到集群,序列化消息等 Properties props = new Properties(); props.put("metadata.broker.list", "broker1:9092,broker2:9092 "); props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("partitioner.class", "example.producer.SimplePartitioner"); props.put("request.required.acks", "1"); ProducerConfig config = new ProducerConfig(props); //定义生产者对象,该类指定了两个参数的泛型,第一个参数表示分区键值的类型,第二参数表示消息类型 Producerproducer = new Producer(config); for (long nEvents = 0; nEvents < events; nEvents++) { long runtime = new Date().getTime(); String ip = “192.168.2.” + rnd.nextInt(255); String msg = runtime + “,www.example.com,” + ip; //发送消息到消息中介,test指定要接受消息的主题。 KeyedMessagedata = new KeyedMessage("test", ip, msg); //执行发送 producer.send(data); } producer.close(); }}
Producer配置参数:
metadata.broker.list:定义一个或者多个消息中介(broker),Produder通过broker决定主题leader的位置。这里无需配置所有的broker,但建议配置多于一个。 serializer.class:定义准备传递数据给broker时使用哪个序列化器。 partitioner.class:这个是可选项,该类将决定消息将发送到哪个主题分区上。 request.required.acks:该值设置为1后,broker收到消息后将发送一个确认信息给producer。
在上述程序运行之前请确保Kafka已经存在名称为test的主题,如果没有可以使用下面命令创建 bin/kafka-create-topic.sh --topic test --replica 3--zookeeper localhost:2181--partition 5然后使用下面命令查看: bin/kafka-console-consumer.sh --zookeeper localhost:2181--topic test --from-beginning【参考】: https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example