介绍
本示例是一个简单的 demo,演示如何连接开放 API 的 WebSocket 接口,并监听礼物、弹幕等事件。
代码示例(JS)
可运行在服务端(nodejs)和客户端(小程序)。
该 SDK 以 npm 包的形式发布,包名为 @hyext/communication。
客户端代码
import { UI } from '@hyext/hy-ui'
import React, { Component } from 'react'
import './app.hycss'
import { createOpenWS, WSEventIds } from "@hyext/communication";
const { View, Text } = UI
class App extends Component {
componentDidMount() {
hyExt.context.getStreamerInfo().then((streamerInfo) => {
const ws = createOpenWS({
appId: 'your_appid',
secret: 'your_secret',
expireTimeDelta: 60 * 10, // 10分钟
extUuid: 'your_extUuid',
roomId: streamerInfo.streamerRoomId, // 直播间ID
debug: true
})
// 监听ws内置事件
ws.on(WSEventIds.close, (data) => {
console.log('The ws has closed.')
})
// 监听open-api弹幕事件
ws.on('getMessageNotice', (data) => {
console.log(data, 'getMessageNotice')
})
})
}
render () {
return (
<View className="container"><Text>hello world</Text></View>
)
}
}
export default App
服务端代码
// Server-side (Node.js) sample: open the websocket for one room and log
// every bullet-chat notice it delivers.
const { createOpenWS } = require('@hyext/communication')

const wsOptions = {
  appId: 'your_appid',
  secret: 'your_secret',
  expireTimeDelta: 60 * 10, // 10 minutes
  extUuid: 'your_extUuid',
  roomId: target_roomid, // number of the room to listen to
  debug: true
}
const ws = createOpenWS(wsOptions)

// Listen for bullet-chat messages
const onMessageNotice = (data) => {
  console.log(data, 'getMessageNotice')
}
ws.on('getMessageNotice', onMessageNotice)
代码示例(JAVA)
具体可在代码仓库中查看详细代码。
package hyext.ebs.examples.websocket;
import com.alibaba.fastjson.JSONObject;
import com.auth0.jwt.JWT;
import com.auth0.jwt.algorithms.Algorithm;
import hyext.ebs.examples.utils.ParamsUtil;
import org.java_websocket.enums.ReadyState;
import org.java_websocket.handshake.ServerHandshake;
import java.net.URI;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
/**
* 开放API websocket 接入实现样例
*
*/
public class WebSocketClient extends org.java_websocket.client.WebSocketClient {
public WebSocketClient(URI serverUri) {
super(serverUri);
}
@Override
public void onOpen(ServerHandshake arg0) {
System.out.println("------ WebSocketClient onOpen ------");
}
@Override
public void onClose(int arg0, String arg1, boolean arg2) {
System.out.println("------ WebSocketClient onClose ------");
}
@Override
public void onError(Exception arg0) {
System.out.println("------ WebSocketClient onError ------");
}
@Override
public void onMessage(String arg0) {
System.out.println("-------- 接收到服务端数据: " + arg0 + "--------");
try {
JSONObject res = JSONObject.parseObject(arg0);
if("command".equals(res.getString("notice"))) {//监听成功回包
System.out.println("-------- 监听事件: " + res.getJSONObject("data").getJSONArray("data") + " 成功--------");
}
if("getSendItemNotice".equals(res.getString("notice"))) {
JSONObject data = JSONObject.parseObject(arg0).getJSONObject("data");
//粉丝徽章名称
String badgeName = data.getString("badgeName");
//粉丝等级
Integer fansLevel = data.getInteger("fansLevel");
//礼物id
Integer giftId = data.getInteger("itemId");
//贵族等级
Integer nobleLevel = data.getInteger("nobleLevel");
//房间号
Long roomId = data.getLong("roomId");
//送礼连击数
Long sendItemCount = data.getLong("sendItemCount");
//送礼者昵称
String sendNick = data.getString("sendNick");
//用户等级
Long senderLevel = data.getLong("senderLevel");
System.out.println(String.format("-------- 粉丝勋章:%s,粉丝等级:%s,礼物id:%s,贵族等级:%s,房间号:%s,送礼连击数:%s,送礼者昵称:%s,用户等级:%s --------"
,badgeName,fansLevel,giftId,nobleLevel,roomId,sendItemCount,sendNick,senderLevel));
}
} catch (Exception e) {
System.out.println("-------- 数据处理异常 --------");
}
}
/**
* 生成开放API Websocket连接参数
* @param appId 开发者ID(https://ext.huya.com成为开发者后自动生成)
* @param secret 开发者密钥(https://ext.huya.com成为开发者后自动生成)
* @param roomId 要监听主播的房间号
* @return
*/
public static Map<String, Object> getWebSocketJwtParamsMap(String appId, String secret, long roomId){
//获取时间戳(毫秒)
long currentTimeMillis = System.currentTimeMillis();
long expireTimeMillis = System.currentTimeMillis() + 10 * 60 * 1000; //超时时间:通常设置10分钟有效,即exp=iat+600,注意不少于当前时间且不超过当前时间60分钟
Date iat = new Date(currentTimeMillis);
Date exp = new Date(expireTimeMillis);
try {
Map<String, Object> header = new HashMap<String, Object>();
header.put("alg", "HS256");
header.put("typ", "JWT");
//生成JWT凭证
Algorithm algorithm = Algorithm.HMAC256(secret); //开发者密钥
String sToken = JWT.create()
.withHeader(header) //JWT声明
.withIssuedAt(iat) //jwt凭证生成时间
.withExpiresAt(exp) //jwt凭证超时时间
.withClaim("appId", appId) //开发者ID
.sign(algorithm);
Map<String, Object> authMap = new HashMap<String, Object>();
authMap.put("iat", currentTimeMillis / 1000); //jwt凭证生成时间戳(秒)
authMap.put("exp", expireTimeMillis / 1000); //jwt凭证超时时间戳(秒)
authMap.put("sToken", sToken); //jwt签名串
authMap.put("appId",appId); //开发者ID
authMap.put("do", "comm"); //接口默认参数
authMap.put("roomId", roomId); //需要监听主播的房间号
return authMap;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
public static void main(String[] args) {
try {
int ExecuteSecond = 60; //监听时间秒
String appId = ""; //小程序开发者ID(成为开发者后,https://ext.huya.com可查)
String secret = ""; //小程序开发者密钥(成为开发者后,https://ext.huya.com可查)
long roomId = ; //监听主播的房间号
Map<String, Object> map = new HashMap<String, Object>(16);
map = WebSocketClient.getWebSocketJwtParamsMap(appId,secret,roomId);
StringBuffer urlBuffer = new StringBuffer();
urlBuffer.append("ws://ws-apiext.huya.com/index.html").append(ParamsUtil.MapToUrlString(map));
WebSocketClient client = new WebSocketClient(URI.create(urlBuffer.toString()));
client.setConnectionLostTimeout(3600);
client.connect();
while (!client.getReadyState().equals(ReadyState.OPEN)) {
}
Long reqId = System.currentTimeMillis();
String sendMsg = "{\"command\":\"subscribeNotice\",\"data\":[\"getSendItemNotice\"],\"reqId\":\"" + reqId + "\"}";
client.send(sendMsg);
int count = 1;
while (count < ExecuteSecond) {
Thread.sleep(1000);
System.out.println("connect time:" + count);
client.send("ping");
count++;
}
client.closeConnection(0,"bye");
} catch (Exception e) {
e.printStackTrace();
}
}
}
代码示例(python)
具体可在代码仓库中查看详细代码。
import websocket
import json
import time
import threading
import logging
from urllib.parse import urlencode
class WebsocketClient(object):
    """Reconnecting wrapper around ``websocket.WebSocketApp``.

    ``address`` is the full websocket URL. ``message_callback``, when given,
    is called with every raw message before the message is decoded.
    """

    def __init__(self, address, message_callback=None):
        super(WebsocketClient, self).__init__()
        self.address = address
        self.message_callback = message_callback
        # Set here so the other methods can be called before run().
        self.ws = None
        self.is_running = False
        self._keep_reconnecting = True

    def on_message(self, ws, message):
        """Print the message, forward it to the callback and decode known notices."""
        print("on_client_message:", message)
        if self.message_callback:
            self.message_callback(message)
        try:
            data = json.loads(message)
        except (TypeError, ValueError):
            # Not a JSON frame, so there is no notice to decode.
            return
        if not isinstance(data, dict):
            return
        notice = data.get("notice")
        payload = data.get("data")
        if notice == "command":
            print("-------- 监听事件:{} 成功--------".format(payload))
        elif notice == "getSendItemNotice":
            # A notice without a "data" object prints None fields instead of raising.
            gift = payload if isinstance(payload, dict) else {}
            print("-------- 粉丝勋章:{},粉丝等级:{},礼物id:{},贵族等级:{},房间号:{},送礼连击数:{},送礼者昵称:{},用户等级:{} --------"
                  .format(gift.get("badgeName"),
                          gift.get("fansLevel"),
                          gift.get("itemId"),
                          gift.get("nobleLevel"),
                          gift.get("roomId"),
                          gift.get("sendItemCount"),
                          gift.get("sendNick"),
                          gift.get("senderLevel")))

    def on_error(self, ws, error):
        logging.exception(error)

    def on_close(self, ws, *args):
        """Mark the connection as down.

        ``*args`` absorbs the close status code and message that newer
        websocket-client releases pass in addition to ``ws``.
        """
        print("### client closed ###")
        self.is_running = False

    def on_open(self, ws):
        self.is_running = True
        print("on open")

    def close_connect(self):
        """Close the connection and stop the reconnect loop in run()."""
        self._keep_reconnecting = False
        if self.ws is not None:
            self.ws.close()

    def send_message(self, message):
        """Send ``message``; failures are logged, not raised."""
        try:
            self.ws.send(message)
        except Exception as err:
            logging.exception(err)

    def run(self):
        """Connect and keep reconnecting every 3 seconds until close_connect()."""
        websocket.enableTrace(True)
        self._keep_reconnecting = True
        # Lambdas rather than bound methods, as in the original sample;
        # presumably some websocket-client versions treat bound methods
        # differently - TODO confirm before simplifying.
        self.ws = websocket.WebSocketApp(
            self.address,
            on_message=lambda ws, message: self.on_message(ws, message),
            on_error=lambda ws, error: self.on_error(ws, error),
            on_close=lambda ws, *args: self.on_close(ws, *args))
        self.ws.on_open = lambda ws: self.on_open(ws)
        while self._keep_reconnecting:
            self.is_running = False
            self.ws.run_forever()
            # Reset in case the connection ended without on_close firing.
            self.is_running = False
            if self._keep_reconnecting:
                time.sleep(3)
class WSClient(object):
    """Runs a ``WebsocketClient`` on its own thread and relays outgoing messages."""

    def __init__(self, address, call_back):
        super(WSClient, self).__init__()
        self.client_thread = None
        self.client = WebsocketClient(address, call_back)

    def run(self):
        """Start the client loop on a new thread and return immediately."""
        worker = threading.Thread(target=self.run_client)
        self.client_thread = worker
        worker.start()

    def run_client(self):
        """Thread entry point: run the wrapped client's loop."""
        self.client.run()

    def send_message(self, message):
        """Print ``message`` and pass it to the wrapped client."""
        print(message)
        self.client.send_message(message)
# Developer ID (generated after becoming a developer at https://ext.huya.com)
appId = "test_appId"
# Room number of the streamer to listen to
roomId = "test_roomId"
# JWT returned by the API
jwt = "test_jwt"

# Query parameters of the websocket URL; iat/exp are placeholders to be
# replaced with the values carried in the JWT.
params = dict(
    iat="jwt中的iat",
    exp="jwt中的exp",
    sToken=jwt,
    appId=appId,
    roomId=roomId,
    do="comm",
)
socket_uri = "ws://ws-apiext.huya.com/index.html?" + urlencode(params)
print(socket_uri)


def _print_callback(message):
    print("call_back message:", message)


ws_client = WSClient(socket_uri, _print_callback)
ws_client.run()

# Wait 5 seconds
time.sleep(5)

# Send the subscribe command
subscribe_request = {
    "command": "subscribeNotice",
    "data": ["getSendItemNotice"],
    "reqId": int(time.time())
}
message = json.dumps(subscribe_request).encode()
ws_client.send_message(message)
代码示例(C#)
以下是用 C# 编写的示例,安装所需的 NuGet 依赖(如 WebSocketSharp、System.IdentityModel.Tokens.Jwt)后即可直接在 IDE 中运行。
using System.IdentityModel.Tokens.Jwt;
using System.Text;
using System.Text.Json;
using System.Web;
using Microsoft.IdentityModel.Tokens;
using WebSocketSharp;
/// <summary>
/// Creates the HS256-signed JWT and the connection parameters for the open-API websocket.
/// </summary>
public class JwtGenerator
{
    /// <summary>
    /// Signs a token valid for 600 seconds and returns it with the other connection parameters.
    /// </summary>
    /// <param name="secret">Developer secret used as the HMAC key.</param>
    /// <param name="appId">Developer ID, stored as the <c>appId</c> claim.</param>
    /// <param name="roomId">Room number of the streamer to listen to.</param>
    /// <returns>Dictionary with iat, exp, sToken, appId, do and roomId.</returns>
    public Dictionary<string, Object> GenerateJwt(string secret, string appId, long roomId)
    {
        long issuedAt = DateTimeOffset.Now.ToUnixTimeSeconds();
        long expiresAt = issuedAt + 600;

        // Claims carried by the token
        var claims = new JwtPayload();
        claims.Add("iat", issuedAt);
        claims.Add("exp", expiresAt);
        claims.Add("appId", appId);

        // Signing key and credentials
        var key = new SymmetricSecurityKey(Encoding.UTF8.GetBytes(secret));
        var credentials = new SigningCredentials(key, SecurityAlgorithms.HmacSha256);

        // Assemble the token and serialize it to its string form
        var token = new JwtSecurityToken(new JwtHeader(credentials), claims);
        string sToken = new JwtSecurityTokenHandler().WriteToken(token);

        return new Dictionary<string, Object>
        {
            ["iat"] = issuedAt,      // issue timestamp (seconds)
            ["exp"] = expiresAt,     // expiry timestamp (seconds)
            ["sToken"] = sToken,     // signed JWT string
            ["appId"] = appId,       // developer ID
            ["do"] = "comm",         // fixed interface parameter
            ["roomId"] = roomId      // room number to listen to
        };
    }
}
/// <summary>
/// Turns a parameter dictionary into a URL query string.
/// </summary>
public class QueryStringHelper
{
    /// <summary>
    /// Joins the entries as <c>key=value</c> pairs separated by <c>&amp;</c>.
    /// </summary>
    /// <param name="parameters">Parameters to encode; a null value is written as an empty string.</param>
    /// <returns>The query string without a leading <c>?</c>.</returns>
    public static string BuildQueryString(Dictionary<string, Object> parameters)
    {
        var pairs = new List<string>(parameters.Count);
        foreach (var kvp in parameters)
        {
            // Escape each part exactly once. The previous version escaped the
            // value and then stored it in an HttpUtility query collection whose
            // ToString() encodes again, so "a b" ended up as "a%2520b".
            string value = kvp.Value?.ToString() ?? string.Empty;
            pairs.Add(Uri.EscapeDataString(kvp.Key) + "=" + Uri.EscapeDataString(value));
        }
        return string.Join("&", pairs);
    }
}
/// <summary>
/// Thin wrapper around <c>WebSocketSharp.WebSocket</c> that logs every lifecycle event to the console.
/// </summary>
public class WebSocketWarpClient
{
    private readonly WebSocketSharp.WebSocket webSocket;

    /// <summary>Creates the socket for <paramref name="url"/> and attaches the logging handlers.</summary>
    public WebSocketWarpClient(string url)
    {
        var socket = new WebSocketSharp.WebSocket(url);
        socket.OnOpen += OnOpen;
        socket.OnMessage += OnMessage;
        socket.OnError += OnError;
        socket.OnClose += OnClose;
        webSocket = socket;
    }

    /// <summary>Opens the connection.</summary>
    public void Connect() => webSocket.Connect();

    /// <summary>Sends a text message.</summary>
    public void Send(string message) => webSocket.Send(message);

    /// <summary>Closes the connection.</summary>
    public void Close() => webSocket.Close();

    private static void OnOpen(object sender, EventArgs e) =>
        Console.WriteLine("WebSocket connected");

    private static void OnMessage(object sender, MessageEventArgs e) =>
        Console.WriteLine("Received message: " + e.Data);

    private static void OnError(object sender, WebSocketSharp.ErrorEventArgs e) =>
        Console.WriteLine("Error: " + e.Message);

    private static void OnClose(object sender, CloseEventArgs e) =>
        Console.WriteLine("WebSocket closed");
}
/// <summary>
/// Entry point of the sample: connects, subscribes to gift and bullet-chat notices
/// and sends a heartbeat once per second before closing the socket.
/// </summary>
public class Program
{
    public static async Task Main(string[] args)
    {
        String appId = "替换为开发者appId";
        String secret = "替换为开发者secret";
        long roomId = 1111; // replace with the room number to listen to

        var jwtGenerator = new JwtGenerator();
        Dictionary<string, Object> map = jwtGenerator.GenerateJwt(secret, appId, roomId);
        Console.WriteLine($"jwt: {JsonSerializer.Serialize(map)}");

        String baseUrl = "ws://ws-apiext.huya.com/index.html";
        String url = baseUrl + "?" + QueryStringHelper.BuildQueryString(map);
        Console.WriteLine($"url: {url}");

        var client = new WebSocketWarpClient(url);
        client.Connect();
        try
        {
            long reqId = DateTimeOffset.Now.ToUnixTimeSeconds();
            String sendMsg = "{\"command\":\"subscribeNotice\",\"data\":[\"getSendItemNotice\",\"getMessageNotice\"],\"reqId\":\"" + reqId + "\"}";
            client.Send(sendMsg);

            for (int count = 1; count < 60; count++)
            {
                client.Send("ping");
                Console.WriteLine($"Heartbeat: {count}");
                // Heartbeat interval. No cancellation token is passed: the
                // original created a fresh, never-cancelled and never-disposed
                // CancellationTokenSource on every iteration.
                await Task.Delay(1000);
            }
        }
        finally
        {
            // Close the socket even if a Send call throws.
            client.Close();
        }

        // The socket is already closed at this point, so the prompt is about
        // exiting rather than disconnecting.
        Console.WriteLine("Press any key to exit...");
        Console.ReadKey();
    }
}