Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 184 additions & 37 deletions pkg/sip/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,20 @@ package sip

import (
"context"
"fmt"
"log/slog"
"net"
"net/netip"
"strconv"
"strings"
"sync"
"time"

"github.com/frostbyte73/core"
"golang.org/x/exp/maps"

esip "github.com/emiago/sipgo/sip"

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we are using both emiago sipgo and livekit sipgo. Do we plan to only rely on one of them?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LK fork just aliases all types/functions we use to upstream ones. The only reason I import upstream here is that we don't have alias for that function/type.


"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
Expand Down Expand Up @@ -175,32 +180,173 @@ func (c *Client) getActiveCall(tag LocalTag) *outboundCall {
return c.activeCalls[tag]
}

func setUriTransport(p *sip.Uri, tr livekit.SIPTransport) {
if tr != livekit.SIPTransport_SIP_TRANSPORT_AUTO {
p.UriParams.Add("transport", tr.Name())
}
}

func buildLegacyURI(user, addr string, tr livekit.SIPTransport) (*sip.Uri, error) {
if user == "" {
return nil, fmt.Errorf("number must be set")
} else if strings.Contains(user, "@") {
return nil, fmt.Errorf("should be a phone number or SIP user, not a full SIP URI")
}
if addr == "" {
return nil, fmt.Errorf("address must be set")
}
if strings.HasPrefix(addr, "sip:") || strings.HasPrefix(addr, "sips:") {
return nil, fmt.Errorf("address must be a hostname without 'sip:' prefix")
} else if strings.Contains(addr, "transport=") {
return nil, fmt.Errorf("legacy address must not contain parameters; use transport field")
} else if strings.ContainsAny(addr, ";=") {
return nil, fmt.Errorf("legacy address must not contain parameters")
}
p := &sip.Uri{Scheme: "sip"}
setUriTransport(p, tr)

p.User = user
if host, sport, err := net.SplitHostPort(addr); err == nil && sport != "" {
p.Host = host
p.Port, err = strconv.Atoi(sport)
if err != nil {
return nil, fmt.Errorf("invalid port in hostname: %q", sport)
}
} else {
p.Host = addr
}
return p, nil
}

func buildRawURI(raw string, tr livekit.SIPTransport) (*sip.Uri, error) {
p := &sip.Uri{Scheme: "sip"}
setUriTransport(p, tr)
if err := esip.ParseUri(raw, p); err != nil {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems to be a bug since it'll drop what's set in setUriTransport?

return nil, psrpc.NewErrorf(psrpc.InvalidArgument, "invalid request URI")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we return a plain fmt.Errorf? it'll be wrapped as psrpc error in the caller

}
return p, nil
}

func buildValuesURI(u *livekit.SIPUri, tr livekit.SIPTransport) (*sip.Uri, error) {
if tr != u.Transport {
if u.Transport == livekit.SIPTransport_SIP_TRANSPORT_AUTO {
//tr = tr
} else if tr == livekit.SIPTransport_SIP_TRANSPORT_AUTO {
tr = u.Transport
} else {
return nil, fmt.Errorf("different transports specified: %v vs %v", tr, u.Transport)
}
}
p := &sip.Uri{Scheme: "sip"}
setUriTransport(p, tr)
if u.User == "" {
return nil, fmt.Errorf("username or number must be set")
}
if u.Host == "" && u.Ip == "" {
return nil, fmt.Errorf("host or ip must be set")
}
p.User = u.User
p.Host = u.Host
if p.Host == "" {
p.Host = u.Ip
}
if _, sport, err := net.SplitHostPort(p.Host); err == nil && sport != "" {
return nil, fmt.Errorf("host or ip must not contain port")
}
p.Port = int(u.Port)
return p, nil
}

func buildRequestURI(u *livekit.SIPRequestDest, legacyUser, legacyAddr string, tr livekit.SIPTransport) (*sip.Uri, error) {
if u == nil {
return buildLegacyURI(legacyUser, legacyAddr, tr)
}
switch u := u.Uri.(type) {
default:
case *livekit.SIPRequestDest_Raw:
return buildRawURI(u.Raw, tr)
case *livekit.SIPRequestDest_Values:
return buildValuesURI(u.Values, tr)
}
return nil, fmt.Errorf("invalid request URI type")
}

func buildFromToURI(u *livekit.SIPNamedDest, legacyUser, legacyAddr string, tr livekit.SIPTransport) (*sip.Uri, error) {
if u == nil {
return buildLegacyURI(legacyUser, legacyAddr, tr)
}
switch u := u.Uri.(type) {
default:
case *livekit.SIPNamedDest_Raw:
return buildRawURI(u.Raw, tr)
case *livekit.SIPNamedDest_Values:
return buildValuesURI(u.Values, tr)
}
return nil, fmt.Errorf("invalid URI type")
}

func buildFromHeader(u *livekit.SIPNamedDest, legacyName *string, legacyUser, legacyAddr string, tr livekit.SIPTransport) (*sip.FromHeader, error) {
su, err := buildFromToURI(u, legacyUser, legacyAddr, tr)
if err != nil {
return nil, err
}
h := &sip.FromHeader{
Address: *su,
}
if u != nil {
h.DisplayName = u.DisplayName
} else if legacyName != nil {
h.DisplayName = *legacyName
} else {
// Nothing specified, preserve legacy behavior
h.DisplayName = su.User
}
return h, nil
}

func buildToHeader(u *livekit.SIPNamedDest, legacyUser, legacyAddr string, tr livekit.SIPTransport) (*sip.ToHeader, error) {
su, err := buildFromToURI(u, legacyUser, legacyAddr, tr)
if err != nil {
return nil, err
}
h := &sip.ToHeader{
Address: *su,
}
if u != nil {
h.DisplayName = u.DisplayName
}
return h, nil
}

func buildOutboundHeaders(req *rpc.InternalCreateSIPParticipantRequest, defaultHost string) (*sip.Uri, *sip.FromHeader, *sip.ToHeader, error) {
uri, err := buildRequestURI(req.SipRequestUri, req.CallTo, req.Address, req.Transport)
if err != nil {
return nil, nil, nil, psrpc.NewError(psrpc.InvalidArgument, fmt.Errorf("invalid request URI: %w", err))
}
to, err := buildToHeader(req.SipToHeader, req.CallTo, req.Address, req.Transport)
if err != nil {
return nil, nil, nil, psrpc.NewError(psrpc.InvalidArgument, fmt.Errorf("invalid To header: %w", err))
}
fromHost := req.Hostname
if fromHost == "" {
fromHost = defaultHost
}
from, err := buildFromHeader(req.SipFromHeader, req.DisplayName, req.Number, fromHost, req.Transport)
if err != nil {
return nil, nil, nil, psrpc.NewError(psrpc.InvalidArgument, fmt.Errorf("invalid From header: %w", err))
}
return uri, from, to, nil
}

func (c *Client) createSIPParticipant(ctx context.Context, req *rpc.InternalCreateSIPParticipantRequest) (resp *rpc.InternalCreateSIPParticipantResponse, retErr error) {
if c.mon.Health() != stats.HealthOK {
return nil, siperrors.ErrUnavailable
}
req.Upgrade()
if req.CallTo == "" {
return nil, psrpc.NewErrorf(psrpc.InvalidArgument, "call-to number must be set")
} else if req.Address == "" {
return nil, psrpc.NewErrorf(psrpc.InvalidArgument, "trunk adresss must be set")
} else if req.Number == "" {
return nil, psrpc.NewErrorf(psrpc.InvalidArgument, "trunk outbound number must be set")
} else if req.RoomName == "" {
if req.RoomName == "" {
return nil, psrpc.NewErrorf(psrpc.InvalidArgument, "room name must be set")
}
if strings.Contains(req.CallTo, "@") {
return nil, psrpc.NewErrorf(psrpc.InvalidArgument, "call_to should be a phone number or SIP user, not a full SIP URI")
}
if strings.HasPrefix(req.Address, "sip:") || strings.HasPrefix(req.Address, "sips:") {
return nil, psrpc.NewErrorf(psrpc.InvalidArgument, "address must be a hostname without 'sip:' prefix")
}
if strings.Contains(req.Address, "transport=") {
return nil, psrpc.NewErrorf(psrpc.InvalidArgument, "address must not contain parameters; use transport field")
}
if strings.ContainsAny(req.Address, ";=") {
return nil, psrpc.NewErrorf(psrpc.InvalidArgument, "address must not contain parameters")
}
defaultHost := c.ContactURI(TransportFrom(req.Transport)).GetHost()
log := c.log
if req.ProjectId != "" {
log = log.WithValues("projectID", req.ProjectId)
Expand All @@ -212,22 +358,28 @@ func (c *Client) createSIPParticipant(ctx context.Context, req *rpc.InternalCrea
if err != nil {
return nil, err
}
uri, from, to, err := buildOutboundHeaders(req, defaultHost)
if err != nil {
return nil, err
}
tid := traceid.FromGUID(req.SipCallId)
log = log.WithValues(
"callID", req.SipCallId,
"traceID", tid.String(),
"room", req.RoomName,
"participant", req.ParticipantIdentity,
"participantName", req.ParticipantName,
"fromHost", req.Hostname,
"fromUser", req.Number,
"toHost", req.Address,
"toUser", req.CallTo,
"fromHost", from.Address.Host,
"fromUser", from.Address.User,
"toHost", to.Address.Host,
"toUser", to.Address.User,
"reqHost", uri.Host,
"reqUser", uri.User,
"direction", "outbound",
)

req.ParticipantAttributes = maps.Clone(req.ParticipantAttributes) // shallow clone - string/string map. Needed to avoid mutating psrpc req
state := NewCallState(c.getIOClient(req.ProjectId), c.createSIPCallInfo(req))
state := NewCallState(c.getIOClient(req.ProjectId), c.createSIPCallInfo(uri, from, to, req))

defer func() {
state.Update(ctx, func(info *livekit.SIPCallInfo) {
Expand Down Expand Up @@ -255,11 +407,10 @@ func (c *Client) createSIPParticipant(ctx context.Context, req *rpc.InternalCrea
},
}
sipConf := sipOutboundConfig{
address: req.Address,
transport: req.Transport,
host: req.Hostname,
from: req.Number,
to: req.CallTo,
uri: uri,
from: from,
to: to,
user: req.Username,
pass: req.Password,
dtmf: req.Dtmf,
Expand All @@ -273,7 +424,6 @@ func (c *Client) createSIPParticipant(ctx context.Context, req *rpc.InternalCrea
enabledFeatures: req.EnabledFeatures,
featureFlags: req.FeatureFlags,
mediaConfig: mconf,
displayName: req.DisplayName,
}
log.Infow("Creating SIP participant")
call, err := c.newCall(ctx, tid, c.conf, log, LocalTag(req.SipCallId), roomConf, sipConf, state, req.ProjectId)
Expand All @@ -299,13 +449,10 @@ func (c *Client) createSIPParticipant(ctx context.Context, req *rpc.InternalCrea
return info, nil
}

func (c *Client) createSIPCallInfo(req *rpc.InternalCreateSIPParticipantRequest) *livekit.SIPCallInfo {
toUri := CreateURIFromUserAndAddress(req.CallTo, req.Address, TransportFrom(req.Transport))
fromiUri := URI{
User: req.Number,
Host: req.Hostname,
Addr: netip.AddrPortFrom(c.sconf.SignalingIP, uint16(c.conf.SIPPort)),
}
func (c *Client) createSIPCallInfo(uri *sip.Uri, from *sip.FromHeader, to *sip.ToHeader, req *rpc.InternalCreateSIPParticipantRequest) *livekit.SIPCallInfo {
toUri := ConvertURI(&to.Address)
fromUri := ConvertURI(&from.Address)
fromUri.Addr = netip.AddrPortFrom(c.sconf.SignalingIP, uint16(c.conf.SIPPort))

callInfo := &livekit.SIPCallInfo{
CallId: req.SipCallId,
Expand All @@ -316,7 +463,7 @@ func (c *Client) createSIPCallInfo(req *rpc.InternalCreateSIPParticipantRequest)
ParticipantAttributes: req.ParticipantAttributes,
CallDirection: livekit.SIPCallDirection_SCD_OUTBOUND,
ToUri: toUri.ToSIPUri(),
FromUri: fromiUri.ToSIPUri(),
FromUri: fromUri.ToSIPUri(),
CreatedAtNs: time.Now().UnixNano(),
MediaEncryption: req.MediaEncryption.String(),
EnabledFeatures: req.EnabledFeatures,
Expand Down
Loading
Loading