KV 流的访问方式有所不同：通过 subscribeToDirect 函数将其设置为直接访问（direct access）。
// subscribeToDirect creates the stream's two internal direct-get
// subscriptions if they are not already present: one on the
// JSDirectMsgGetT subject and one on the JSDirectGetLastBySubjectT subject,
// which has the message subject appended past the stream name.
// It returns the first subscription error encountered, or nil.
// Lock should be held.
func (mset *stream) subscribeToDirect() error {
	// Both subscriptions listen on a queue group by default, which can allow
	// mirrors to participate on an opt-in basis.
	if mset.directSub == nil {
		subject := fmt.Sprintf(JSDirectMsgGetT, mset.cfg.Name)
		sub, err := mset.queueSubscribeInternal(subject, dgetGroup, mset.processDirectGetRequest)
		if err != nil {
			return err
		}
		mset.directSub = sub
	}

	// Now the one that will have the subject appended past the stream name.
	if mset.lastBySub == nil {
		subject := fmt.Sprintf(JSDirectGetLastBySubjectT, mset.cfg.Name, fwcs)
		sub, err := mset.queueSubscribeInternal(subject, dgetGroup, mset.processDirectGetLastBySubjectRequest)
		if err != nil {
			return err
		}
		mset.lastBySub = sub
	}

	return nil
}
核心处理函数是 processDirectGetLastBySubjectRequest，下图显示了调用堆栈和跟踪。
在 func (c *client) deliverMsg(prodIsMQTT bool, sub *subscription, acc *Account, subject, reply, mh, msg []byte, gwrply bool) 中，调用逻辑与普通流相同：通过 sub.icb(sub, c, acc, string(subject), string(reply), msg[:msgSize]) 调用 processDirectGetLastBySubjectRequest。
// Excerpt from deliverMsg: invoke the subscription's internal callback and
// time how long it takes.
// Internal account clients are for service imports and need the '\r\n'.
start := time.Now()
if client.kind == ACCOUNT {
	sub.icb(sub, c, acc, string(subject), string(reply), msg)
} else {
	// NOTE(review): msg[:msgSize] presumably drops the trailing '\r\n' — confirm against the definition of msgSize.
	sub.icb(sub, c, acc, string(subject), string(reply), msg[:msgSize])
}
// Warn when the callback ran for at least readLoopReportThreshold.
if dur := time.Since(start); dur >= readLoopReportThreshold {
	srv.Warnf("Internal subscription on %q took too long: %v", subject, dur)
}
Finally, the function getDirectRequest calls store.LoadLastMsg and then uses mset.outq.send to send the message to the consumer’s inbox.
最后，getDirectRequest 函数调用 store.LoadLastMsg，然后通过 mset.outq.send 将消息发送到消费者的 inbox。
// Do actual work on a direct msg request.
// This could be called in a Go routine if we are inline for a non-client connection.
func (mset *stream) getDirectRequest(req *JSApiMsgGetRequest, reply string) {var svp StoreMsgvar sm *StoreMsgvar err errormset.mu.RLock()store, name := mset.store, mset.cfg.Namemset.mu.RUnlock()if req.Seq > 0 && req.NextFor == _EMPTY_ {sm, err = store.LoadMsg(req.Seq, &svp)} else if req.NextFor != _EMPTY_ {sm, _, err = store.LoadNextMsg(req.NextFor, subjectHasWildcard(req.NextFor), req.Seq, &svp)} else {sm, err = store.LoadLastMsg(req.LastFor, &svp)}if err != nil {hdr := []byte("NATS/1.0 404 Message Not Found\r\n\r\n")mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))return}hdr := sm.hdrts := time.Unix(0, sm.ts).UTC()if len(hdr) == 0 {const ht = "NATS/1.0\r\nNats-Stream: %s\r\nNats-Subject: %s\r\nNats-Sequence: %d\r\nNats-Time-Stamp: %s\r\n\r\n"hdr = fmt.Appendf(nil, ht, name, sm.subj, sm.seq, ts.Format(time.RFC3339Nano))} else {hdr = copyBytes(hdr)hdr = genHeader(hdr, JSStream, name)hdr = genHeader(hdr, JSSubject, sm.subj)hdr = genHeader(hdr, JSSequence, strconv.FormatUint(sm.seq, 10))hdr = genHeader(hdr, JSTimeStamp, ts.Format(time.RFC3339Nano))}mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, sm.msg, nil, 0))
}