您现在的位置是: 首页 > Storm整合kafka进行数据消费


Storm整合kafka进行数据消费

纯真年代 2018-11-18 16:32:25 0 848人围观

Storm整合redis官方直达车

该文示例代码直达车

简介

    Storm整合Kafka官方提供了两种方式,就是Storm作为生产者(KafkaBolt)或消费者(KafkaSpout)。

    使用KafkaBolt注意1:TupleToKafkaMapper源码中的方法,作用是将元组映射到Kafka的key和message,我们可以实现它实现自己的字段,官方实现了一个FieldNameBasedTupleToKafkaMapper默认实现,它从上游获取的字段模式是key,message。注意2:KafkaTopicSelector源码中的方法,作用是我们要使用kafka的topic,我们可以实现它,官方提供了一个DefaultTopicSelector类,我们直接可以使用它。注意3:使用withProducerProperties来设置kafka生产者相关的属性。


   使用KafkaSpout:利用KafkaSpoutConfig直接构建相关配置(具体参数解释见官方),最后加到KafkaSpout里面就行了,就向这样

final TopologyBuilder tp = new TopologyBuilder();
tp.setSpout("kafka_spout", new KafkaSpout<>(KafkaSpoutConfig.builder("127.0.0.1:" + port, "topic").build()), 1);
tp.setBolt("bolt", new myBolt()).shuffleGrouping("kafka_spout");

注意1:我们下游的bolt消费kafkaSpout数据,发送过来的字段是 “topic”,“partition”,“offset”,“key”,“value”字段,具体源码在:org.apache.storm.kafka.spout.DefaultRecordTranslator,注意2:我们下游bolt中处理数据之后,成功要调用ack,失败调用fail,不然数据处理超时后KafkaSpout认为失败会重新发送数据过来(导致数据重复消费)。


示例

1、生产者,部分代码

public class StormKafkaProducerTopo {
public static void main(String[] args) {

TopologyBuilder builder = new TopologyBuilder();

ConsumerSpout consumerSpout = new ConsumerSpout();
builder.setSpout("consumerSpout", consumerSpout);

//设置生产者相关参数,详情kafka官网
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.31.140:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaBolt kafkaBolt = new KafkaBolt()
.withProducerProperties(props)
.withTopicSelector(new DefaultTopicSelector("storm_kafka_test"))
.withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
builder.setBolt("kafkaBolt", kafkaBolt).shuffleGrouping("consumerSpout");

//本地测试
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("kafkaboltTest", new Config(), builder.createTopology());

}
}

效果:



2、消费者,部分代码

public class StormKafkaTopo {

public static void main(String[] args) {

final TopologyBuilder topologyBuilder = new TopologyBuilder();

//包含“topic”,“partition”,“offset”,“key”,“value”字段。
String host = "192.168.31.140:9092";
String topic = "storm_kafka_test";

KafkaSpoutConfig<String, String> kafkaSpoutConfig = KafkaSpoutConfig.builder(host, topic).build();

topologyBuilder.setSpout("kafka_spout", new KafkaSpout<>(kafkaSpoutConfig));
topologyBuilder.setBolt("my_bolt", new myBolt()).shuffleGrouping("kafka_spout");

//本地测试
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("LocalWordCountJdbcApp", new Config(), topologyBuilder.createTopology());
}
}

效果:




上一篇: Elasticsearch入门指南

下一篇: Storm整合jdbc

全部评论