[PD-cvs] supercollider/python OSC.py,NONE,1.1 supercollider.py,NONE,1.1

ksvalast at users.sourceforge.net ksvalast at users.sourceforge.net
Tue Jan 20 17:54:47 CET 2004


Update of /cvsroot/pure-data/supercollider/python
In directory sc8-pr-cvs1:/tmp/cvs-serv7437

Added Files:
	OSC.py supercollider.py 
Log Message:
Various tools to make supercollider more convenient to use within PD

--- NEW FILE: OSC.py ---
#!/usr/bin/env python
# ======================================================================
# file:		OSC.py
# author:	stefan kersten <steve at k-hornz.de>
# contents:	OSC client module for python
# license:	public domain
# ======================================================================
# $Id: OSC.py,v 1.1 2004/01/20 16:54:44 ksvalast Exp $
# ======================================================================
# copyright (c) 2000 stefan kersten
# ======================================================================
# this module provides simple OSC client functionality
# usage examples down at the end of the file
# ======================================================================

# Revision string; the "$Revision$" keyword is filled in by CVS.
__revision__ = "$Revision: 1.1 $"

# ======================================================================
# imports

import cStringIO, exceptions, math, socket, struct, time, types

# ======================================================================
# constants

# Offset added to a Unix timestamp by time() below to obtain OSC time:
# 70 years of 365 days plus 17 leap days (1900-01-01 to 1970-01-01),
# expressed in seconds.
SECONDS_UTC_TO_UNIX_EPOCH = (70 * 365 + 17) * 86400.0
# Scale that turns a fraction of a second into a 32 bit integer fraction.
FLOAT_TO_INT_SCALE = 2.0 ** 32

# ======================================================================
# types

class Value:
    """Abstract OSC value.

    Stores the wrapped Python value in self.value.  Subclasses override
    binary_value() and type_tag(); the implementations here do nothing
    and therefore return None.
    """
    def __init__(self, value):
        # Indented with spaces only: the original mixed a tab into a
        # space-indented body, which is rejected by "python -tt".
        self.value = value

    def binary_value(self):
        """Return the binary OSC encoding of the value (None here)."""
        pass

    def type_tag(self):
        """Return the one-character OSC type tag (None here)."""
        pass

class Int(Value):
    """OSC integer argument, encoded as 32 bits."""
    def __init__(self, value):
        # Coerce up front so binary_value() always packs an integer.
        as_integer = long(value)
        Value.__init__(self, as_integer)

    def binary_value(self):
        """Return the value packed as a signed 32 bit big-endian integer."""
        # '!l' = network byte order, signed long (4 bytes)
        packed = struct.pack('!l', self.value)
        return packed

    def type_tag(self):
        """Return the OSC type tag for integers."""
        return 'i'

class Float(Value):
    """OSC floating point argument, encoded as 32 bits."""
    def __init__(self, value):
        # Coerce up front so binary_value() always packs a float.
        as_float = float(value)
        Value.__init__(self, as_float)

    def binary_value(self):
        """Return the value packed as a 32 bit big-endian float."""
        # '!f' = network byte order, single precision float (4 bytes)
        packed = struct.pack('!f', self.value)
        return packed

    def type_tag(self):
        """Return the OSC type tag for floats."""
        return 'f'

class String(Value):
    """Null-terminated string padded to a multiple of 4 bytes."""
    def __init__(self, value):
        Value.__init__(self, str(value))

    def binary_value(self):
        """Return the string followed by 1 to 4 NUL bytes.

        The total length is always a multiple of 4.
        """
        text = self.value
        size = len(text)
        # '<size>s' copies the string, '<pad>x' appends the NUL bytes.
        return struct.pack('%ds%dx' % (size, self.pad_amount(size)), text)

    def type_tag(self):
        """Return the OSC type tag for strings."""
        return 's'

    def pad_amount(self, len):
        """Return the number of NUL bytes to append to a string of len bytes.

        The result is in 1..4: a length that is already a multiple of 4
        still gets 4 bytes, so there is always at least one terminator.
        """
        return 4 - (len % 4)

class Time(Value):
    """64 bit timetag in NTP format.

    The value is kept as a float number of seconds; see time() in this
    module for how the current time is expressed.
    """
    def __init__(self, value):
        Value.__init__(self, float(value))

    def __add__(self, time):
        """Return a new Time holding the sum of both values."""
        return Time(float(self.value + time.value))

    def binary_value(self):
        """Return two unsigned 32 bit big-endian words: seconds, fraction.

        The fraction word is the fractional part of the value scaled
        by FLOAT_TO_INT_SCALE (2**32).
        """
        t = self.value
        # FIXME: no range handling -- negative values and values whose
        # whole seconds do not fit into 32 bits are passed to
        # struct.pack() unchanged.
        seconds = long(t)
        fraction = long(math.fmod(t, 1.0) * FLOAT_TO_INT_SCALE)
        return struct.pack('!LL', seconds, fraction)

# ======================================================================
# utilities

# Keep a handle on the standard time module: the function defined next
# rebinds the module-level name "time".
time_module = time


def time():
    """Return the current time as a float in OSC format.

    This is the Unix time reported by the time module, shifted by
    SECONDS_UTC_TO_UNIX_EPOCH.
    """
    unix_now = time_module.time()
    return SECONDS_UTC_TO_UNIX_EPOCH + unix_now

# ======================================================================
# classes

class Packet:
    """Abstract base class for all OSC-related containers.

    The binary representation is built once, in the constructor, by
    calling _write_contents(); it is then available through
    get_packet() and can be sent with sendto() / sendlocal().
    """
    def __init__(self, packets):
        stream = cStringIO.StringIO()
        self._write_contents(packets, stream)
        self._data = stream.getvalue()

    def get_packet(self):
        """Return the binary representation of the receiver's contents.

        This data is in the proper OSC format and can be sent over a
        socket.
        """
        return self._data

    def get_size(self):
        """Return the size of the receiver's binary data."""
        return len(self._data)

    def _write_contents(self, packets, stream):
        """Write packets on stream.

        Private.

        Override in subclasses for specific behavior; this default
        writes nothing.
        """
        pass

    def __repr__(self):
        return '<%s instance, size=%d>' % (self.__class__.__name__,
                                           self.get_size())

    def sendto(self, host, port):
        """Send the receiver's data to (host, port) through a UDP socket.

        A fresh socket is opened for every call and is closed again
        even when sending fails.
        """
        packet = self.get_packet()
        # socket.socket() takes (family, type).  The original passed the
        # two constants the other way round, which only works where
        # AF_INET and SOCK_DGRAM happen to share the same number.
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            s.sendto(packet, (host, port))
        finally:
            s.close()

    def sendlocal(self, port):
        """Send the receiver's data through a UDP socket locally."""
        self.sendto('localhost', port)

def _value(x):
    """Wrap x in the matching OSC value class.

    Plain ints and longs become Int, floats become Float; anything
    else is sent as the String of its str() representation.  The check
    is on the exact type, so subclasses take the String path.
    """
    kind = type(x)
    if kind in (types.IntType, types.LongType):
        return Int(x)
    if kind == types.FloatType:
        return Float(x)
    # everything else: fall back to the string representation
    return String(str(x))

class Message(Packet):
    """Single OSC message with arguments.

    Message(address, args) -> Message

    address     -- OSC address string
    args        -- sequence (list, tuple, ...) of message arguments;
                   each one is converted with _value()

    Note that the arguments are passed as ONE sequence, not as
    separate positional parameters.
    """
    def __init__(self, address, args=()):
        # Immutable default instead of a shared list; the arguments are
        # only iterated, so any sequence is accepted as before.
        Packet.__init__(self, [String(address)] + [_value(x) for x in args])

    def _write_contents(self, args, stream):
        """Write address, type tag string and argument data on stream.

        args[0] is the address, args[1:] are the OSC argument values.
        """
        tags = [',']            # the type tag string opens with a comma
        values = []
        # collect tags and arguments
        for v in args[1:]:
            tags.append(v.type_tag())
            values.append(v.binary_value())
        # write address
        stream.write(args[0].binary_value())
        # write signature (itself encoded as a padded OSC string)
        stream.write(String(''.join(tags)).binary_value())
        # write arguments
        stream.write(''.join(values))

class Bundle(Packet):
    """OSC container type with timing information.

    Bundle(time, packets) -> Bundle

    time        -- floating point timetag in OSC units
    packets     -- sequence of Packet(s); any sequence type is accepted
    """
    def __init__(self, time, packets):
        # list() lets callers pass a tuple or other sequence; the plain
        # "list + packets" concatenation only worked for lists.
        Packet.__init__(self, [Time(time)] + list(packets))

    def _write_contents(self, args, stream):
        """Write the bundle header, timetag and all packets on stream.

        args[0] is the Time value, args[1:] are the contained packets.
        """
        # write '#bundle' preamble
        stream.write(String('#bundle').binary_value())
        # write timetag
        stream.write(args[0].binary_value())
        # write packets, each prefixed with its byte count
        for packet in args[1:]:
            data = packet.get_packet()
            stream.write(Int(len(data)).binary_value())
            stream.write(data)

def test(port):
    """Send a few example messages and bundles to port on localhost."""
    cutoff = Message("/filter/cutoff", [145.1232])
    cutoff.sendlocal(port)

    mixed = Message("/http", ["www dot k-hornz dot de", 12, 3.41, "bulb"])
    mixed.sendlocal(port)
    #print Int(len(Message("/msg").get_packet())).binary_value()

    # bundle with a fixed timetag
    early = Bundle(0.1, [Message("/fubar")])
    early.sendlocal(port)

    # bundle stamped with the current time, carrying two messages
    now = Bundle(time(), [Message("/msg", [1.0, "+", 1, 61, "0"]), Message("/bang!")])
    now.sendlocal(port)

if __name__ == "__main__":
    # Run dumpOSC on port 10000 to watch the example packets arrive.
    test(10000)

# EOF
# ======================================================================

--- NEW FILE: supercollider.py ---

#/* --------------------------- supercollider.py  ----------------------------------- */
#/*   ;; Kjetil S. Matheussen, 2004.                                             */
#/*                                                                              */
#/* This program is free software; you can redistribute it and/or                */
#/* modify it under the terms of the GNU General Public License                  */
#/* as published by the Free Software Foundation; either version 2               */
#/* of the License, or (at your option) any later version.                       */
#/*                                                                              */
#/* This program is distributed in the hope that it will be useful,              */
#/* but WITHOUT ANY WARRANTY; without even the implied warranty of               */
#/* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the                */
#/* GNU General Public License for more details.                                 */
#/*                                                                              */
#/* You should have received a copy of the GNU General Public License            */
#/* along with this program; if not, write to the Free Software                  */
#/* Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.  */
#/*                                                                              */
#/* ---------------------------------------------------------------------------- */



# This file is far from complete, but it's a start.
# -Kjetil.


import OSC,tempfile,xreadlines,os,time

# Default UDP port used by Server, and the first ids it hands out.
standardport = 57110
startnode = 1001
startbuffer = 0

# Values for the "position" argument of Synth, sent in the "/s_new"
# message (presumably the server's add-action codes -- verify against
# the server command reference).
sc_head, sc_tail, sc_before, sc_after, sc_replace = range(5)


# For some crazy reason, the sclang command tells the server to quit when exiting.
# To fix that strange and unpleasant behaviour, uncomment the following lines in etc. etc. etc.


class Server:
    """Client-side handle for a server reached over OSC on localhost.

    Sends OSC messages to a UDP port on the local machine and hands
    out consecutive node and buffer ids.

    magic       -- pass 1234 to confirm the instance is created on
                   purpose; any other value prints a hint to use the
                   module-level 'localServer' instead
    port        -- UDP port the messages are sent to
    """
    def __init__(self, magic, port=standardport):
        if magic != 1234:
            print("Server.__init__: Are you sure you know what you are doing?")
            print("Seems like probably wanted to use the 'localServer' variable.")
        self.port = port
        self.freenode = startnode
        self.freebuffer = startbuffer

    def sendMsg(self, command, *args):
        """Send one OSC message (command plus arguments) to the server."""
        OSC.Message(command, args).sendlocal(self.port)

    def sendgetMsg(self, command, *args):
        """Send a message, then wait one second.

        No reply is read; the pause presumably gives the server time
        to finish the command before the caller continues.
        """
        self.sendMsg(command, *args)
        time.sleep(1)

    def dumpOSC(self, code):
        """Send the "dumpOSC" command with the given code."""
        self.sendMsg("dumpOSC", code)

    def nextNodeID(self):
        """Return an unused node id and advance the counter."""
        nodeid = self.freenode
        self.freenode += 1
        return nodeid

    def nextBufferID(self):
        """Return an unused buffer id and advance the counter."""
        bufferid = self.freebuffer
        self.freebuffer += 1
        return bufferid

    def loadSynthDef(self, name):
        """Ask the server to load the synth definition file 'name'."""
        self.sendgetMsg("/d_load", name)

    def loadSynthDefDir(self, dir):
        """Ask the server to load all synth definitions in 'dir'."""
        self.sendMsg("/d_loadDir", dir)

    def evalSynth(self, synthname):
        """Compile '<synthname>.sc' with sclang and load the result.

        The contents of the file are wrapped in
        SynthDef("<synthname>",{ ... }).writeDefFile("<prefix>");
        and written to a temporary file which is run through the
        'sclang' command.  The resulting
        '<prefix><synthname>.scsyndef' is loaded into the server and
        both temporary files are removed afterwards.

        Raises IOError if '<synthname>.sc' cannot be read.
        """
        # Read the source first so a missing file leaves nothing behind,
        # and close it explicitly (the original never closed it).
        infile = open(synthname + ".sc", "r")
        try:
            body = infile.read()
        finally:
            infile.close()

        # NOTE(review): tempfile.mktemp() only picks a name and is open
        # to races.  It is kept because the second name is merely a
        # path prefix under which sclang itself creates the def file.
        tmpname = tempfile.mktemp(".sc")
        tmpname2 = tempfile.mktemp("")
        deffile = tmpname2 + synthname + ".scsyndef"
        try:
            outfile = open(tmpname, "w")
            try:
                outfile.write('SynthDef("' + synthname + '",{')
                outfile.write(body)
                outfile.write('}).writeDefFile("' + tmpname2 + '");\n')
            finally:
                outfile.close()
            os.system("sclang " + tmpname)
            self.loadSynthDef(deffile)
        finally:
            # Best-effort cleanup, replacing the former shell "rm" call:
            # a file that was never created is silently skipped.
            for path in (tmpname, deffile):
                try:
                    os.remove(path)
                except OSError:
                    pass
        
# Shared default server instance; 1234 is the value Server.__init__
# checks for before printing its warning.
localServer = Server(1234)

class Node:
    """Base class for objects that own a node id on a server.

    Subclasses are expected to set self.server and self.id; this class
    has no constructor of its own.
    """
    def __del__(self):
        # Free the node on the server when the Python object goes away.
        self.server.sendMsg("/n_free", self.id)

    def set(self, *args):
        """Send "/n_set" for this node with the given arguments."""
        # Direct call with * instead of the deprecated apply().
        self.server.sendMsg("/n_set", self.id, *args)


class Synth(Node):
    """A synth node created on the server with "/s_new".

    server      -- object providing nextNodeID() and sendMsg()
    name        -- synth definition name
    args        -- sequence of further "/s_new" arguments (list, tuple, ...)
    position    -- one of the sc_* constants, default sc_tail
    """
    def __init__(self, server, name, args=(), position=sc_tail):
        self.server = server
        self.id = server.nextNodeID()
        # Star-call instead of apply() plus list concatenation, so any
        # sequence works for args.  The literal 0 is sent right after
        # the position (presumably the target node id -- verify).
        self.server.sendMsg("/s_new", name, self.id, position, 0, *args)


class BufferSuper:
    """Common base for server-side buffers.

    Takes a buffer id from the server and allocates the buffer: with
    numChannels == -1 the buffer is filled from 'filename' using
    "/b_allocRead", otherwise an empty buffer is requested with
    "/b_alloc".  The buffer is freed when the object is deleted.
    """
    def __init__(self, server, numFrames=0, numChannels=1, filename="", startFrame=0):
        self.server = server
        self.id = server.nextBufferID()
        # numChannels == -1 is the marker for "read from a file"
        if numChannels == -1:
            message = ("/b_allocRead", self.id, filename, startFrame, numFrames, 0)
        else:
            message = ("/b_alloc", self.id, numFrames, numChannels, 0)
        server.sendMsg(*message)

    def __del__(self):
        # Release the buffer on the server together with this object.
        self.server.sendMsg("/b_free", self.id)


class Buffer(BufferSuper):
    """Empty buffer of numFrames frames and numChannels channels."""
    def __init__(self, server, numFrames, numChannels=1):
        BufferSuper.__init__(self, server,
                             numFrames=numFrames,
                             numChannels=numChannels)

class BufferRead(BufferSuper):
    """Buffer whose contents are read from a file."""
    def __init__(self, server, filename, startFrame=0, numFrames=0):
        # numChannels=-1 selects the read-from-file path in BufferSuper.
        BufferSuper.__init__(self, server,
                             numFrames=numFrames,
                             numChannels=-1,
                             filename=filename,
                             startFrame=startFrame)









More information about the Pd-cvs mailing list