Serverless与Websocket的聊天工具

anycodes3个月前领域实战2070

前言

传统业务如果想要实现Websocket是比较容易的一件事情,但是函数计算并不支持长链接这样的操作,基本上都是事件驱动,那么在函数计算与API网关结合,是否可以由Websocket的实现方案呢?

API网关触发器实现Websocket

WebSocket 协议是基于 TCP 的一种新的网络协议。它实现了浏览器与服务器全双工(full-duplex)通信,即允许服务器主动发送信息给客户端。WebSocket 在服务端有数据推送需求时,可以主动发送数据至客户端。而原有 HTTP 协议的服务端对于需推送的数据,仅能通过轮询或 long poll 的方式来让客户端获得。

由于云函数是无状态且以触发式运行,即在有事件到来时才会被触发,因此,为了实现 WebSocket,云函数与 API 网关相结合,通过 API 网关承接及保持与客户端的连接。您可以认为 API 网关与 SCF 一起实现了服务端。当客户端有消息发出时,会先传递给 API 网关,再由 API 网关触发云函数执行。当服务端云函数要向客户端发送消息时,会先由云函数将消息 POST 到 API 网关的反向推送链接,再由 API 网关向客户端完成消息的推送。具体的实现架构如下:

对于 WebSocket 的整个生命周期,主要由以下几个事件组成:

  • 连接建立:客户端向服务端请求建立连接并完成连接建立。

  • 数据上行:客户端通过已经建立的连接向服务端发送数据。

  • 数据下行:服务端通过已经建立的连接向客户端发送数据。

  • 客户端断开:客户端要求断开已经建立的连接。

  • 服务端断开:服务端要求断开已经建立的连接。

对于 WebSocket 整个生命周期的事件,云函数和 API 网关的处理过程如下:

  • 连接建立:客户端与 API 网关建立 WebSocket 连接,API 网关将连接建立事件发送给 SCF。

  • 数据上行:客户端通过 WebSocket 发送数据,API 网关将数据转发送给 SCF。

  • 数据下行:SCF 通过向 API 网关指定的推送地址发送请求,API 网关收到后会将数据通过 WebSocket 发送给客户端。

  • 客户端断开:客户端请求断开连接,API 网关将连接断开事件发送给 SCF。

  • 服务端断开:SCF 通过向 API 网关指定的推送地址发送断开请求,API 网关收到后断开 WebSocket 连接。

因此,API 网关与 SCF 之间的交互,需要由3类云函数来承载:

  • 注册函数:在客户端发起和 API 网关之间建立 WebSocket 连接时触发该函数,通知 SCF WebSocket 连接的 secConnectionID。通常会在该函数记录 secConnectionID 到持久存储中,用于后续数据的反向推送。

  • 清理函数:在客户端主动发起 WebSocket 连接中断请求时触发该函数,通知 SCF 准备断开连接的 secConnectionID。通常会在该函数清理持久存储中记录的该 secConnectionID。

  • 传输函数:在客户端通过 WebSocket 连接发送数据时触发该函数,告知 SCF 连接的 secConnectionID 以及发送的数据。通常会在该函数处理业务数据。例如,是否将数据推送给持久存储中的其他 secConnectionID。

Websocket功能实现

根据腾讯云官网提供的该动能的整体架构图:

这里我们可以使用COS(对象存储)作为持久化的方案,当用户建立链接存储ConnectionId到COS中,当用户断开连接删除该链接Id。

其中注册函数:

# -*- coding: utf8 -*-
import os
from qcloud_cos_v5 import CosConfig
from qcloud_cos_v5 import CosS3Client

bucket = os.environ.get('bucket')
region = os.environ.get('region')
secret_id = os.environ.get('secret_id')
secret_key = os.environ.get('secret_key')
cosClient = CosS3Client(CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key))


def main_handler(event, context):
    print("event is %s" % event)

    connectionID = event['websocket']['secConnectionID']

    retmsg = {}
    retmsg['errNo'] = 0
    retmsg['errMsg'] = "ok"
    retmsg['websocket'] = {
        "action""connecting",
        "secConnectionID": connectionID
    }

    cosClient.put_object(
        Bucket=bucket,
        Body='websocket'.encode("utf-8"),
        Key=str(connectionID),
        EnableMD5=False
    )

    return retmsg

传输函数:

# -*- coding: utf8 -*-
import os
import json
import requests
from qcloud_cos_v5 import CosConfig
from qcloud_cos_v5 import CosS3Client

bucket = os.environ.get('bucket')
region = os.environ.get('region')
secret_id = os.environ.get('secret_id')
secret_key = os.environ.get('secret_key')
cosClient = CosS3Client(CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key))

sendbackHost = os.environ.get("url")


def Get_ConnectionID_List():
    response = cosClient.list_objects(
        Bucket=bucket,
    )
    return [eve['Key'for eve in response['Contents']]


def send(connectionID, data):
    retmsg = {}
    retmsg['websocket'] = {}
    retmsg['websocket']['action'] = "data send"
    retmsg['websocket']['secConnectionID'] = connectionID
    retmsg['websocket']['dataType'] = 'text'
    retmsg['websocket']['data'] = data
    requests.post(sendbackHost, json=retmsg)


def main_handler(event, context):
    print("event is %s" % event)

    connectionID_List = Get_ConnectionID_List()
    connectionID = event['websocket']['secConnectionID']
    count = len(connectionID_List)
    data = event['websocket']['data'] + "(===Online people:" + str(count) + "===)"
    for ID in connectionID_List:
        if ID != connectionID:
            send(ID, data)

    return "send success"

清理函数:

# -*- coding: utf8 -*-
import os
import requests
from qcloud_cos_v5 import CosConfig
from qcloud_cos_v5 import CosS3Client

bucket = os.environ.get('bucket')
region = os.environ.get('region')
secret_id = os.environ.get('secret_id')
secret_key = os.environ.get('secret_key')
cosClient = CosS3Client(CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key))

sendbackHost = os.environ.get("url")


def main_handler(event, context):
    print("event is %s" % event)

    connectionID = event['websocket']['secConnectionID']

    retmsg = {}
    retmsg['websocket'] = {}
    retmsg['websocket']['action'] = "closing"
    retmsg['websocket']['secConnectionID'] = connectionID
    requests.post(sendbackHost, json=retmsg)

    cosClient.delete_object(
        Bucket=bucket,
        Key=str(connectionID),
    )

    return event

我们的Yaml格式可以如下:

Conf:
  component: "serverless-global"
  inputs:
    region: ap-guangzhou
    bucket: chat-cos-1256773370
    secret_id: 
    secret_key: 

myBucket:
  component: '@serverless/tencent-cos'
  inputs:
    bucket: ${Conf.bucket}
    region: ${Conf.region}

restApi:
  component: '@serverless/tencent-apigateway'
  inputs:
    region: ${Conf.region}
    protocols:
      - http
      - https
    serviceName: ChatDemo
    environment: release
    endpoints:
      - path: /
        method: GET
        protocol: WEBSOCKET
        serviceTimeout: 800
        function:
          transportFunctionName: ChatTrans
          registerFunctionName: ChatReg
          cleanupFunctionName: ChatClean


ChatReg:
  component: "@serverless/tencent-scf"
  inputs:
    name: ChatReg
    codeUri: ./code
    handler: reg.main_handler
    runtime: Python3.6
    region:  ${Conf.region}
    environment:
      variables:
        region: ${Conf.region}
        bucket: ${Conf.bucket}
        secret_id: ${Conf.secret_id}
        secret_key: ${Conf.secret_key}
        url: http://set-gwm9thyc.cb-guangzhou.apigateway.tencentyun.com/api-etj7lhtw

ChatTrans:
  component: "@serverless/tencent-scf"
  inputs:
    name: ChatTrans
    codeUri: ./code
    handler: trans.main_handler
    runtime: Python3.6
    region:  ${Conf.region}
    environment:
      variables:
        region: ${Conf.region}
        bucket: ${Conf.bucket}
        secret_id: ${Conf.secret_id}
        secret_key: ${Conf.secret_key}
        url: http://set-gwm9thyc.cb-guangzhou.apigateway.tencentyun.com/api-etj7lhtw

ChatClean:
  component: "@serverless/tencent-scf"
  inputs:
    name: ChatClean
    codeUri: ./code
    handler: clean.main_handler
    runtime: Python3.6
    region:  ${Conf.region}
    environment:
      variables:
        region: ${Conf.region}
        bucket: ${Conf.bucket}
        secret_id: ${Conf.secret_id}
        secret_key: ${Conf.secret_key}
        url: http://set-gwm9thyc.cb-guangzhou.apigateway.tencentyun.com/api-etj7lhtw

这里要注意,我们需要先部署API网关,部署完成,获得到回推地址,将回推地址以url的形式写入到对应函数的环境变量中:

这里理论上设计的不是很合理,按照道理是可以通过${restApi.url[0].internalDomain}自动获得到url的,但是我并没有成功获得到这个url,所以只能先部署API网关,获得到这个地址之后,再重新部署。

部署完成之后,我们可以编写HTML代码,实现可视化的Websocket Client,其核心的JavaScript代码为:

window.onload = function ({
    var conn;
    var msg = document.getElementById("msg");
    var log = document.getElementById("log");

    function appendLog(item{
        var doScroll = log.scrollTop === log.scrollHeight - log.clientHeight;
        log.appendChild(item);
        if (doScroll) {
            log.scrollTop = log.scrollHeight - log.clientHeight;
        }
    }

    document.getElementById("form").onsubmit = function ({
        if (!conn) {
            return false;
        }
        if (!msg.value) {
            return false;
        }
        conn.send(msg.value);
        //msg.value = "";

        var item = document.createElement("div");
        item.innerText = "发送↑:";
        appendLog(item);

        var item = document.createElement("div");
        item.innerText = msg.value;
        appendLog(item);

        return false;
    };

    if (window["WebSocket"]) {
        //替换为websocket连接地址
        conn = new WebSocket("ws://service-01era6ni-1256773370.gz.apigw.tencentcs.com/release/");
        conn.onclose = function (evt{
            var item = document.createElement("div");
            item.innerHTML = "<b>Connection closed.</b>";
            appendLog(item);
        };
        conn.onmessage = function (evt{
            var item = document.createElement("div");
            item.innerText = "接收↓:";
            appendLog(item);

            var messages = evt.data.split('\n');
            for (var i = 0; i < messages.length; i++) {
                var item = document.createElement("div");
                item.innerText = messages[i];
                appendLog(item);
            }
        };
    } else {
        var item = document.createElement("div");
        item.innerHTML = "<b>Your browser does not support WebSockets.</b>";
        appendLog(item);
    }
};

完成之后,我们打开两个页面,进行测试:

总结

通过云函数 + API网关进行Websocket的实践,绝对不仅仅是一个聊天工具这么简单,他可以用在很多方面,例如通过Websocket进行实时日志系统的制作等。

单独的函数计算,仅仅是一个计算平台,只有和周边的BaaS结合,才能展示出Serverless架构的价值和真正的能力,意义。这也是为什么很多人说Serverless=FaaS+BaaS的一个原因。

期待更多人,可以通过Serverless架构,创造出更多有趣的应用。



作者简介:刘宇,毕业于浙江大学,硕士学历,目前在腾讯工作,著有《Serverless 架构》一书,是Serverless架构的热衷者,曾做一款叫Anycodes的软件,目前下载超过100万次。

相关文章

企业微信机器人:让你每天都可以了解世界

企业微信机器人:让你每天都可以了解世界

前言通过定时触发器,可以非常简单快速的建立一个企业微信机器人;通过该机器人,我们可以定制一个提醒我们喝水、吃饭的小功能,可以实现一个定时推送新闻、天气的小功能,可以实现一个监控告警的小功能。使用企业微...

Serverless与NLP实现文本摘要和关键词提取

Serverless与NLP实现文本摘要和关键词提取

前言对文本进行自动摘要的提取和关键词的提取,属于自然语言处理的范畴。提取摘要的一个好处是可以让阅读者通过最少的信息判断出这个文章对自己是否有意义或者价值,是否需要进行更加详细的阅读;提取关键词的好处是...

Serverless实现图片压缩与水印

Serverless实现图片压缩与水印

前言在实际生产中,用户上传图片是非常常见的行为,无论是做一个相册系统,还是发布文章中带有图片,可以说图片和Web服务是非常紧密的了。但是图片所占用的空间,以及图片的大小等又都是不参差不齐的,同时有些网...

函数计算与对象存储实现WordCount

函数计算与对象存储实现WordCount

前言MapReduce在百度百科中的解释如下:MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。概念"Map(映射)"和"Reduce(归约)&...

通过Serverless架构实现监控告警能力

通过Serverless架构实现监控告警能力

前言在实际生产中,我们经常需要做一些监控脚本监控我们的网站服务或者API服务是否可用。传统的方法是通过一些网站监控平台(例如DNSPod监控、360网站服务监控,以及阿里云监控等),这些监控平台的原理...

让Serverless为你的头像增加点装饰

让Serverless为你的头像增加点装饰

前言无论是国庆还是新年,经常会有一个平台为我们提供一个生成头像的小工具,很是新奇好玩,这类平台/工具,一般都有两个方法给我们制作头像。直接加装饰,例如外面加一个框框,下面加一个logo等;通过一些机器...

Serverless实现视频压缩与格式转换

Serverless实现视频压缩与格式转换

前言 在Serverless架构的应用案例中,有这样一个非常实在的应用:视频的处理。 腾讯云的函数计算平台对这个领域的描述: 视频应用、社交应用等场景下,用户上传的图片、音视频的总量大、...

20行代码:Serverless架构下用Python轻松搞定图像分类/预测

20行代码:Serverless架构下用Python轻松搞定图像分类/预测

前言图像分类是人工智能领域的一个热门话题。通俗解释就是,根据各自在图像信息中所反映的不同特征,把不同类别的目标区分开来的图像处理方法。它利用计算机对图像进行定量分析,把图像或图像中的每个像元或区域划归...

3分钟实现文本敏感词过滤

3分钟实现文本敏感词过滤

前言敏感词过滤是随着互联网社区发展一起发展起来的一种阻止网络犯罪和网络暴力的技术手段,通过对可能存在犯罪或网络暴力可能的关键词进行有针对性的筛查和屏蔽,很多时候我们能够防患于未然,把后果严重的犯罪行为...

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。
嘿,一起Serverless