您的位置:首页 > 健康 > 美食 > 免费网页代理在线_做代理需要交钱吗_整站seo排名费用价格_网上营销方法

免费网页代理在线_做代理需要交钱吗_整站seo排名费用价格_网上营销方法

2025/5/11 10:07:54 来源:https://blog.csdn.net/weixin_40455124/article/details/146138839  浏览:    关键词:免费网页代理在线_做代理需要交钱吗_整站seo排名费用价格_网上营销方法
免费网页代理在线_做代理需要交钱吗_整站seo排名费用价格_网上营销方法

kv流的访问有所不同,将通过功能subscribeToDirect设置为直接访问

// Lock should be held.
func (mset *stream) subscribeToDirect() error {// We will make this listen on a queue group by default, which can allow mirrors to participate on opt-in basis.if mset.directSub == nil {dsubj := fmt.Sprintf(JSDirectMsgGetT, mset.cfg.Name)if sub, err := mset.queueSubscribeInternal(dsubj, dgetGroup, mset.processDirectGetRequest); err == nil {mset.directSub = sub} else {return err}}// Now the one that will have subject appended past stream name.if mset.lastBySub == nil {dsubj := fmt.Sprintf(JSDirectGetLastBySubjectT, mset.cfg.Name, fwcs)// We will make this listen on a queue group by default, which can allow mirrors to participate on opt-in basis.if sub, err := mset.queueSubscribeInternal(dsubj, dgetGroup, mset.processDirectGetLastBySubjectRequest); err == nil {mset.lastBySub = sub} else {return err}}return nil
}

核心处理函数是processDirectGetLastBySubjectRequest,下图显示了调用堆栈和跟踪

在这里插入图片描述

func (c *client) deliverMsg(prodIsMQTT bool, sub *subscription, acc *Account, subject, reply, mh, msg []byte, gwrply bool) the call logical is same like normal stream ,via sub.icb(sub, c, acc, string(subject), string(reply), msg[:msgSize]) 将调用 processDirectGetLastBySubjectRequest

// Internal account clients are for service imports and need the '\r\n'.start := time.Now()if client.kind == ACCOUNT {sub.icb(sub, c, acc, string(subject), string(reply), msg)} else {sub.icb(sub, c, acc, string(subject), string(reply), msg[:msgSize])}if dur := time.Since(start); dur >= readLoopReportThreshold {srv.Warnf("Internal subscription on %q took too long: %v", subject, dur)}

finally use function getDirectRequest call store.LoadLastMsg then use mset.outq.send to send message to consumer’s inbox

最后使用函数getDirectRequest调用store.LoadLastMsg然后使用mset.outq.send将消息发送到消费者的inbox

// Do actual work on a direct msg request.
// This could be called in a Go routine if we are inline for a non-client connection.
func (mset *stream) getDirectRequest(req *JSApiMsgGetRequest, reply string) {var svp StoreMsgvar sm *StoreMsgvar err errormset.mu.RLock()store, name := mset.store, mset.cfg.Namemset.mu.RUnlock()if req.Seq > 0 && req.NextFor == _EMPTY_ {sm, err = store.LoadMsg(req.Seq, &svp)} else if req.NextFor != _EMPTY_ {sm, _, err = store.LoadNextMsg(req.NextFor, subjectHasWildcard(req.NextFor), req.Seq, &svp)} else {sm, err = store.LoadLastMsg(req.LastFor, &svp)}if err != nil {hdr := []byte("NATS/1.0 404 Message Not Found\r\n\r\n")mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))return}hdr := sm.hdrts := time.Unix(0, sm.ts).UTC()if len(hdr) == 0 {const ht = "NATS/1.0\r\nNats-Stream: %s\r\nNats-Subject: %s\r\nNats-Sequence: %d\r\nNats-Time-Stamp: %s\r\n\r\n"hdr = fmt.Appendf(nil, ht, name, sm.subj, sm.seq, ts.Format(time.RFC3339Nano))} else {hdr = copyBytes(hdr)hdr = genHeader(hdr, JSStream, name)hdr = genHeader(hdr, JSSubject, sm.subj)hdr = genHeader(hdr, JSSequence, strconv.FormatUint(sm.seq, 10))hdr = genHeader(hdr, JSTimeStamp, ts.Format(time.RFC3339Nano))}mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, sm.msg, nil, 0))
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com