首页
关于
留言
接口
搜索
首页
登录
登录
搜索
KAKA 梦很美
累计撰写
47
篇文章
累计收到
0
条评论
首页
栏目
首页
登录
页面
首页
关于
留言
接口
服务架构
置顶
WebSocket 分布式架构解决方案
前导 近期遇到一件很奇怪的事情,时不时收到同事反馈说 部分用户无法接收到聊天室(WebSocket 服务)消息,然而在测试服以各种方式测试都无法复现这种现象。于是陷入沉思,因为这个问题必须解决,用户必须要退出聊天室再重新进去才能看到这些丢失的消息,已经影响到业务间客服与用户的正常沟通。 这到底是什么原因呢?而且没法在测服复现。 有人说,当脑袋混乱的时候要休息一会,于是我决定先放下这些思考去玩别的东西。在晚上睡觉之余,突然的灵感让我感觉要破案了!服务采用的是 PHP Swoole , 用户与客户端FD 的关系绑定是通过 Swoole Table (服务进程间内存共享) 实现, 我在各个环节确认了关系绑定都没问题情况下还出现 客户端FD 丢失,可能就是因为服务器被负载均衡 (SLB)了,因为测试服是单机。 第二天一早, 为了验证自己的猜测,我查看了在阿里云上的负载均衡服务配置,果然破案了!!!这个项目此前一直是单机服务,也不知道从何时开始 变成多节点服务了。 我来描述下为什么分布式服务的 WebSocket 会存在这种现象,而 分布式服务的 HTTP 却没有这样的问题呢?因为 WebSocket 有个用户与客户端标识(FD)关系需要绑定,而 HTTP 服务一般是不需要关注客户端标识(FD)的。WebSocket 服务端需要推送消息到用户所连接的客户端时,例如A、B两台服务器,用户1连接到聊天室(服务器A),客服1也连接到聊天室(服务器B), 这种情况下 显然用户1发消息给客服1 是对牛弹琴了,因为用户1发送消息后,服务器A会遍历该服务器内的所有用户与客户端标识(FD),然后取出所有客服1的FD 进行消息推送,而客服1连接的是服务器B,则对于用户1来说 客服1是不在线的, 所以用户1推送消息是推了个寂寞啊!!! 再如 你的服务是支持用户多设备、多平台同时在线也是一样的道理,这种情况下也就意味着可能用户的客户端标识(FD)会同时分布在 服务器A、服务器B、服务器C ...,那么用户在其中一台设备发送消息,在其他端登陆的该用户都应该要收到这条消息,单纯的根据用户所连接的服务去发送消息 那么其他端在线的该用户都无法收到此消息了,群发也是一样的道理。 多节点问题 在开始思考分布式会有什么问题时,先来回答一个问题: 服务端如何与客户端交流? 在 WebSocket 服务端,每当与客户端连接成功后,会生成一个 唯一的客户端标识符 FD,WebSocket 会维护一个与客户端所有连接的 Connections。在业务层,你需要将每个连接进来的客户端标识(FD)与项目的用户ID绑定起来,比如用 Redis 将用户和客户端标识(FD) 保存起来,当客户端断开连接时解绑(删除掉对应的客户端标识(FD)),因为我的是用的PHP Swoole,所以我用的是 Swoole Table (服务进程间内存共享) 实现用户与客户端标识(FD)绑定关系。这样你就可以知道某个用户在不在线,并且这个用户的客户端标识(FD)有哪些,然后遍历 Swoole Table 把用户的所有客户端标识(FD)取出来循环推送消息给客户端。 那如何给所有人广播消息呢? 服务器只需要与它自身的所有客户端连接 Server.Connections 挨个发消息就是广播,所以它只是一个伪广播: 我要给群里所有人发消息,但我不能在群里发,只能挨个私发。 单节点 当单节点时,流程如下: 这时所有用户都能收到消息通知。 多节点 当多节点时,就会有部分用户无法正常收到通知 (就是我文中开头所描述的现象),从以下流程图中可以很清楚地看到问题所在: 负载到节点B 的所有用户都没有收到消息通知。 如何解决 说了这么多,怎么解决这个问题呢? 网上的很多教程,有些是通过 WebSocket 中间服务转发器、网关转发器 等实现方案,但这些实现方式有局限性,因为这些方案大部分是需要判断用户在哪台服务器上(需要知道IP),然后转发层将请求转发到用户所在服务器上。这种方案用户单端登录还好,如果用户多端登录 请求被转发到多服务器上同时处理相关逻辑显然是有问题的,比如新增数据、修改数据...这些操作等,这种架构解决方案 用户多点平台登录时调整复杂度会变得较高。 将 Swoole Table (服务进程间内存共享) 改造为 Redis 哈希 来实现用户与客户端标识(FD)绑定关系,主要目的是在单节点处理逻辑的时候经常需要判断对端用户是否在线,单服务内的共享内存并不能知道其他服务内该用户是否在线,所以这个方案不可取了。改用 分布式缓存 就可以判断出对端用户是否在线了。 分布式缓存实现用户与客户端标识(FD)绑定关系大致做法为: 在服务启动时创建一个 全局唯一ID,保证多服务下这个 ID的唯一性,比如启动5个服务时,每个服务的ID都不能有相同,目的是用来分布式缓存的客户端FD标识所在的服务ID,当然 你也可以使用IP作为唯一性(可能会更直观点)。 将 唯一ID_FD 作为哈希键存储,在某个事件或定时清除不活跃的哈希键。要当前某个服务的所有哈希键的时候可以使用 hScan 循环迭代模糊匹配实现,必要时使用 hGetAll 获取所有哈希键值(并发高服务 在此提醒谨慎使用哈)。 多节点服务器就会有分布式问题,解决分布式问题就找一个大家都能找到的地,比如说 MQTT、Kafka、RabbitMQ 等消息中间件,另外使用 Redis 的发布\订阅(pub\sub)功能 也一样可以实现,不过在此我选择的是用 RabbitMQ 来实现。 改进后流程图如下: 负载均衡(SLB) 内所有服务启动时都绑定同一个RabbitMQ Fanout(广播模式) 交换机, 如果该交换机不存在则创建。然后每个服务都生成一个唯一的该交换机队列(生成的交换机队列不能相同, 比如可以服务器1生成的队列名为 S1, 服务器2生成的队列名为 S2), 可以将生成的队列设置为 auto_delete: true, 这样就可以达到当 队列没有消费者的时候该队列会自动删除, 服务重启时又重新生成的效果。接下来就是每个服务都注册该交换机队列的监听消费,当队列的每一条息出栈时都会广播到该交换机下的所有队列(即所有服务的队列监听事件都能收到PUSH进来的消息)。 客户端请求到 负载均衡(SLB) 任意一台服务器 该服务器逻辑处理完后将要发送给客户端的消息推送至 RabbitMQ 消息队列 消息队列将该消息广播到所有服务器的监听消费事件内 所有服务器的监听消费事件内 Redis hScan 迭代遍历当前服务内所有客户端连接,取出所有符合用户ID对应的客户端标识(FD)进行推送消息。(并发高时对 Redis 冲击很大,需要预估支撑力,对缓存哈希的读要求随并发高低而上升 O(n)) 这种 WebSocket 分布式架构解决方案同时 实现了支持单个用户多设备、多平台同时在线的场景,不需要知道有多少台服务器(也就是说服务器可以无限动态扩容),不需要知道用户对应哪些服务器,也不需要知道各个服务器的IP地址,只需要处理各自服务器内的监听消费队列即可。相对于一些通过搭建转发服务器、网关服务器等实现的 WebSocket 分布式架构 有着天然的优势,这些架构解决方案要复杂很多,特别是要实现多设备、多平台同时在线的场景时 更加、更加、更加复杂。
2023年-8月-13日
467 阅读
0 评论
服务架构
置顶
项目内存增大解决方案之日志优化
本文主要实践在 PHP (Swoole 环境) 和 Golang 项目中的 K8S 生产环境解决方案。 内存现象 在部署到 K8S 生产环境 后用 grafana 监控工具发现项目内存很高, 明明业务服务量不算大, 却占用了大量内存。于是排查容器内存情况发现, Buffer Cache 无法释放, 一直在增长。查看代码层也并没有静态变量等现象, 于是我想到每次请求进来都会打印日志, 会不会是不间断地写文件导致文件没法关闭, 内存一直在缓冲区越来越大。为了验证我的猜测, 我尝试把请求日志暂时性关闭, 观察了一天时间发现我的猜测是正确的!内存对比也相对明显(频繁写日志会也明显), 并且容器的总内存也通过自动伸缩机制 降低了2-3个倍数 (因为内存占比是根据总内存大小计算的, 所以内存占比仅仅是反映当前总内存的占用率)!!! 下面是关闭请求日志前后的内存对比: 解决方案 知道了问题就好办了, 解决方案就是: 将日志直接输出到控制台, 运维采集控制台日志到 ELK 。 下面是最终的内存对比: 还在担心服务内存太高 连业务代码都要写的非常小心么? 还在担心写多几行复杂业务 系统炸掉么? 还在担心不敢多打印几个日志? 从此不需要再有此顾虑了!!!
2022年-12月-16日
151 阅读
0 评论
服务架构
2022-8-11
简单实现Go+PHP gRPC通信
背景 工作中难免会遇到单语言无法解决的问题(谨慎点描述就是 单语言实现起来比较麻烦、吃力、复杂), 导致产品需求的实现没这么完美。这时候,我们可以给它添上一双翅膀,大家调侃最多的应该就是Golang赋予PHP能量了吧。 题外话, 其实我认为: 语言之争没有意义,语言只是工具, 它只是为了帮助我们更好地解决问题。当单语言无法满足需求的时候,可以根据业务和成本来决定是否利用其他语言来实现。 多一门技术就多一条门路,只局限于一种语言,您可能就无法前行。 举个例子: 公司3个业务部门都使用PHP语言, 另外一个中台服务部门使用Java, Golang, Python。此时公司想要打通这3个业务部门的用户体系,将各业务部门的用户关联起来形成互通。这个串联工作肯定需要中台服务部门来开发,一切的规范由中台定义。由于跨语言,在中台招聘PHP开发也不太现实,业务部门抽离人员去支持中台也不可能(那要中台干嘛?)。这个时候有两个选择: HTTP 协议传输 gRPC 远程调用 如果您的业务调用服务频率较高,采用 HTTP 可能不太行,这个时候我们需要 gRPC 通信。 至于什么是gRPC, 可以 点击查阅官方文档 说了这么多, 如果您了解 gRPC, 上面的内容可以忽略, 当然 您可能已经读完了, 嘿嘿。废话说完, 我们开始实现 Golang+PHP 吧, 这里只是实现简单案例: 通过 Go 获取系统Cpu、内存、磁盘信息。 实现 如果您本地没有安装 protoc, 请先安装。 我使用的是 Mac 环境 brew install protobuf 验证是否安装成功 protoc --version 如果提示没有 protoc 命令, 你可能需要将 protoc 加入到环境变量(查找 protoc 执行文件 find / -name protoc), 加入环境变量后执行 source ~/.bash_profile 如果您的 PHP 没有安装gRPC 扩展, 请先安装。 pecl install grpc 我们先编写 Golang 程序, 在这里我默认您会安装 Go, 如果不会安装并且您是 Mac 操作系统 可以参考Mac-brew-安装-Golang 新建项目 mkdir go-grpc 确保环境变量已开启 Module, 可项目根目录执行 export GO111MODULE=on 创建 go.mod (包管理,类似 composer) go mod init go-grpc 安装 grpc 和 protobuf 包 go get google.golang.org/grpc go get google.golang.org/protobuf/reflect/protoreflect@v1.25.0 创建 proto 文件 创建 system/system.proto 文件, 写入内容 syntax = "proto3"; // 版本声明,使用Protocol Buffers v3版本 package system; // 表示生成的go文件的存放地址,会自动生成目录。 option go_package = "./system"; // 定义服务 service System { // 获取系统信息接口 rpc GetSystemInfo (GetSystemInfoRequest) returns (GetSystemInfoResponse) {} } // 获取系统信息接口请求参数 message GetSystemInfoRequest {} // 获取系统信息接口返回值 message GetSystemInfoResponse { double cpuPercent = 1; // CPU使用率 double memPercent = 2; // 内存使用率 double diskPercent = 3; // 磁盘使用率 string cpuGHz = 4; // CPU主频 int32 cpuCounts = 5; // CPU核数 string memTotal = 6; // 总内存 string memUsed = 7; // 剩余内存 string diskTotal = 8; // 磁盘总大小 string diskUsed = 9; // 磁盘剩余大小 } 编写 shell 文件 创建system/system_rpc.sh 文件, 写入内容 #! /bin/sh # 系统服务 - 该文件目录下执行生成GO RPC文件命令 protoc -I $(pwd)/ $(pwd)/system.proto --go_out=plugins=grpc:./rpc 给文件赋予可执行权限 chmod -R 755 system/system_rpc.sh 生成 GO 代码 进入 cd system 目录 新建 rpc 目录 mkdir rpc 执行 ./system_rpc.sh 此时, 在 system/rpc 目录已生成 system/system.pb.go 文件 安装 gopsutil 包获取系统信息 go get github.com/shirou/gopsutil go get github.com/tklauser/go-sysconf 编写获取系统信息代码 新建 internal/utils/system.go 文件, 写入内容 package utils import ( "fmt" "github.com/shirou/gopsutil/cpu" "github.com/shirou/gopsutil/disk" "github.com/shirou/gopsutil/mem" "time" ) // 获取 cpu 使用率 func GetCpuPercent() float64 { percent, _ := cpu.Percent(time.Second, false) return percent[0] } // 获取 内存 使用率 func GetMemPercent() float64 { memInfo, _ := mem.VirtualMemory() return memInfo.UsedPercent } // 获取 磁盘 使用率 func GetDiskPercent() float64 { diskInfo, _ := disk.Usage("/") return diskInfo.UsedPercent } // 字节的单位转换 保留两位小数 func FormatFileSize(fileSize int64) (size string) { if fileSize < 1024 { // return strconv.FormatInt(fileSize, 10) + "B" return fmt.Sprintf("%.2fB", float64(fileSize)/float64(1)) } else if fileSize < (1024 * 1024) { return fmt.Sprintf("%.2fKB", float64(fileSize)/float64(1024)) } else if fileSize < (1024 * 1024 * 1024) { return fmt.Sprintf("%.2fMB", float64(fileSize)/float64(1024*1024)) } else if fileSize < (1024 * 1024 * 1024 * 1024) { return fmt.Sprintf("%.2fGB", float64(fileSize)/float64(1024*1024*1024)) } else if fileSize < (1024 * 1024 * 1024 * 1024 * 1024) { return fmt.Sprintf("%.2fTB", float64(fileSize)/float64(1024*1024*1024*1024)) } else { // if fileSize < (1024 * 1024 * 1024 * 1024 * 1024 * 1024) return fmt.Sprintf("%.2fEB", float64(fileSize)/float64(1024*1024*1024*1024*1024)) } } 新建 rpc/logic/systeminfo.go 文件, 写入内容 package logic import ( "context" "fmt" "github.com/shirou/gopsutil/cpu" "github.com/shirou/gopsutil/disk" "github.com/shirou/gopsutil/mem" "go-grpc/system/internal/utils" "go-grpc/system/rpc/system" ) type SystemInfoLogic struct { ctx context.Context } func NewSystemInfoLogic(ctx context.Context) *SystemInfoLogic { return &SystemInfoLogic{ ctx: ctx, } } // 获取系统信息 func (l *SystemInfoLogic) GetSystemInfo(request *system.GetSystemInfoRequest) (*system.GetSystemInfoResponse, error) { var ( cpuGHz = "0GHz" ) c, _ := cpu.Info() for _, v := range c { if v.Mhz > 0 { cpuGHz = fmt.Sprintf("%v0GHz", v.Mhz/1000) } } cpuCounts, _ := cpu.Counts(true) m, _ := mem.VirtualMemory() w, _ := disk.Usage("/") return &system.GetSystemInfoResponse{ CpuPercent: utils.GetCpuPercent(), MemPercent: utils.GetMemPercent(), DiskPercent: utils.GetDiskPercent(), CpuGHz: cpuGHz, CpuCounts: int32(cpuCounts), MemTotal: utils.FormatFileSize(int64(m.Total)), MemUsed: utils.FormatFileSize(int64(m.Used)), DiskTotal: utils.FormatFileSize(int64(w.Total)), DiskUsed: utils.FormatFileSize(int64(w.Total) - int64(w.Free)), }, nil } 新建服务端文件 进入 cd rpc 目录 新建 server 目录 mkdir server 进入 server 目录, 新建 system.go 文件, 写入内容 package server import ( "context" "go-grpc/system/rpc/logic" "go-grpc/system/rpc/system" ) // 系统信息服务 type System struct {} // 获取系统信息 func (server *System) GetSystemInfo(ctx context.Context, request *system.GetSystemInfoRequest) (*system.GetSystemInfoResponse, error) { l := logic.NewSystemInfoLogic(ctx) return l.GetSystemInfo(request) } 编写服务启动文件 system 目录下新建 server.go 文件, 写入内容 package main import ( "fmt" "google.golang.org/grpc" "google.golang.org/grpc/reflection" "net" "go-grpc/system/rpc/server" "go-grpc/system/rpc/system" ) func main() { // 监听本地的 10000 端口 lis, err := net.Listen("tcp", ":10000") if err != nil { fmt.Printf("failed to listen: %v", err) return } s := grpc.NewServer() // 创建 GRPC 服务器 system.RegisterSystemServer(s, &server.System{}) // 在 GRPC 服务端注册服务 reflection.Register(s) // 在给定的 GRPC 服务器上注册服务器反射服务 // Serve 方法在 lis 上接受传入连接,为每个连接创建一个 ServerTransport 和 server 的 goroutine。 // 该 goroutine 读取 GRPC 请求,然后调用已注册的处理程序来响应它们。 err = s.Serve(lis) if err != nil { fmt.Printf("failed to serve: %v", err) return } } 项目目录结构 . ├── go.mod ├── go.sum └── system ├── internal │ └── utils │ └── system.go ├── rpc │ ├── logic │ │ └── systeminfo.go │ ├── server │ │ └── system.go │ └── system │ └── system.pb.go ├── server.go ├── system.proto └── system_rpc.sh 启动服务 go run server.go 此时, Golang 服务已经编写完成!!! 接下来, 我们编写 PHP 代码, 在这里我默认您用了composer 包管理。 导入 grpc 和 protobuf 包 composer require grpc/grpc composer require google/protobuf 新建 gRPC 目录 为了方便演示, 我们在项目根目录下新建 mkdir -p grpc/system 文件夹, 然后进入到该目录cd grpc/system 创建 touch system.proto 文件, 写入内容 syntax = "proto3"; // 版本声明,使用Protocol Buffers v3版本 package System; // 系统服务 service System { // 获取系统信息接口 rpc GetSystemInfo (GetSystemInfoRequest) returns (GetSystemInfoResponse) {} } // 获取系统信息接口请求参数 message GetSystemInfoRequest {} // 获取系统信息接口返回值 message GetSystemInfoResponse { double cpuPercent = 1; // CPU使用率 double memPercent = 2; // 内存使用率 double diskPercent = 3; // 磁盘使用率 string cpuGHz = 4; // CPU主频 int32 cpuCounts = 5; // CPU核数 string memTotal = 6; // 总内存 string memUsed = 7; // 剩余内存 string diskTotal = 8; // 磁盘总大小 string diskUsed = 9; // 磁盘剩余大小 } 编写 shell 文件 创建 system.sh 文件, 写入内容 #! /bin/sh # 系统服务 - 该目录文件下执行生成 PHP 文件命令 protoc -I $(pwd)/ $(pwd)/system.proto --php_out=../ 给文件赋予可执行权限chmod -R 755 system.sh 添加自动加载命名空间 在 composer.json 文件的 autoload 配置增加如下 "autoload":{ "psr-4":{ "GPBMetadata\\":"grpc/GPBMetadata/", "System\\":"grpc/system/" } }, 然后执行 composer dump-autoload 生成 PHP 代码 在 grpc/system目录下执行 ./system.sh 生成 PHP 代码文件, 此时我们 Tree 看看目录结构 . ├── composer.json ├── composer.lock ├── vendor └── grpc ├── GPBMetadata │ └── System.php ├── system │ ├── GetSystemInfoRequest.php │ ├── GetSystemInfoRequest.php │ └── system.proto │ └── system.sh 编写客户端文件 我们进入到 grpc/system 目录新建客户端文件 SystemClient.php 调用系统服务接口, 写入内容 <?php namespace System; class SystemClient extends \Grpc\BaseStub { public function __construct($hostname, $opts, $channel = null) { parent::__construct($hostname, $opts, $channel); } public function getSystemInfo(\System\GetSystemInfoRequest $argument, $metadata = [], $options = []) { return $this->_simpleRequest( '/system.System/GetSystemInfo', $argument, [\System\GetSystemInfoResponse::class, 'decode'], $metadata, $options ); } } 编写调用服务文件 在根目录新建 index.php 文件, 写入内容 <?php require 'vendor/autoload.php'; // 创建客户端实例 $client = new \System\SystemClient('127.0.0.1:10000', [ 'credentials' => \Grpc\ChannelCredentials::createInsecure() ]); $request = new \System\GetSystemInfoRequest(); $reponse = $client->getSystemInfo($request)->wait(); list($reply, $status) = $reponse; $data['system'] = [ 'cpuPercent' => '0%', 'memPercent' => '0%', 'diskPercent' => '0%', 'cpuGHz' => '0GHz', 'cpuCounts' => 0, 'memTotal' => '0GB', 'memUsed' => '0GB', 'diskTotal' => '0GB', 'diskUsed' => '0GB', ]; if ($status->code === 0) { $data['system']['cpuPercent'] = floatval(sprintf("%.2f", $reply->getCpuPercent())) . '%'; $data['system']['memPercent'] = floatval(sprintf("%.2f", $reply->getMemPercent())) . '%'; $data['system']['diskPercent'] = floatval(sprintf("%.2f", $reply->getDiskPercent())) . '%'; $data['system']['cpuGHz'] = $reply->getCpuGHz(); $data['system']['cpuCounts'] = $reply->getCpuCounts(); $data['system']['memTotal'] = $reply->getMemTotal(); $data['system']['memUsed'] = $reply->getMemUsed(); $data['system']['diskTotal'] = $reply->getDiskTotal(); $data['system']['diskUsed'] = $reply->getDiskUsed(); } var_dump($data); 运行调用 根目录执行 php index.php 输出内容如下: array(1) { ["system"]=> array(9) { ["cpuPercent"]=> string(6) "10.97%" ["memPercent"]=> string(6) "69.17%" ["diskPercent"]=> string(6) "26.89%" ["cpuGHz"]=> string(7) "2.80GHz" ["cpuCounts"]=> int(8) ["memTotal"]=> string(7) "16.00GB" ["memUsed"]=> string(7) "11.13GB" ["diskTotal"]=> string(8) "233.47GB" ["diskUsed"]=> string(8) "175.94GB" } } 到这里整个功能完成, 贴上 PHP 端最终目录结构 . ├── index.php ├── composer.json ├── composer.lock ├── vendor └── grpc ├── GPBMetadata │ └── System.php ├── system │ ├── GetSystemInfoRequest.php │ ├── GetSystemInfoRequest.php │ └── system.proto │ └── system.sh │ └── SystemClient.php 以上使用的是原生态 PHP, 在此推荐使用 Hyperf 实现 gRPC 客户端。
2022年-8月-11日
200 阅读
0 评论
服务架构
2022-7-21
MySQL 双主一致性架构优化
一、双主保证高可用 MySQL 数据库集群常使用 一主多从、主从同步、读写分离 的方式来扩充数据库的读性能,保证读库的高可用,但此时写库仍然是单点。 在一个 MySQL 数据库集群中可以设置两个主库,并设置双向同步,以冗余写库的方式来保证写库的高可用。 二、并发引发不一致 数据冗余会引发数据的一致性问题,因为数据的同步有一个时间差,并发的写入可能导致数据同步失败,引起数据丢失: 如上图所述,假设主库使用了 auto increment 来作为自增主键: 两个 MySQL-master 设置双向同步可以用来保证主库的高可用 数据库中现存的记录主键是1,2,3 主库1插入了一条记录,主键为4,并向主库2同步数据 数据同步成功之前,主库2也插入了一条记录,由于数据还没有同步成功,插入记录生成的主键也为4,并向主库1也同步数据 主库1和主库2都插入了主键为4的记录,双主同步失败,数据不一致 三、相同步长免冲突 能否保证两个主库生成的主键一定不冲突呢? 回答: 设置不同的初始值 设置相同的增长步长 就能够做到。 如上图所示: 两个 MySQL-master 设置双向同步可以用来保证主库的高可用 库1的自增初始值是1,库2的自增初始值是2,增长步长都为2 库1中插入数据主键为1/3/5/7,库2中插入数据主键为2/4/6/8,不冲突 数据双向同步后,两个主库会包含全部数据 如上图所示,两个主库最终都将包含1/2/3/4/5/6/7/8所有数据,即使有一个主库挂了,另一个主库也能够保证写库的高可用。 四、上游生成ID避冲突 换一个思路,为何要依赖于数据库的自增ID,来保证数据的一致性呢? 完全可以由 业务上游,使用统一的ID生成器,来保证ID的生成不冲突 如上图所示,调用方插入数据时,带入全局唯一ID,而不依赖于数据库的 auto increment,也能解决这个问题。 至于如何生成全局唯一,趋势递增的ID,可以搜索 分布式ID生成算法 相关内容查看。 五、消除双写不治本 使用 auto increment 两个主库并发写可能导致数据不一致,只使用一个主库提供服务,另一个主库作为 shadow-master,只用来保证高可用,能否避免一致性问题呢? 如上图所示: 两个 MySQL-master 设置双向同步可以用来保证主库的高可用 只有主库1对外提供写入服务 两个主库设置相同的虚IP,在主库1挂掉或者网络异常的时候,虚IP自动漂移,shadow master 顶上,保证主库的高可用 这个切换由于虚IP没有变化,所以切换过程对调用方是透明的,但在极限的情况下,也可能引发数据的不一致: 如上图所示: 两个 MySQL-master 设置双向同步可以用来保证主库的高可用,并设置了相同的虚IP 网络抖动前,主库1对上游提供写入服务,插入了一条记录,主键为4,并向 shadow master 主库2同步数据 突然主库1网络异常,keepalived 检测出异常后,实施虚IP漂移,主库2开始提供服务 在主键4的数据同步成功之前,主库2插入了一条记录,也生成了主键为4的记录,结果导致数据不一致 六、内网DNS探测 虚IP漂移,双主同步延时导致的数据不一致,本质上,需要在双主同步完数据之后,再实施虚IP偏移,使用内网DNS探测,可以实现 shadow master 延时高可用: 使用内网域名连接数据库,例如:db.58daojia.org 主库1和主库2设置双主同步,不使用相同虚IP,而是分别使用ip1和ip2 一开始db.58daojia.org指向ip1 用一个小脚本轮询探测ip1主库的连通性 当ip1主库发生异常时,小脚本delay一个x秒的延时,等待主库2同步完数据之后,再将db.58daojia.org解析到ip2 程序以内网域名进行重连,即可自动连接到ip2主库,并保证了数据的一致性 七、总结 主库高可用,主库一致性,一些小技巧: 双主同步是一种常见的保证写库高可用的方式 设置相同步长,不同初始值,可以避免 auto increment 生成冲突主键 不依赖数据库,业务调用方自己生成全局唯一ID是一个好方法 shadow master 保证写库高可用,只有一个写库提供服务,并不能完全保证一致性 内网DNS探测,可以实现在主库1出现问题后,延时一个时间,再进行主库切换,以保证数据一致性
2022年-7月-21日
156 阅读
0 评论
服务架构
2022-7-17
Redis 乐观锁解决高并发秒杀活动超卖问题
首先, 我们简单理解下乐观锁和悲观锁的概念。 悲观锁 顾名思义, 很悲观; 认为谁都可能对数据进行修改, 所以每次修改数据时都需要进行数据上锁。 乐观锁 顾名思义, 很乐观; 认为谁都可以对数据进行修改, 所以每次修改数据时都不会对数据进行上锁。但是数据修改提交时, 数据库会根据版本记录机制 在同一时间只能修改成功一个。 理解了这两个基础原理后, 其实我们就可以大概清楚 乐观锁和悲观锁其实都可以实现秒杀, 解决商品超卖的问题。但是悲观锁每次修改数据时都会对数据进行上锁, 比如setnx ; 而乐观锁 只需要判断数据版本是否发生变更, 如果没变更就修改成功, 反之就失败。 从性能上来讲, 显然乐观锁 更好。 我认为 Redis 乐观锁, 其实就是 WATCH 监视 和 TRANSACTION 事务 的结合体。 接下来, 我就来具体说说 Redis 乐观锁实现高并发下的秒杀活动 乐观锁 大多数是基于数据版本(VERSION)的记录机制实现的。即为数据增加一个版本标识,在基于数据库表的版本解决方案中,一般是通过为数据库表增加一个 "version" 字段来实现读取出数据时,将此版本号一同读出。之后更新时,对此版本号加1。此时,将提交数据的版本号与数据库表对应记录的当前版本号进行比对,如果提交的数据版本号大于数据库当前版本号,则予以更新,否则认为是过期数据。Redis 中可以使用watch命令会监视给定的 key,当exec时候如果监视的 key 从调用 watch 后发生过变化,则整个事务会失败。也可以调用watch多次监视多个 key。这样就可以对指定的 key 加乐观锁了。注意watch的 key 是对整个连接有效的,事务也一样。如果连接断开,监视和事务都会被自动清除。当然了exec,discard,unwatch命令都会清除连接中的所有监视。 事务 Redis 中的事务(Transaction)是一组命令的集合。事务同命令一样都是 Redis 最小的执行单位,一个事务中的命令要么都执行,要么都不执行。Redis 事务的实现需要用到 MULTI 和 EXEC 两个命令,事务开始的时候先向 Redis 服务器发送 MULTI 命令,然后依次发送需要在本次事务中处理的命令,最后再发送 EXEC 命令表示事务命令结束。Redis 的事务是下面4个命令来实现的。 multi 开启 Redis 的事务,置客户端为事务态。 exec 提交事务,执行从multi到此命令前的命令队列,置客户端为非事务态。 discard 取消事务,置客户端为非事务态。 watch 监视键值对,作用是 如果事务提交exec时发现监视的监视对发生变化,事务将被取消。 最后, 我用 Golang 代码简单实现基于乐观锁的秒杀, 其他语言也是一样的原理。 package main import ( "context" "fmt" redisv8 "github.com/go-redis/redis/v8" "github.com/raylin666/go-cache/redis" "strconv" "time" ) /** 模拟场景: 商品总数量为30个, 单次抢购的用户数量为50人, 每个人只能抢到1个商品。 **/ var ( // 商品总数量 total_goods = 30 // 已被抢购的商品数量缓存 Key goods_key = "goods_numbers" // 已抢到商品的用户缓存 Key user_exists_key = "user_success" ) func redisConnect() *redis.Client { var opts = new(redis.Options) opts.Network = "tcp" opts.Addr = "127.0.0.1:6379" opts.Password = "123456" opts.DB = 0 client, err := redis.New(context.TODO(), opts) if err != nil { panic(err) } return client } // 抢购逻辑处理 func watchRushToBuy(client *redis.Client, userId int) error { return client.Watch(func(tx *redisv8.Tx) error { // 用户已经抢到商品, 不能重复抢购 if tx.HExists(context.TODO(), user_exists_key, strconv.Itoa(userId)).Val() { fmt.Println(fmt.Sprintf("%d 用户已经抢到商品, 不能重复抢购", userId)) return nil } vint, getErr := tx.Get(context.TODO(), goods_key).Int() if getErr != nil && getErr != redisv8.Nil { return nil } // 不能超过商品总数量 if vint >= total_goods { fmt.Println(fmt.Sprintf("%d 用户您好, 记得下次早点来哦, 商品被抢完啦!", userId)) return nil } // 抢购事务处理 _, txErr := tx.TxPipelined(context.TODO(), func(pipeliner redisv8.Pipeliner) error { txCmd := pipeliner.Incr(context.TODO(), goods_key) if txCmd.Err() != nil { return txCmd.Err() } hsetCmd := pipeliner.HSet(context.TODO(), user_exists_key, userId, 1) // 设置用户领取成功状态失败, 回退商品数量 if hsetCmd.Err() != nil { pipeliner.Decr(context.TODO(), goods_key) return hsetCmd.Err() } return nil }) if txErr != nil { return txErr } // 抢购成功, 处理抢购后的逻辑流程 ... fmt.Println(fmt.Sprintf("恭喜 ID 为 %d 的用户抢到啦", userId)) return nil }, goods_key) } func main() { // 连接 Redis client := redisConnect() tryFunc := func(userId int) { // 未抢购成功的用户可重试抢购 for j := 0; j < 3; j++ { // Redis 监听 Key 变化并开启事务处理 watchErr := watchRushToBuy(client, userId) // 重试抢购 if watchErr != nil { fmt.Println(fmt.Sprintf("%d 用户重试抢购失败 - %v", userId, watchErr)) continue } return } } // 模拟用户的并发请求 (不论执行多少次, 或者并发数量加大, 都能正常抢购且不会超卖商品) for i := 0; i < 50; i++ { go func(i int) { // 抢购逻辑 tryFunc(i) }(i) } time.Sleep(1 * time.Second) value, _ := client.Conn.Get(context.TODO(), goods_key).Int64() fmt.Println(value) } 下图是部分抢购中、已抢购完、Redis 已抢到的用户数据
2022年-7月-17日
219 阅读
0 评论
服务架构