[PD-cvs] externals/pdp/modules/generic Makefile, 1.3, 1.4 pdp_del.c, 1.3, 1.4 pdp_rawin.c, 1.3, 1.4 pdp_rawout.c, 1.3, 1.4

Tom Schouten doelie at users.sourceforge.net
Fri Sep 1 15:45:32 CEST 2006


Update of /cvsroot/pure-data/externals/pdp/modules/generic
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv2628/modules/generic

Modified Files:
	Makefile pdp_del.c pdp_rawin.c pdp_rawout.c 
Log Message:
pdp current darcs merge

Index: pdp_rawin.c
===================================================================
RCS file: /cvsroot/pure-data/externals/pdp/modules/generic/pdp_rawin.c,v
retrieving revision 1.3
retrieving revision 1.4
diff -C2 -d -r1.3 -r1.4
*** pdp_rawin.c	16 Dec 2005 01:05:32 -0000	1.3
--- pdp_rawin.c	1 Sep 2006 13:45:30 -0000	1.4
***************
*** 1,4 ****
  /*
!  *   Pure Data Packet module. packet forth console
   *   Copyright (c) by Tom Schouten <pdp at zzz.kotnet.org>
   *
--- 1,4 ----
  /*
!  *   Pure Data Packet module. Raw packet input
   *   Copyright (c) by Tom Schouten <pdp at zzz.kotnet.org>
   *
***************
*** 20,23 ****
--- 20,24 ----
  
  
+ #include <string.h>
  #include <pthread.h>
  #include <stdio.h>
***************
*** 41,44 ****
--- 42,94 ----
  
  
+ /* TODO: add flow control 
+ 
+ reminder: pthread condition values for synchro
+ 
+ writing to a queue goes like this:
+ * pthread_mutex_lock
+ * atomic op (write)
+ * pthread_cond_signal
+ * pthread_mutex_unlock
+ 
+ reading from a queue goes like this:
+ * pthread_mutex_lock
+ * while (!CONDITION) pthread_cond_wait
+ * atomic op (read)
+ * pthread_mutex_unlock
+ 
+ 
+ in this case, there is a reader and a writer, AND a maximum
+ size of the buffer between them (1 atom) so both reader and writer
+ might block. compare this to the 'command queue' in libpf, where
+ only the reader blocks.
+ 
+ so the course of events is:
+ 
+ READER
+ * wait for data ready (COND_READ) this blocks pd, it is assumed data is always ready in normal operation
+ * consume
+ * signal writer (COND_WRITE)
+ 
+ 
+ WRITER
+ * wait for space ready (COND_WRITE)
+ * write
+ * signal reader
+ 
+ 
+ one remark though: 
+ is this machinery really necessary?
+ -----------------------------------
+ 
+ it might be easier to just read in the pd thread.
+ the only problem this gives is when data really arrives not in a single chunk, but as a stream.
+ to me that seems a really awkward special case which can be solved using another PROCESS to
+ do the buffering / dropping.
+ 
+ so, for now, it's just synchronous, no threads for sync reading.
+ 
+ */
+ 
  
  
***************
*** 55,58 ****
--- 105,109 ----
      /* comm */
      t_pdp_list *x_queue; // packet queue
+     int x_pipefd;
  
      /* thread */
***************
*** 60,68 ****
      pthread_attr_t x_attr;
      pthread_t x_thread;
  
      /* sync */
      int x_giveup;  // 1-> terminate reader thread
      int x_active;  // 1-> reader thread is launched
-     int x_done;    // 1-> reader thread has exited
  
      /* config */
--- 111,120 ----
      pthread_attr_t x_attr;
      pthread_t x_thread;
+     
  
      /* sync */
+     int x_mode;    // 1-> sync to input
      int x_giveup;  // 1-> terminate reader thread
      int x_active;  // 1-> reader thread is launched
  
      /* config */
***************
*** 77,212 ****
  
  static void rawin_close(t_rawin *x);
- static void tick(t_rawin *x)
- {
-     /* send all packets in queue to outlet */
-     lock(x);
-     while (x->x_queue->elements){
- 	outlet_pdp_atom(x->x_outlet, x->x_queue->first);
- 	pdp_list_pop(x->x_queue); // pop stale reference
-     }
-     unlock(x);
-     clock_delay(x->x_clock, PERIOD);
  
-     /* check if thread is done */
-     if (x->x_done) rawin_close(x);
  
- }
  
- static void move_current_to_queue(t_rawin *x, int packet)
- {
-     lock(x);
-     pdp_list_add_back(x->x_queue, a_packet, (t_pdp_word)packet);
-     unlock(x);
- }
  
! static void *rawin_thread(void *y)
! {
!     int pipe;
      int packet = -1;
!     t_rawin *x = (t_rawin *)y;
      int period_sec;
      int period_usec;
  
! 
!     //D pdp_post("pipe: %s", x->x_pipe->s_name);
!     //D pdp_post("type: %s", x->x_type->s_name);
! 
!     /* open pipe */
!     if (-1 == (pipe = open(x->x_pipe->s_name, O_RDONLY|O_NONBLOCK))){
! 	perror(x->x_pipe->s_name);
! 	goto exit;
      }
  
!     /* main loop (packets) */
!     while(1){
! 	void *data = 0;
! 	int left = -1;
  
! 	/* create packet */
! 	if (-1 != packet){
! 	    pdp_post("WARNING: deleting stale packet");
  	    pdp_packet_mark_unused(packet);
  	}
! 	packet = pdp_factory_newpacket(x->x_type);
! 	if (-1 == packet){
! 	    pdp_post("ERROR: can't create packet. type = %s", x->x_type->s_name);
! 	    goto exit;
  	}
- 	
- 	/* fill packet */
- 	data = pdp_packet_data(packet);
- 	left = pdp_packet_data_size(packet);
- 	// D pdp_post("packet %d, data %x, size %d", packet, data, left);
- 
- 	/* inner loop: pipe reads */
- 	while(left){
- 
- 	    fd_set inset;
- 	    struct timeval tv = {0,10000};
- 
- 	    /* check if we need to stop */
- 	    if (x->x_giveup){
- 		pdp_packet_mark_unused(packet);
- 		goto close;
- 	    }
- 	    /* select, with timeout */
- 	    FD_ZERO(&inset);
- 	    FD_SET(pipe, &inset);
- 	    if (-1 == select(pipe+1, &inset, NULL,NULL, &tv)){
- 		pdp_post("select error");
- 		goto close;
- 	    }
  
! 	    /* if ready, read, else retry */
! 	    if (FD_ISSET(pipe, &inset)){
! 		int bytes = read(pipe, data, left);
! 		if (!bytes){
! 		    /* if no bytes are read, pipe is closed */
! 		    goto close;
! 		}
! 		data += bytes;
! 		left -= bytes;
  	    }
  	}
! 		   
! 	/* move to queue */
! 	move_current_to_queue(x, packet);
! 	packet = -1;
  
  
!     
      }
  
-   close:
-     /* close pipe */
-     close(pipe);
- 	
- 	
-   exit:
-     x->x_done = 1;
      return 0;
  }
  
  
  
! static void rawin_type(t_rawin *x, t_symbol *type)
! {
!     x->x_type = pdp_gensym(type->s_name);
  }
  
- static void rawin_open(t_rawin *x, t_symbol *pipe)
- {
-     /* save pipe name if not empty */
-     if (pipe->s_name[0]) {x->x_pipe = pipe;}
  
!     if (x->x_active) {
! 	pdp_post("already open");
! 	return;
!     }
!     /* start thread */
!     x->x_giveup = 0;
!     x->x_done = 0;
!     pthread_create(&x->x_thread, &x->x_attr, rawin_thread , x);
!     x->x_active = 1;
  }
  
--- 129,251 ----
  
  static void rawin_close(t_rawin *x);
  
  
  
  
! static int read_packet(t_rawin *x){
! 
      int packet = -1;
!     void *data = 0;
!     int left = -1;
      int period_sec;
      int period_usec;
  
!     /* create packet */
!     if (-1 != packet){
!         pdp_post("WARNING: deleting stale packet");
!         pdp_packet_mark_unused(packet);
      }
+     packet = pdp_factory_newpacket(x->x_type);
+     if (-1 == packet){
+         pdp_post("ERROR: can't create packet. type = %s", x->x_type->s_name);
+         goto close;
+     }
+     
+     /* fill packet */
+     data = pdp_packet_data(packet);
+     left = pdp_packet_data_size(packet);
+     // D pdp_post("packet %d, data %x, size %d", packet, data, left);
  
!     /* inner loop: pipe reads */
!     while(left){
  
!         fd_set inset;
! 	struct timeval tv = {0,10000};
! 
! 	/* check if we need to stop */
! 	if (x->x_giveup){
  	    pdp_packet_mark_unused(packet);
+ 	    goto close;
  	}
! 	/* select, with timeout */
! 	FD_ZERO(&inset);
! 	FD_SET(x->x_pipefd, &inset);
! 	if (-1 == select(x->x_pipefd+1, &inset, NULL,NULL, &tv)){
! 	    pdp_post("select error");
! 	    goto close;
  	}
  
! 	/* if ready, read, else retry */
! 	if (FD_ISSET(x->x_pipefd, &inset)){
! 	    int bytes = read(x->x_pipefd, data, left);
! 	    if (!bytes){
! 	        /* if no bytes are read, pipe is closed */
! 	        goto close;
  	    }
+ 	    data += bytes;
+ 	    left -= bytes;
  	}
!     }
!     return packet;
!  close:
!     return -1;
! }
  
  
! /* reader thread syncs to pipe */
! static void *rawin_thread(void *y)
! {
!     t_rawin *x = (t_rawin *)y;
!     int packet = -1;
! 
!     /* loop until error or close */
!     while (-1 != (packet = read_packet(x))) {
!       lock(x);
!       pdp_list_add_back(x->x_queue, a_packet, (t_pdp_word)packet);
!       unlock(x);
      }
  
      return 0;
  }
  
+ /* sync to stream:
+    tick polls the receive queue */
+ static void rawin_tick(t_rawin *x)
+ {
+     int p = -1;
+  
+     /* send all packets in queue to outlet */
+     while (x->x_queue->elements){
+         lock(x);
+ 	//pdp_post("%d", x->x_queue->elements);
+         p = pdp_list_pop(x->x_queue).w_packet;
+         unlock(x);
+ 	//pdp_post_n("%d ", p); 
+ 	pdp_packet_pass_if_valid(x->x_outlet, &p);
+ 	//pdp_post("%d",p);
+     }
+     clock_delay(x->x_clock, PERIOD);
  
+ }
  
! /* sync to bang:
!    this runs the reader in the pd thread
!  */
! static void rawin_bang(t_rawin *x){
!   if (!x->x_active) return;
!   if (x->x_mode) return;
!   int packet = read_packet(x);
!   if (-1 == packet) rawin_close(x); // close on error
!   pdp_packet_pass_if_valid(x->x_outlet, &packet);
!   
  }
  
  
! 
! 
! 
! static void rawin_type(t_rawin *x, t_symbol *type)
! {
!     x->x_type = pdp_gensym(type->s_name);
  }
  
***************
*** 218,234 ****
      /* stop thread: set giveup + wait */
      x->x_giveup = 1;
!     pthread_join(x->x_thread, NULL);
      x->x_active = 0;
  
      /* notify */
      outlet_bang(x->x_sync_outlet);
!     pdp_post("connection to %s closed", x->x_pipe->s_name);
! 
      
  
  
      
  }
  
  static void rawin_free(t_rawin *x)
  {
--- 257,306 ----
      /* stop thread: set giveup + wait */
      x->x_giveup = 1;
!     if (x->x_mode) pthread_join(x->x_thread, NULL);
      x->x_active = 0;
  
+     /* close pipe */
+     close(x->x_pipefd);
+ 
      /* notify */
      outlet_bang(x->x_sync_outlet);
!     pdp_post("pdp_rawin: connection to %s closed", x->x_pipe->s_name);
      
+ }
  
  
+ static void rawin_open(t_rawin *x, t_symbol *spipe)
+ {
+     /* save pipe name if not empty */
+     if (spipe->s_name[0]) {x->x_pipe = spipe;}
+ 
+     if (x->x_active) {
+ 	pdp_post("pdp_rawin: already open");
+ 	return;
+     }
+ 
+     /* open pipe */
+     if (-1 == (x->x_pipefd = open(x->x_pipe->s_name, O_RDONLY|O_NONBLOCK))){
+ 	perror(x->x_pipe->s_name);
+ 	return;
+     }
+ 
+     /* thread control vars */
+     x->x_giveup = 0;
+     x->x_active = 1;
+ 
+     /* start thread if sync mode */
+     if (x->x_mode) 
+       pthread_create(&x->x_thread, &x->x_attr, rawin_thread , x);
      
  }
  
+ 
+ static void rawin_sync(t_rawin *x, t_float fmode){
+     rawin_close(x);
+     x->x_mode = (int)fmode;
+ }
+ 
+ 
  static void rawin_free(t_rawin *x)
  {
***************
*** 242,250 ****
  
  
! static void *rawin_new(t_symbol *pipe, t_symbol *type)
  {
      t_rawin *x;
  
!     pdp_post("%s %s", pipe->s_name, type->s_name);
  
      /* allocate & init */
--- 314,322 ----
  
  
! static void *rawin_new(t_symbol *spipe, t_symbol *type)
  {
      t_rawin *x;
  
!     pdp_post("%s %s", spipe->s_name, type->s_name);
  
      /* allocate & init */
***************
*** 252,260 ****
      x->x_outlet = outlet_new(&x->x_obj, &s_anything);
      x->x_sync_outlet = outlet_new(&x->x_obj, &s_anything);
!     x->x_clock = clock_new(x, (t_method)tick);
      x->x_queue = pdp_list_new(0);
      x->x_active = 0;
      x->x_giveup = 0;
!     x->x_done = 0;
      x->x_type = pdp_gensym("image/YCrCb/320x240"); //default
      x->x_pipe = gensym("/tmp/pdpraw"); // default
--- 324,332 ----
      x->x_outlet = outlet_new(&x->x_obj, &s_anything);
      x->x_sync_outlet = outlet_new(&x->x_obj, &s_anything);
!     x->x_clock = clock_new(x, (t_method)rawin_tick);
      x->x_queue = pdp_list_new(0);
      x->x_active = 0;
      x->x_giveup = 0;
!     x->x_mode = 0;
      x->x_type = pdp_gensym("image/YCrCb/320x240"); //default
      x->x_pipe = gensym("/tmp/pdpraw"); // default
***************
*** 265,269 ****
      /* args */
      rawin_type(x, type);
!     if (pipe->s_name[0]) x->x_pipe = pipe; 
  
      return (void *)x;
--- 337,341 ----
      /* args */
      rawin_type(x, type);
!     if (spipe->s_name[0]) x->x_pipe = spipe; 
  
      return (void *)x;
***************
*** 287,294 ****
     	(t_method)rawin_free, sizeof(t_rawin), 0, A_DEFSYMBOL, A_DEFSYMBOL, A_NULL);
  
!     /* add global message handler */
      class_addmethod(rawin_class, (t_method)rawin_type, gensym("type"), A_SYMBOL, A_NULL);
      class_addmethod(rawin_class, (t_method)rawin_open, gensym("open"), A_DEFSYMBOL, A_NULL);
      class_addmethod(rawin_class, (t_method)rawin_close, gensym("close"), A_NULL);
  
  
--- 359,368 ----
     	(t_method)rawin_free, sizeof(t_rawin), 0, A_DEFSYMBOL, A_DEFSYMBOL, A_NULL);
  
!     /* add global message handlers */
      class_addmethod(rawin_class, (t_method)rawin_type, gensym("type"), A_SYMBOL, A_NULL);
      class_addmethod(rawin_class, (t_method)rawin_open, gensym("open"), A_DEFSYMBOL, A_NULL);
      class_addmethod(rawin_class, (t_method)rawin_close, gensym("close"), A_NULL);
+     class_addmethod(rawin_class, (t_method)rawin_sync, gensym("sync"), A_FLOAT, A_NULL);
+     class_addmethod(rawin_class, (t_method)rawin_bang, gensym("bang"), A_NULL);
  
  

Index: Makefile
===================================================================
RCS file: /cvsroot/pure-data/externals/pdp/modules/generic/Makefile,v
retrieving revision 1.3
retrieving revision 1.4
diff -C2 -d -r1.3 -r1.4
*** Makefile	16 Dec 2005 01:05:32 -0000	1.3
--- Makefile	1 Sep 2006 13:45:30 -0000	1.4
***************
*** 5,9 ****
  PDP_MOD = pdp_reg.o pdp_del.o pdp_snap.o pdp_trigger.o \
  	pdp_route.o pdp_inspect.o pdp_loop.o pdp_description.o pdp_convert.o \
! 	pdp_udp_send.o pdp_udp_receive.o pdp_rawin.o pdp_rawout.o
  
  # build generic modules
--- 5,9 ----
  PDP_MOD = pdp_reg.o pdp_del.o pdp_snap.o pdp_trigger.o \
  	pdp_route.o pdp_inspect.o pdp_loop.o pdp_description.o pdp_convert.o \
! 	pdp_udp_send.o pdp_udp_receive.o pdp_rawin.o pdp_rawout.o pdp_metro.o
  
  # build generic modules

Index: pdp_del.c
===================================================================
RCS file: /cvsroot/pure-data/externals/pdp/modules/generic/pdp_del.c,v
retrieving revision 1.3
retrieving revision 1.4
diff -C2 -d -r1.3 -r1.4
*** pdp_del.c	16 Dec 2005 01:05:32 -0000	1.3
--- pdp_del.c	1 Sep 2006 13:45:30 -0000	1.4
***************
*** 72,76 ****
        out = (((x->x_head + x->x_delay)) % x->x_order);
        packet = x->x_packet[out];
!       pdp_packet_pass_if_valid(x->x_outlet0, &x->x_packet[out]);
  
  /*
--- 72,83 ----
        out = (((x->x_head + x->x_delay)) % x->x_order);
        packet = x->x_packet[out];
! 
!       // originally, we wouldn't keep the packet in the delay line to save memory
!       // however, this behaviour is very annoying, and doesn't allow ''scratching''
!       // so we send out a copy instead.
!       // pdp_packet_pass_if_valid(x->x_outlet0, &x->x_packet[out]);
!       int p = pdp_packet_copy_ro(packet);
!       pdp_packet_pass_if_valid(x->x_outlet0, &p);
! 
  
  /*
***************
*** 89,92 ****
--- 96,108 ----
  
        x->x_head = (x->x_head + x->x_order - 1) % x->x_order;
+ 
+ /*
+       int i;
+       for (i=0; i<x->x_order; i++){
+ 	  fprintf(stderr, " %d", x->x_packet[i]);
+       }
+       fprintf(stderr, "\n");
+ */
+ 
      }
  

Index: pdp_rawout.c
===================================================================
RCS file: /cvsroot/pure-data/externals/pdp/modules/generic/pdp_rawout.c,v
retrieving revision 1.3
retrieving revision 1.4
diff -C2 -d -r1.3 -r1.4
*** pdp_rawout.c	16 Dec 2005 01:05:32 -0000	1.3
--- pdp_rawout.c	1 Sep 2006 13:45:30 -0000	1.4
***************
*** 1,4 ****
  /*
!  *   Pure Data Packet module. packet forth console
   *   Copyright (c) by Tom Schouten <pdp at zzz.kotnet.org>
   *
--- 1,4 ----
  /*
!  *   Pure Data Packet module. Raw packet output.
   *   Copyright (c) by Tom Schouten <pdp at zzz.kotnet.org>
   *
***************
*** 20,23 ****
--- 20,48 ----
  
  
+ /*
+   This is the most straightforward way to get data out of pdp.
+   The internals follow the simple reader/writer pattern
+   * writer: runs in pd thread, accepts packets from inlet and stores in queue
+   * reader: runs in own thread, reads packets from queue and writes to pipe
+ 
+   Since there is no communication from reader to writer, we need a watchdog
+   timer to check the status of the writer. Mainly to close when necessary.
+ 
+   To enable audio recording, pdp_rawout will also produce interleaved 16bit
+   audio. You will need to instantiate it with [pdp_rawout~ nbchans]
+ 
+ 
+ 
+ */
+ 
+ /* TODO:
+ 
+    make audio buffer smaller (128 bytes writes is too heavy)
+ 
+ */
+ 
+ 
+ 
+ #include <string.h>
  #include <pthread.h>
  #include <stdio.h>
***************
*** 40,47 ****
  
  #define D if (1)
! #define MAX_QUEUESIZE 4
  #define PIPE_BLOCKSIZE 4096
! 
! 
  
  
--- 65,71 ----
  
  #define D if (1)
! #define DEF_QUEUESIZE 100
  #define PIPE_BLOCKSIZE 4096
! #define POLLTIME 20
  
  
***************
*** 52,60 ****
      /* pd */
      t_object x_obj;
      //t_outlet *x_outlet;
      t_outlet *x_sync_outlet;
  
      /* comm */
!     t_pdp_list *x_queue; // packet queue
  
      /* thread */
--- 76,92 ----
      /* pd */
      t_object x_obj;
+     t_float  x_f;
+ 
      //t_outlet *x_outlet;
      t_outlet *x_sync_outlet;
  
      /* comm */
!     t_pdp_list *x_queue;   // packet queue
!     t_clock    *x_clock;   // watchdog timer
!     t_float     x_deltime; // watchdog period
!     int         x_verbose;
!     int         x_packet;    // dsp fillup state
!     int         x_countdown; // amount of packet filled up
!     short int * x_needle;    // start writing here
  
      /* thread */
***************
*** 69,74 ****
  
      /* config */
!     t_symbol *x_pipe;
!     t_pdp_symbol *x_type;
  
  } t_rawout;
--- 101,108 ----
  
      /* config */
!     t_symbol      *x_pipe;
!     int           x_chans;            // nb audio channels
!     t_pdp_symbol *x_tmptype;
!     int           x_max_queuesize;    // buffer size ( < 0 = infty )
  
  } t_rawout;
***************
*** 79,131 ****
  
  static void rawout_close(t_rawout *x);
- static void pdp_in(t_rawout *x, t_symbol *s, t_float f)
- {
-     /* save packet to pdp queue, if size is smaller than maxsize */
-     if (s == S_REGISTER_RO){
- 	if (x->x_queue->elements < MAX_QUEUESIZE){
- 	    int p = (int)f;
- 	    p = pdp_packet_copy_ro(p);
- 	    if (p != -1){
- 		lock(x);
- 		pdp_list_add_back(x->x_queue, a_packet, (t_pdp_word)p);
- 		unlock(x);
- 	    }
- 	}
- 	else {
- 	    pdp_post("pdp_rawout: dropping packet: (queue full)", MAX_QUEUESIZE);
- 	}
- 	    
-     }
- 
-     /* check if thread is done */
-     if (x->x_done) rawout_close(x);
- 
- }
- 
  
  
  static void *rawout_thread(void *y)
  {
!     int pipe;
      int packet = -1;
      t_rawout *x = (t_rawout *)y;
      int period_sec;
      int period_usec;
!     sigset_t sigvec; /* signal handling  */
  
      /* ignore pipe signal */
!     sigemptyset(&sigvec);
!     sigaddset(&sigvec,SIGPIPE);
!     pthread_sigmask(SIG_BLOCK, &sigvec, 0);
  
      //D pdp_post("pipe: %s", x->x_pipe->s_name);
      //D pdp_post("type: %s", x->x_type->s_name);
  
      /* open pipe */
!     if (-1 == (pipe = open(x->x_pipe->s_name, O_WRONLY|O_NONBLOCK))){
! 	perror(x->x_pipe->s_name);
! 	goto exit;
      }
  
      /* main loop (packets) */
      while(1){
--- 113,151 ----
  
  static void rawout_close(t_rawout *x);
  
+ /* READER THREAD: reads from queue, writes to pipe */
  
  static void *rawout_thread(void *y)
  {
!     int pipefd;
      int packet = -1;
      t_rawout *x = (t_rawout *)y;
      int period_sec;
      int period_usec;
!     sigset_t _sigvec; /* signal handling  */
! 
!     char me[1024];
!     sprintf(me, "pdp_rawout: %s", x->x_pipe->s_name);
  
      /* ignore pipe signal */
!     sigemptyset(&_sigvec);
!     sigaddset(&_sigvec,SIGPIPE);
!     pthread_sigmask(SIG_BLOCK, &_sigvec, 0);
  
      //D pdp_post("pipe: %s", x->x_pipe->s_name);
      //D pdp_post("type: %s", x->x_type->s_name);
  
+ 
      /* open pipe */
!     if (-1 == (pipefd = open(x->x_pipe->s_name, O_WRONLY|O_NONBLOCK|O_APPEND))){
! 	if (-1 == (pipefd = open(x->x_pipe->s_name, O_WRONLY|O_CREAT))){
! 	    perror(me);
! 	    goto exit;
! 	}
      }
  
+     pdp_post("pdp_rawout: opened %s", x->x_pipe->s_name);
+ 
+ 
      /* main loop (packets) */
      while(1){
***************
*** 148,155 ****
  	packet = pdp_list_pop(x->x_queue).w_packet;
  	unlock(x);
! 	
  	/* send packet */
  	data = pdp_packet_data(packet);
  	left = pdp_packet_data_size(packet);
  
  	/* inner loop: pipe reads */
--- 168,180 ----
  	packet = pdp_list_pop(x->x_queue).w_packet;
  	unlock(x);
! 
  	/* send packet */
+ 	//t_pdp *h = pdp_packet_header(packet);
+ 	//fprintf(stderr, "users %d\n", h->users);
+ 	
  	data = pdp_packet_data(packet);
  	left = pdp_packet_data_size(packet);
+ 	//int i; for (i = 0; i<left/2; i++) fprintf(stderr, "%06x ", ((short int *)data)[i]); fprintf(stderr, "\n");
+ 
  
  	/* inner loop: pipe reads */
***************
*** 167,172 ****
  	    /* select, with timeout */
  	    FD_ZERO(&outset);
! 	    FD_SET(pipe, &outset);
! 	    if (-1 == select(pipe+1, NULL, &outset, NULL, &tv)){
  		pdp_post("select error");
  		goto close;
--- 192,197 ----
  	    /* select, with timeout */
  	    FD_ZERO(&outset);
! 	    FD_SET(pipefd, &outset);
! 	    if (-1 == select(pipefd+1, NULL, &outset, NULL, &tv)){
  		pdp_post("select error");
  		goto close;
***************
*** 174,183 ****
  
  	    /* if ready, read, else retry */
! 	    if (FD_ISSET(pipe, &outset)){
! 		int bytes = write(pipe, data, left);
  		/* handle errors */
  		if (bytes <= 0){
! 		    perror(x->x_pipe->s_name);
! 		    if (bytes != EAGAIN) goto close;
  		}
  		/* or update pointers */
--- 199,209 ----
  
  	    /* if ready, read, else retry */
! 	    if (FD_ISSET(pipefd, &outset)){
! 
! 		int bytes = write(pipefd, data, left);
  		/* handle errors */
  		if (bytes <= 0){
! 		    perror(me);
! 		    if (errno != EAGAIN) goto close;
  		}
  		/* or update pointers */
***************
*** 201,205 ****
    close:
      /* close pipe */
!     close(pipe);
  	
  	
--- 227,231 ----
    close:
      /* close pipe */
!     close(pipefd);
  	
  	
***************
*** 210,223 ****
  
  
  
! static void rawout_type(t_rawout *x, t_symbol *type)
  {
!     x->x_type = pdp_gensym(type->s_name);
  }
  
! static void rawout_open(t_rawout *x, t_symbol *pipe)
  {
      /* save pipe name if not empty */
!     if (pipe->s_name[0]) {x->x_pipe = pipe;}
  
      if (x->x_active) {
--- 236,354 ----
  
  
+ /* DSP INPUT */
+ #define DSP_ARG(type, name, source) type name = (type)source
  
! static t_int *rawout_perform(t_int *w);
! static void rawout_dsp(t_rawout *x, t_signal **sp){
!     int nargs = 2 + x->x_chans;
!     t_int args[nargs];
!     args[0] = (int)x;
!     args[1] = (int)sp[0]->s_n;
!     float **in = (float **)(args+2);
!     int i;
!     for (i=0; i<x->x_chans; i++) in[i] = sp[i]->s_vec;
!     dsp_addv(rawout_perform, nargs, args);
! }
! 
! static t_int *rawout_perform(t_int *w)
  {
!     DSP_ARG(t_rawout*,  x,     w[1]);
!     DSP_ARG(t_int,      n,     w[2]);
!     DSP_ARG(t_float**,  in,    &w[3]);
! 
!     short int *out;
!     int i,c,k;
!     
! 
!     if (x->x_queue->elements >= x->x_max_queuesize){
! 	// drop
! 	if (x->x_verbose && x->x_active) pdp_post_n(".");
!     }
!     else {
! 
! 	// create packet
! 	if (x->x_countdown) {
! 	    out = x->x_needle;
! 	}
! 	else {
! 	    int p = pdp_factory_newpacket(x->x_tmptype);
! 	    pdp_packet_mark_unused(x->x_packet);
! 	    // if (-1 == p) pdp_post("pdp_rawout~: can't create packet");
! 	    x->x_needle = out = (short int *)pdp_packet_data(p);
! 	    x->x_packet = p;
! 	    x->x_countdown = pdp_packet_data_size(p) / 2;
! 	}
! 
! 	//pdp_post("data size = %d bytes", pdp_packet_data_size(p));
! 
! 	//memset(out, 0, pdp_packet_data_size(p));
! 
! 
! 	// convert & interleave
! 	for (k=0,i=0; i<n; i++){
! 	    for (c=0; c<x->x_chans; c++,k++){
! 		float val = (in[c])[i];
! 		val *= (float)((1<<15)-1);
! 		out[k] = (short int)(val);
! 		//out[k] = 0x1234;
! 		//fprintf(stderr, "(%d,%d,%d) %d\n", c, i, k, (int)out[k]);
! 	    }
! 	}
! 
! 	x->x_needle += k;
! 	x->x_countdown -= k;
! 	if (!x->x_countdown){
! 	    // transfer
! 	    lock(x);
! 	    pdp_list_add_back(x->x_queue, a_packet, (t_pdp_word)x->x_packet);
! 	    x->x_packet = -1;
! 	    unlock(x);
! 	}
!     }
! 
!     return w+3+x->x_chans;
  }
  
! 
! /* PACKET INPUT */
! 
! 
! static void pdp_in(t_rawout *x, t_symbol *s, t_float f)
! {
!     /* save packet to pdp queue, if size is smaller than maxsize */
!     if (s == S_REGISTER_RO){
! 	if (x->x_queue->elements < x->x_max_queuesize){
! 	    int p = (int)f;
! 	    p = pdp_packet_copy_ro(p);
! 	    if (p != -1){
! 		lock(x);
! 		pdp_list_add_back(x->x_queue, a_packet, (t_pdp_word)p);
! 		unlock(x);
! 	    }
! 	}
! 	else {
! 	    //pdp_post("pdp_rawout: dropping packet: (queue full)", x->x_max_queuesize);
! 	    if (x->x_active && x->x_verbose) pdp_post_n(".");
! 	}
! 	    
!     }
! 
!     /* check if thread is done */
!     if (x->x_done) rawout_close(x);
! 
! }
! 
! /* CONTROL */
! 
! //static void rawout_type(t_rawout *x, t_symbol *type)
! //{
!     //x->x_type = pdp_gensym(type->s_name);
! //}
! 
! static void clear_queue(t_rawout *x);
! static void rawout_open(t_rawout *x, t_symbol *spipe)
  {
      /* save pipe name if not empty */
!     if (spipe->s_name[0]) {x->x_pipe = spipe;}
  
      if (x->x_active) {
***************
*** 228,238 ****
      x->x_giveup = 0;
      x->x_done = 0;
      pthread_create(&x->x_thread, &x->x_attr, rawout_thread , x);
      x->x_active = 1;
  }
  
! static void rawout_close(t_rawout *x)
! {
! 
      if (!x->x_active) return;
  
--- 359,368 ----
      x->x_giveup = 0;
      x->x_done = 0;
+     clear_queue(x);
      pthread_create(&x->x_thread, &x->x_attr, rawout_thread , x);
      x->x_active = 1;
  }
  
! static void rawout_stopthread(t_rawout *x){
      if (!x->x_active) return;
  
***************
*** 241,273 ****
      pthread_join(x->x_thread, NULL);
      x->x_active = 0;
  
      /* notify */
      outlet_bang(x->x_sync_outlet);
!     pdp_post("connection to %s closed", x->x_pipe->s_name);
! 
      
  
  
!     
  }
  
  static void rawout_free(t_rawout *x)
  {
!     rawout_close(x);
!     pdp_tree_strip_packets(x->x_queue);
      pdp_tree_free(x->x_queue);
  }
  
  t_class *rawout_class;
  
! 
! static void *rawout_new(t_symbol *pipe, t_symbol *type)
! {
!     t_rawout *x;
! 
!     pdp_post("%s %s", pipe->s_name, type->s_name);
! 
!     /* allocate & init */
!     x = (t_rawout *)pd_new(rawout_class);
      //x->x_outlet = outlet_new(&x->x_obj, &s_anything);
      x->x_sync_outlet = outlet_new(&x->x_obj, &s_anything);
--- 371,428 ----
      pthread_join(x->x_thread, NULL);
      x->x_active = 0;
+     clear_queue(x);
+ }
+ 
+ static void rawout_close(t_rawout *x)
+ {
+     if (!x->x_active) return;
+     rawout_stopthread(x);
  
      /* notify */
      outlet_bang(x->x_sync_outlet);
!     pdp_post("pdp_rawout: closed %s", x->x_pipe->s_name);
      
+ }
  
+ static void rawout_verbose(t_rawout *x, t_float fverbose){
+     x->x_verbose = (int)fverbose;
+ }
+ static void rawout_tick(t_rawout *x){
+     if (x->x_done) rawout_close(x);
+     clock_delay(x->x_clock, x->x_deltime);
+ }
  
! static void clear_queue(t_rawout *x){
!     lock(x);
!     while(x->x_queue->elements){
! 	pdp_packet_mark_unused(pdp_list_pop(x->x_queue).w_packet);
!     }
!     unlock(x);
! }
! 
! #define MAXINT (0x80 << ((sizeof(int)-1)*8))
! static void rawout_queuesize(t_rawout *x, t_float fbufsize){
!     int bufsize = (int)fbufsize;
!     if (fbufsize < 1) fbufsize = MAXINT;
!     x->x_max_queuesize = bufsize;
  }
  
  static void rawout_free(t_rawout *x)
  {
!     // terminate thread
!     rawout_stopthread(x);
!     
!     // cleanup
!     clock_unset(x->x_clock);
!     clear_queue(x);
      pdp_tree_free(x->x_queue);
+     pdp_packet_mark_unused(x->x_packet);
  }
  
  t_class *rawout_class;
+ t_class *rawout_dsp_class;
  
! // shared stuff
! static void rawout_init(t_rawout *x, t_symbol *spipe){
      //x->x_outlet = outlet_new(&x->x_obj, &s_anything);
      x->x_sync_outlet = outlet_new(&x->x_obj, &s_anything);
***************
*** 276,292 ****
      x->x_giveup = 0;
      x->x_done = 0;
!     x->x_type = pdp_gensym("image/YCrCb/320x240"); //default
      x->x_pipe = gensym("/tmp/pdpraw"); // default
      pthread_attr_init(&x->x_attr);
      pthread_mutex_init(&x->x_mut, NULL);
  
      /* args */
!     rawout_type(x, type);
!     if (pipe->s_name[0]) x->x_pipe = pipe; 
  
      return (void *)x;
  
  }
  
  
  
--- 431,489 ----
      x->x_giveup = 0;
      x->x_done = 0;
!     x->x_packet = -1;
!     x->x_countdown = 0;
!     x->x_max_queuesize = DEF_QUEUESIZE;
!     //x->x_type = pdp_gensym("image/YCrCb/320x240"); //default
      x->x_pipe = gensym("/tmp/pdpraw"); // default
+     x->x_deltime = POLLTIME;
+     x->x_clock = clock_new(x, (t_method)rawout_tick);
      pthread_attr_init(&x->x_attr);
      pthread_mutex_init(&x->x_mut, NULL);
  
      /* args */
!     //rawout_type(x, type);
!     if (spipe && spipe->s_name[0]) x->x_pipe = spipe; 
!     rawout_tick(x);
!     rawout_verbose(x,0);
! }
  
+ // [pdp_rawout]
+ static void *rawout_new(t_symbol *spipe /* , t_symbol *type */)
+ {
+     t_rawout *x;
+     /* allocate & init */
+     x = (t_rawout *)pd_new(rawout_class);
+     rawout_init(x, spipe);
      return (void *)x;
  
  }
  
+ // [pdp_rawout~]
+ 
+ 
+ // HUH??? why do i get the symbol first, then the float????
+ // see http://lists.puredata.info/pipermail/pd-dev/2003-09/001618.html
+ //static void *rawout_dsp_new(t_float fchans, t_symbol *spipe){
+ static void *rawout_dsp_new(t_symbol *spipe, t_float fchans){
+     int chans = (int)fchans;
+     if (chans < 1)  chans = 1;
+     if (chans > 64) return 0; // this is just a safety measure
+ 
+     t_rawout *x = (t_rawout *)pd_new(rawout_dsp_class);
+     rawout_init(x, spipe);
+ 
+     // hack: temp packet
+     char temp_packet[1024];
+     sprintf(temp_packet, "image/grey/256x%d", 8 * chans);
+     pdp_post("pdp_rawout: using fake packet %s", temp_packet);
+     x->x_tmptype = pdp_gensym(temp_packet);
+ 
+     // create audio inlets
+     x->x_chans = chans;
+     while (--chans) inlet_new(&x->x_obj, &x->x_obj.ob_pd, gensym("signal"), gensym("signal"));
+     
+     return (void *)x;
+ }
+ 
  
  
***************
*** 297,319 ****
  
  
! void pdp_rawout_setup(void)
! {
      int i;
  
      /* create a standard pd class: [pdp_rawout pipe type] */
      rawout_class = class_new(gensym("pdp_rawout"), (t_newmethod)rawout_new,
!    	(t_method)rawout_free, sizeof(t_rawout), 0, A_DEFSYMBOL, A_DEFSYMBOL, A_NULL);
  
      /* add global message handler */
!     class_addmethod(rawout_class, (t_method)pdp_in, 
! 		    gensym("pdp"), A_SYMBOL, A_FLOAT, A_NULL);
  
!     class_addmethod(rawout_class, (t_method)rawout_type, gensym("type"), A_SYMBOL, A_NULL);
!     class_addmethod(rawout_class, (t_method)rawout_open, gensym("open"), A_DEFSYMBOL, A_NULL);
!     class_addmethod(rawout_class, (t_method)rawout_close, gensym("close"), A_NULL);
  
  
  }
  
  #ifdef __cplusplus
  }
--- 494,537 ----
  
  
! 
! #define COMMON(base_class)\
!     class_addmethod(base_class, (t_method)rawout_open, gensym("open"), A_DEFSYMBOL, A_NULL);\
!     class_addmethod(base_class, (t_method)rawout_close, gensym("close"), A_NULL);\
!     class_addmethod(base_class, (t_method)rawout_verbose, gensym("verbose"), A_FLOAT, A_NULL);\
!     class_addmethod(base_class, (t_method)rawout_queuesize, gensym("bufsize"), A_FLOAT, A_NULL);
! 
! void pdp_rawout_setup(void){
! 
      int i;
  
+     /* PACKETS */
+ 
      /* create a standard pd class: [pdp_rawout pipe type] */
      rawout_class = class_new(gensym("pdp_rawout"), (t_newmethod)rawout_new,
!    	(t_method)rawout_free, sizeof(t_rawout), 0, A_DEFSYMBOL, A_NULL);
  
      /* add global message handler */
!     class_addmethod(rawout_class, (t_method)pdp_in, gensym("pdp"), A_SYMBOL, A_FLOAT, A_NULL);
!     COMMON(rawout_class);
  
!     /* DSP */
  
+     /* create a standard pd class: [pdp_rawout pipe type] */
+     rawout_dsp_class = class_new(gensym("pdp_rawout~"), (t_newmethod)rawout_dsp_new,
+    	(t_method)rawout_free, sizeof(t_rawout), 0, A_DEFFLOAT, A_DEFSYMBOL, A_NULL);
+ 
+     /* add signal input */
+     CLASS_MAINSIGNALIN(rawout_dsp_class, t_rawout, x_f); 
+     class_addmethod(rawout_dsp_class, (t_method)rawout_dsp, gensym("dsp"), 0);   
+     COMMON(rawout_dsp_class);
  
  }
  
+     
+ 
+ 
+ 
+ 
+ 
  #ifdef __cplusplus
  }





More information about the Pd-cvs mailing list