
nginx optimization: achieving million-level throughput with nginx + kafka

考拉苑 2016-11-18

1. Overall architecture: nginx + kafka

The stack is built on OpenResty + lua-resty-kafka, running on CentOS.

First, install the OpenResty build dependencies:

  1. Debian/Ubuntu: apt-get install libreadline-dev libncurses5-dev libpcre3-dev libssl-dev perl make build-essential

  2. CentOS: yum install readline-devel pcre-devel openssl-devel gcc

Build and install OpenResty:

  1. Change to the install directory: cd /opt/nginx/

  2. Download OpenResty: wget https://openresty.org/download/openresty-1.9.7.4.tar.gz

  3. Extract: tar -xzf openresty-1.9.7.4.tar.gz -C /opt/nginx/

  4. Configure:

      # Install under /opt/openresty (the default is /usr/local).

./configure --prefix=/opt/openresty --with-luajit --without-http_redis2_module --with-http_iconv_module

  5. Build and install: make && make install
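To confirm the build succeeded, you can check the bundled nginx binary (the path assumes the --prefix used above):

    /opt/openresty/nginx/sbin/nginx -V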

Install lua-resty-kafka:

    # Download lua-resty-kafka:

  1. wget https://github.com/doujiang24/lua-resty-kafka/archive/master.zip

       unzip master.zip -d /opt/nginx/

  2. # Copy lua-resty-kafka into OpenResty

      mkdir /opt/openresty/lualib/kafka

      cp -rf /opt/nginx/lua-resty-kafka-master/lib/resty /opt/openresty/lualib/kafka/
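A quick sanity check that the module files landed where lua_package_path will look for them:

    ls /opt/openresty/lualib/kafka/resty/kafka/
    # should list client.lua, producer.lua, and the other module files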

Install single-node Kafka. Note: if nginx and Kafka are not guaranteed to run on the same machine, you must adjust the client.lua source under /opt/openresty/lualib/kafka; see the end of this article for details.

     cd /opt/nginx/  

    wget http://apache.fayea.com/kafka/0.9.0.1/kafka_2.10-0.9.0.1.tgz  

    tar xvf kafka_2.10-0.9.0.1.tgz  

    # Start a single-node zookeeper

    nohup sh bin/zookeeper-server-start.sh config/zookeeper.properties > ./zk.log 2>&1 &

    # Bind the broker IP; this is required

    # edit host.name in config/server.properties:

    #host.name={your_server_ip}

    # Start the kafka server

    nohup sh bin/kafka-server-start.sh config/server.properties > ./server.log 2>&1 &

    # Create a test topic (whether or not nginx and kafka share a machine, the topic must exist and its name must match the one used in nginx.conf)

    sh bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test1 --partitions 1 --replication-factor 1
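Optionally verify the topic before wiring up nginx (run from the same kafka directory):

    sh bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test1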

2. Configuration [nginx.conf + /etc/sysctl.conf]

The nginx.conf configuration:

#user  nobody;

worker_processes  auto;  # number of worker processes; one per CPU core

worker_rlimit_nofile 100000; # max open file descriptors per worker process; in theory the system limit (ulimit -n) divided by the number of workers, but keeping it equal to ulimit -n is generally recommended

error_log  logs/error.log; # global error log location

#error_log  logs/error.log  notice;

#error_log  logs/error.log  info;

#pid        logs/nginx.pid;

events {

use epoll;

multi_accept on;

worker_connections  65535;

}

http {

gzip on;

gzip_disable "msie6";

gzip_proxied any;

gzip_min_length 1000;

gzip_buffers 16 64k;

gzip_comp_level 4;

gzip_types text/plain application/json application/x-javascript text/css application/xml;

gzip_vary on;

server_tokens  off;

sendfile        on;

tcp_nopush on;

tcp_nodelay on;

keepalive_timeout  30;

client_header_timeout 30;

client_body_timeout 30;

reset_timedout_connection on;

send_timeout 10;

limit_conn_zone $binary_remote_addr zone=addr:5m;

limit_conn addr 100;

open_file_cache max=200000 inactive=20s;

open_file_cache_valid 30s;

open_file_cache_min_uses 2;

open_file_cache_errors on;

access_log off;

lua_package_path "/opt/openresty/lualib/kafka/?.lua;;";   # pull in lua-resty-kafka
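# the trailing ";;" in lua_package_path appends the default Lua search paths after this one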

server {

listen       80;

server_name  localhost;

location = /favicon.ico {

root   html;

index  index.html index.htm;

}

location /logs {

# Wrap the Lua code in log_by_lua: this directive runs at the very end of the request, so it does not interfere with the proxy_pass mechanism

log_by_lua '

-- load the required Lua modules

local cjson = require "cjson"

local producer = require "resty.kafka.producer"

-- kafka broker list; the IP must match the broker's host.name setting

local broker_list = {

{ host = "10.33.97.92", port = 9092 },

}

local broker_list_host_ip = {  -- hostname-to-IP map; see the client.lua changes at the end of this article

                   { host = "datagroup-97-92", ip = "10.33.97.92" },

               }  

-- collect the log fields into a table for JSON output

local log_json = {}

log_json["uri"]=ngx.var.uri

log_json["args"]=ngx.var.args

log_json["host"]=ngx.var.host

log_json["request_body"]=ngx.var.request_body

log_json["remote_addr"] = ngx.var.remote_addr

log_json["remote_user"] = ngx.var.remote_user

log_json["time_local"] = ngx.var.time_local

log_json["status"] = ngx.var.status

log_json["body_bytes_sent"] = ngx.var.body_bytes_sent

log_json["http_referer"] = ngx.var.http_referer

log_json["http_user_agent"] = ngx.var.http_user_agent

log_json["http_x_forwarded_for"] = ngx.var.http_x_forwarded_for

log_json["upstream_response_time"] = ngx.var.upstream_response_time

log_json["request_time"] = ngx.var.request_time

-- serialize the table to a JSON string

local message = cjson.encode(log_json);

-- create an async kafka producer

local bp = producer:new(broker_list, { producer_type = "async", broker_list_host_ip = broker_list_host_ip,refresh_interval = 3000 })
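-- Note (per lua-resty-kafka defaults; worth verifying against your version):
-- with producer_type = "async", send() only queues the message in a per-worker
-- buffer that a background timer flushes to kafka, so a truthy ok means
-- "buffered", not yet "delivered"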

-- send the log message; send()'s second argument is the key, used for kafka partition routing:

-- when key is nil, messages are written to the same partition for a stretch of time

-- when a key is given, its hash picks the target partition

local ok, err = bp:send("test1", nil, message)

if not ok then

ngx.log(ngx.ERR, "kafka send err:", err)

return

end

';

}

error_page   500 502 503 504  /50x.html;

location = /50x.html {

root   html;

}

}

}
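Before starting nginx, it is worth validating the configuration syntax (the path assumes the layout used throughout this article):

    /opt/openresty/nginx/sbin/nginx -t -c /opt/openresty/nginx/conf/nginx.conf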

Tuning /etc/sysctl.conf (adjust to your own hardware and workload):

# Kernel sysctl configuration file for Red Hat Linux

#

# For binary values, 0 is disabled, 1 is enabled.  See sysctl(8) and

# sysctl.conf(5) for more details.

# Controls IP packet forwarding

net.ipv4.ip_forward = 0

# Controls source route verification

net.ipv4.conf.default.rp_filter = 1

# Do not accept source routing

net.ipv4.conf.default.accept_source_route = 0

# Controls the System Request debugging functionality of the kernel

kernel.sysrq = 0

# Controls whether core dumps will append the PID to the core filename.

# Useful for debugging multi-threaded applications.

kernel.core_uses_pid = 1

# Controls the use of TCP syncookies

net.ipv4.tcp_syncookies = 1

# Disable netfilter on bridges.

#net.bridge.bridge-nf-call-ip6tables = 0

#net.bridge.bridge-nf-call-iptables = 0

#net.bridge.bridge-nf-call-arptables = 0

# Controls the default maximum size of a message queue

kernel.msgmnb = 65536

# Controls the maximum size of a message, in bytes

kernel.msgmax = 65536

# Controls the maximum shared segment size, in bytes

kernel.shmmax = 68719476736

# Controls the maximum number of shared memory segments, in pages

kernel.shmall = 4294967296

# add 2016-11-16

# Widen the local port range so the system can accept more connections

net.ipv4.ip_local_port_range = 2000 65000

net.ipv4.tcp_window_scaling = 1

# number of SYN packets to keep in the backlog before the kernel starts dropping them

net.ipv4.tcp_max_syn_backlog = 3240000

# increase the socket listen backlog

net.core.somaxconn = 3240000

net.ipv4.tcp_max_tw_buckets = 1440000

# Increase TCP buffer sizes

net.core.rmem_default = 8388608

net.core.rmem_max = 16777216

net.core.wmem_max = 16777216

net.ipv4.tcp_rmem = 4096 87380 16777216

net.ipv4.tcp_wmem = 4096 65536 16777216

net.ipv4.tcp_congestion_control = cubic
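Apply the sysctl changes without a reboot:

    sysctl -p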


Changes to client.lua under /opt/openresty/lualib/kafka/resty/kafka (the original post marked the changed lines in red; here they sit between the --start/--end comment markers):

  function _M.new(self, broker_list, client_config)

   local opts = client_config or {}

   local socket_config = {

       socket_timeout = opts.socket_timeout or 3000,

       keepalive_timeout = opts.keepalive_timeout or 600 * 1000,   -- 10 min

       keepalive_size = opts.keepalive_size or 2,

   }

   

  --start 0 == 2016-11-18

  local broker_list_host_ip = opts.broker_list_host_ip or {}

   local cli = setmetatable({

       broker_list = broker_list,

       broker_list_host_ip = broker_list_host_ip,

       topic_partitions = {},

       brokers = {},

       client_id = "worker" .. pid(),

       socket_config = socket_config,

   }, mt)

   --end 0 == 2016-11-18

   if opts.refresh_interval then

       meta_refresh(nil, cli, opts.refresh_interval / 1000) -- refresh_interval is given in ms

   end

   return cli

end

function _M.fetch_metadata(self, topic)

   local brokers, partitions = _metadata_cache(self, topic)

   if brokers then

       return brokers, partitions

   end

   _fetch_metadata(self, topic)

   return _metadata_cache(self, topic)

end

function _M.choose_broker(self, topic, partition_id)

   local brokers, partitions = self:fetch_metadata(topic)

   if not brokers then

       return nil, partitions

   end

   local partition = partitions[partition_id]

   if not partition then

       return nil, "not found partition"

   end

   local config = brokers[partition.leader]

   if not config then

       return nil, "not found broker"

   end

   

   --start 1 == 2016-11-18

  local broker_list_host_ip = self.broker_list_host_ip

   for k = 1, #broker_list_host_ip do

        local hostip = broker_list_host_ip[k]

        if config ~= nil and hostip ~= nil and config.host == hostip.host then

            config.host = broker_list_host_ip[k].ip

        end

    end

   --end  1 == 2016-11-18

 

   return config

end

return _M
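As a quick smoke test outside nginx, the producer can be driven from the command line with the resty CLI bundled with OpenResty (a sketch: the broker IP and topic are the ones used earlier, and in the default sync mode a successful send returns the message offset):

    /opt/openresty/bin/resty -I /opt/openresty/lualib/kafka -e '
        local producer = require "resty.kafka.producer"
        -- default (sync) producer; broker list matches nginx.conf above
        local p = producer:new({ { host = "10.33.97.92", port = 9092 } })
        local ok, err = p:send("test1", nil, "hello from resty-cli")
        print(ok and "sent" or ("send failed: " .. tostring(err)))
    '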

3. Starting nginx

 # Start

./nginx -c /opt/openresty/nginx/conf/nginx.conf

# Reload after configuration changes

./nginx -s reload
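If sends fail, the errors written by ngx.log() in the log_by_lua block will appear in the error log:

    tail -f /opt/openresty/nginx/logs/error.log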

4. Testing

Hit the logging endpoint:

  http://localhost/logs?passport=83FBC7337D681E679FFBA1B913E22A0D@qq.sohu.com&page=2&size=10

Start a kafka console consumer to read the data:

kafka-console-consumer.sh --zookeeper 10.33.97.52:2181 --topic test1 --from-beginning
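Any HTTP client can drive the endpoint; for example with curl, or with apache ab for load (the ab options below are illustrative, not the settings from the original benchmark):

    curl "http://localhost/logs?passport=83FBC7337D681E679FFBA1B913E22A0D@qq.sohu.com&page=2&size=10"
    ab -n 100000 -c 1000 "http://localhost/logs?page=2&size=10"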

Screenshot of the consumer output: [image not available]

Load-test results (JMeter): [image not available]

Apache ab load test: [image not available]
