博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
HttpServer发送数据到kafka
阅读量:6341 次
发布时间:2019-06-22

本文共 17450 字,大约阅读时间需要 58 分钟。

文件夹

1、需求

2、框架结构图和步鄹图

3、代码结构

4、代码展现

———————————————————————-

1、需求

1.1、解析路径,将路径的最后一个字符串作为Appkey;

1.2、数据缓存。当Kafka无法正常訪问时在本地Cache文件夹缓存数据;
1.3、安全验证,对请求的appkey进行合法性验证(签名验证待定);
1.4、自己主动更新appkey列表。每间隔一段时间获取一次最新的appkey列表;
1.5、添加ip字段,给每份数据添加一个ip字段;
1.6、记录日志,记录主要的统计信息日志。以及异常错误信息。

2、框架结构图和步鄹图

这里写图片描写叙述

这里写图片描写叙述

3、代码结构

这里写图片描写叙述

4、代码展现

Configuration.java

package com.donews.data;import com.typesafe.config.Config;import com.typesafe.config.ConfigFactory;/** * Created by yuhui on 16-6-23. */public class Configuration {
public static final Config conf= ConfigFactory.load();}

Counter.java

package com.donews.data;import io.vertx.core.Vertx;import io.vertx.core.logging.Logger;import io.vertx.core.logging.LoggerFactory;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicLong;/** * Created by yuhui on 16-6-22. */public class Counter {
private Logger LOG = LoggerFactory.getLogger(Counter.class); AtomicLong messages = new AtomicLong(0L); AtomicLong bytes = new AtomicLong(0L); private long start = System.currentTimeMillis(); private void reset() { messages.set(0L); bytes.set(0L); start = System.currentTimeMillis(); } /*** * 标记时间的方法 二月 14, 2017 3:49:53 下午 com.donews.data.Counter 信息: start Counter 二月 14, 2017 3:49:54 下午 com.donews.data.Counter 信息: start Counter 二月 14, 2017 3:49:55 下午 com.donews.data.Counter 信息: start Counter 二月 14, 2017 3:49:56 下午 com.donews.data.Counter 信息: start Counter * @param vertx */ public void start(Vertx vertx) { LOG.info("start Counter"); long delay = Configuration.conf.getDuration("server.counter.delay", TimeUnit.MILLISECONDS); vertx.setPeriodic(delay, h -> { long time = System.currentTimeMillis() - start; double rps = messages.get() * 1000.0 / time; double mbps = (bytes.get() * 1000.0 / 1024.0 / 1024.0) / time; Runtime runtime = Runtime.getRuntime(); double totalMem = runtime.totalMemory() * 1.0 / 1024 / 1024; double maxMem = runtime.maxMemory() * 1.0 / 1024 / 1024; double freeMem = runtime.freeMemory() * 1.0 / 1024 / 1024; LOG.info("{0}:Message/S, {1}:MBytes/S", rps, mbps); LOG.info("totalMem:{0}MB maxMem:{1}MB freeMem:{2}MB", totalMem, maxMem, freeMem); reset(); }); }}

KafkaHttpServer.java

package com.donews.data;import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.JSONObject;import io.vertx.core.Vertx;import io.vertx.core.http.HttpServer;import io.vertx.core.http.HttpServerResponse;import io.vertx.core.json.JsonArray;import io.vertx.core.json.JsonObject;import io.vertx.core.logging.Logger;import io.vertx.core.logging.LoggerFactory;import io.vertx.ext.web.Router;import io.vertx.ext.web.RoutingContext;import io.vertx.ext.web.handler.BodyHandler;import java.io.*;import java.sql.*;import java.time.Instant;import java.util.HashSet;import java.util.Set;import java.util.Timer;import java.util.TimerTask;import java.util.concurrent.atomic.AtomicInteger;public class KafkaHttpServer {    private static final Logger LOG = LoggerFactory.getLogger(KafkaHttpServer.class);    private final Counter statistic = new Counter();    private static final String DBDRIVER = "com.mysql.jdbc.Driver";    private static final String URL = Configuration.conf.getString("mysql.url");    private static final String USER = Configuration.conf.getString("mysql.user");    private static final String PASSWORD = Configuration.conf.getString("mysql.password");    private static HashSet
appkeys = new HashSet<>(); private static boolean deleteFile = true; private void error(HttpServerResponse response, String message) { response.setStatusCode(500).end(new JsonObject() .put("code", 3) .put("msg", message) .encode()); } private void ok(HttpServerResponse response, String message) { response.putHeader("Access-Control-Allow-Origin", "*"); response.setStatusCode(200).end(new JsonObject() .put("code", 0) .put("msg", message) .encode()); } private void startService(int port) { KafkaProducerWrapper sender = new KafkaProducerWrapper(); Vertx vertx = Vertx.vertx(); HttpServer server = vertx.createHttpServer(); Router router = Router.router(vertx); router.route().handler(BodyHandler.create()); //post请求:http://192.168.1.10:10002/mininfo/logs //router.route 这里是路由 ,/mininfo/logs相似于路由房间 router.route("/mininfo/logs").handler(ctx -> { try { JsonArray array = ctx.getBodyAsJsonArray(); String[] messages = new String[array.size()]; for (int i = 0; i < array.size(); i++) {
JsonObject message = array.getJsonObject(i); message.put("ip", ctx.request().remoteAddress().host()); if (!message.containsKey("timestamp")) { message.put("timestamp", Instant.now().toString()); } messages[i] = array.getJsonObject(i).encode(); } sendMessages(sender, ctx, "appstatistic_production", messages); } catch (Exception e) { error(ctx.response(), e.getMessage()); } }); router.routeWithRegex("/mininfo/v1/logs/[^/]+").handler(routingContext -> { String path = routingContext.request().path(); String topic = path.substring(path.lastIndexOf("/") + 1); LOG.info("如今处理的topic(appkey)为:" + topic); if (appkeys.contains(topic)) { LOG.info("经过验证,该topic(appkey)有效"); String[] messages = routingContext.getBodyAsString().split("\n"); //用于运行堵塞任务(有序运行和无序运行),默认顺序运行提交的堵塞任务 vertx.executeBlocking(future -> { sendMessages(sender, routingContext, topic, messages); future.complete(); }, result -> { }); } else { LOG.info("您的topic(appkey)还没有配置,请在mysql中配置先"); error(routingContext.response(), "please configurate " + topic + "(appkey) in Mysql first! After 10mins it`ll take action"); } }); router.route("/mininfo/v1/ip").handler(ctx -> { LOG.info("x-real-for" + ctx.request().getHeader("x-real-for")); LOG.info("x-forwarded-for" + ctx.request().getHeader("x-forwarded-for")); ok(ctx.response(), ctx.request().getHeader("x-forwarded-for")); }); router.route("/*").handler(ctx -> error(ctx.response(), "wrong! check your path...")); server.requestHandler(router::accept).listen(port, result -> { if (result.succeeded()) { LOG.info("listen on port:{0}", String.valueOf(port)); this.statistic.start(vertx); } else { LOG.error(result.cause()); vertx.close(); } }); //假设你须要在你的程序关闭前採取什么措施。那么关闭钩子(shutdown hook)是非常实用的,相似finally Runtime.getRuntime().addShutdownHook(new Thread(sender::close)); } private void sendMessages(KafkaProducerWrapper sender, RoutingContext ctx, String topic, String[] messages) { AtomicInteger counter = new AtomicInteger(0); for (String message : messages) { if (message == null || "".equals(message)) { ok(ctx.response(), "Success"); continue; } //将ip添加到数据的ip字段 JSONObject jsonObject = JSON.parseObject(message); if (jsonObject.get("ip") == null) { LOG.info("正在添加ip字段"); String ip; String header = ctx.request().getHeader("x-forwarded-for"); if (!(header == null || header.trim().length() == 0 || header.trim().equals("null"))) { ip = header.split(",")[0]; } else { ip = ctx.request().remoteAddress().host(); } jsonObject.put("ip", ip); LOG.info("ip添加成功"); } //topic, message, callback,以匿名函数的形式实现接口中的onCompletion函数 sender.send(topic, jsonObject.toString(), (metadata, exception) -> { if (exception != null) { LOG.warn(exception); String msg = new JsonObject() .put("error", exception.getMessage()) .put("commit", counter.get()) .encode(); error(ctx.response(), msg); cacheLocal(jsonObject.toString(), "/home/lihui/httpkafka/data_bak/" + topic + ".txt"); LOG.info("连接kafka失败,写入cache缓存文件夹以备份数据"); } else { statistic.messages.incrementAndGet(); // Counter statistic.bytes.addAndGet(message.length()); if (counter.incrementAndGet() == messages.length) { ok(ctx.response(), "Success"); } } }); } } /** * 将发送到kafka失败的消息缓存到本地 * * @param message message * @param cachePath cachePath */ private void cacheLocal(String message, String cachePath) { try { FileWriter fileWriter = new FileWriter(cachePath, true); BufferedWriter bw = new BufferedWriter(fileWriter); bw.write(message); bw.newLine(); bw.flush(); bw.close(); } catch (IOException e) { e.printStackTrace(); } } /** * 发送缓存数据到kafka,发送成功,删除缓存数据。失败过10分钟重试 * * @param path 保存缓存数据的[文件夹] */ private static void sendToKafka(String path) { String message; KafkaProducerWrapper sender = new KafkaProducerWrapper(); File file = new File(path); if (file.isDirectory()) { String[] fileList = file.list(); if (fileList != null && fileList.length != 0) { LOG.info("正在将缓存文件夹中的备份数据发送到kafka中..."); for (String str : fileList) { String topic = str.split("\\.")[0]; try { BufferedReader reader = new BufferedReader(new FileReader(path + str)); while ((message = reader.readLine()) != null) { sender.send(topic, message, (metadata, exception) -> { if (metadata != null) { LOG.info("缓存的备份数据正在一条一条的插入kafka中"); } else { //程序错误又一次运行// exception.printStackTrace(); LOG.error("kafka连接异常为:===> 10分钟后会自己主动重试," + exception.getMessage(), exception); deleteFile = false; } }); } if (deleteFile) { LOG.info("開始删除已经插入到kafka中的缓存备份数据"); deleteFile(path, topic); LOG.info("删除完成。"); } reader.close(); } catch (IOException e) { e.printStackTrace(); } } } else { LOG.info("缓存文件夹中没有备份文件"); } } } private static void deleteFile(String path, String appkey) { String appkeyPath = path + "/" + appkey + ".txt"; File file = new File(appkeyPath); file.delete(); LOG.info("成功删除appkey为" + appkey + "的缓存数据"); } private static Set
getAppkeys() { Set
appkeys = new HashSet<>(); String sql = "select appkey from service_config_yarn_properties_table"; try { Class.forName(DBDRIVER); Connection conn = DriverManager.getConnection(URL, USER, PASSWORD); PreparedStatement ps = conn.prepareStatement(sql); ResultSet rs = ps.executeQuery(); while (rs.next()) { appkeys.add(rs.getString(1)); } rs.close(); conn.close(); } catch (ClassNotFoundException | SQLException e) { e.printStackTrace(); } return appkeys; } public static void main(String[] args) throws Exception { Timer timer = new Timer(); //1、10十分钟检查cache文件夹是否有数据,2、同步数据库的APPKEY,做安全验证 timer.schedule(new TimerTask() { @Override public void run() { appkeys.addAll(getAppkeys()); LOG.info("同步完数据库中的appkey(每隔十分钟)"); sendToKafka("/home/lihui/httpkafka/data_bak/");// sendToKafka("C:\\Dell\\UpdatePackage\\log"); } }, 0L, 10 * 60 * 1000L); //主线程 try { int port = Configuration.conf.getInt("server.port"); KafkaHttpServer front = new KafkaHttpServer(); front.startService(port); } catch (Exception e) { e.printStackTrace(); } }}

KafkaProducerWrapper.java

package com.donews.data;import com.typesafe.config.Config;import io.vertx.core.logging.Logger;import io.vertx.core.logging.LoggerFactory;import org.apache.kafka.clients.producer.Callback;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;/** * Created by yuhui on 16-6-22. * * kafka的生产。通过send方法() */public class KafkaProducerWrapper {    private Logger LOG = LoggerFactory.getLogger(KafkaProducerWrapper.class);    private KafkaProducer
producer = init(); private KafkaProducer
init() { Config conf = Configuration.conf.getConfig("kafka"); Properties props = new Properties(); props.put("bootstrap.servers", conf.getString("bootstrap.servers")); props.put("acks", conf.getString("acks")); props.put("retries", conf.getInt("retries")); props.put("batch.size", conf.getInt("batch.size")); props.put("linger.ms", conf.getInt("linger.ms")); props.put("buffer.memory", conf.getLong("buffer.memory")); props.put("key.serializer", conf.getString("key.serializer")); props.put("value.serializer", conf.getString("value.serializer")); LOG.info("KafkaProducer Properties: {0}", props.toString()); return new KafkaProducer<>(props); } public void send(String topic, String message, Callback callback) { producer.send(new ProducerRecord<>(topic, message), callback); } public void close() { producer.close(); LOG.info("Kafka Producer Closed"); } public static void main(String[] args) { //KafkaProducerWrapper sender=new KafkaProducerWrapper(); //sender.producer.partitionsFor("xxxxx").forEach(System.out::println); }}

application.conf

server {  port = 20000  counter.delay = 30s}kafka {  bootstrap.servers = "XXX"  acks = all  retries = 1  batch.size = 1048576  linger.ms = 1  buffer.memory = 33554432  key.serializer = "org.apache.kafka.common.serialization.StringSerializer"  value.serializer = "org.apache.kafka.common.serialization.StringSerializer"}mysql {  url = "jdbc:mysql://XXX/user_privileges"  user = "XXX"  password = "XXX"}

pom.xml

4.0.0
com.donews.data
kafkahttp
1.0-SNAPSHOT
com.typesafe
config
1.3.0
io.vertx
vertx-web
3.2.1
org.apache.kafka
kafka-clients
0.9.0.1
com.typesafe
config
1.3.0
mysql
mysql-connector-java
6.0.2
com.alibaba
fastjson
1.2.11
org.apache.httpcomponents
httpclient
4.3.3
org.apache.maven.plugins
maven-compiler-plugin
3.5.1
1.8
1.8

假设您喜欢我写的博文。读后认为收获非常大,最好还是小额赞助我一下,让我有动力继续写出高质量的博文。感谢您的观赏!

微信

这里写图片描写叙述

转载于:https://www.cnblogs.com/gavanwanggw/p/7354505.html

你可能感兴趣的文章
python 相对路径和绝对路径的区别
查看>>
Day36 python基础--并发编程基础5
查看>>
《Python从小白到大牛》第6章 数据类型
查看>>
三层架构的是与非
查看>>
lucene bug的报告经历
查看>>
火狐访问HTTPS网站显示连接不安全的解决方法
查看>>
防火墙(一)主机型防火墙
查看>>
基于哈夫曼编码的压缩算法的实现
查看>>
TCP长连接与短连接的区别
查看>>
sed tr
查看>>
FTP文件传输服务器(详解)
查看>>
ERROR OGG-01172 Discard file (/oradata/gglog/repl.dsc) exceeded max bytes (500000000).
查看>>
Activiti 实战篇 小试牛刀
查看>>
java中的Static class
查看>>
Xshell 连接CentOS服务器解密
查看>>
[工具类]视频音频格式转换
查看>>
GNS3与抓包工具Wireshark的关联
查看>>
设计模式之策略设计模式
查看>>
groovy-语句
查看>>
VIM寄存器使用
查看>>