[PD-cvs] externals/pdp/modules/generic Makefile, 1.3, 1.4 pdp_del.c, 1.3, 1.4 pdp_rawin.c, 1.3, 1.4 pdp_rawout.c, 1.3, 1.4
Tom Schouten
doelie at users.sourceforge.net
Fri Sep 1 15:45:32 CEST 2006
Update of /cvsroot/pure-data/externals/pdp/modules/generic
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv2628/modules/generic
Modified Files:
Makefile pdp_del.c pdp_rawin.c pdp_rawout.c
Log Message:
pdp current darcs merge
Index: pdp_rawin.c
===================================================================
RCS file: /cvsroot/pure-data/externals/pdp/modules/generic/pdp_rawin.c,v
retrieving revision 1.3
retrieving revision 1.4
diff -C2 -d -r1.3 -r1.4
*** pdp_rawin.c 16 Dec 2005 01:05:32 -0000 1.3
--- pdp_rawin.c 1 Sep 2006 13:45:30 -0000 1.4
***************
*** 1,4 ****
/*
! * Pure Data Packet module. packet forth console
* Copyright (c) by Tom Schouten <pdp at zzz.kotnet.org>
*
--- 1,4 ----
/*
! * Pure Data Packet module. Raw packet input
* Copyright (c) by Tom Schouten <pdp at zzz.kotnet.org>
*
***************
*** 20,23 ****
--- 20,24 ----
+ #include <string.h>
#include <pthread.h>
#include <stdio.h>
***************
*** 41,44 ****
--- 42,94 ----
+ /* TODO: add flow control
+
+ reminder: pthread condition values for synchro
+
+ writing to a queue goes like this:
+ * pthread_mutex_lock
+ * atomic op (write)
+ * pthread_cond_signal
+ * pthread_mutex_unlock
+
+ reading from a queue goes like this:
+ * pthread_mutex_lock
+ * while (!CONDITION) pthread_cond_wait
+ * atomic op (read)
+ * pthread_mutex_unlock
+
+
+ in this case, there is a reader and a writer, AND a maximum
+ size of the buffer between them (1 atom) so both reader and writer
+ might block. compare this to the 'command queue' in libpf, where
+ only the reader blocks.
+
+ so the course of events is:
+
+ READER
+ * wait for data ready (COND_READ) this blocks pd, it is assumed data is always ready in normal operation
+ * consume
+ * signal writer (COND_WRITE)
+
+
+ WRITER
+ * wait for space ready (COND_WRITE)
+ * write
+ * signal reader
+
+
+ one remark though:
+ is this machinery really necessary?
+ -----------------------------------
+
+ it might be easier to just read in the pd thread.
+ the only problem this gives is when data really arrives not in a single chunk, but as a stream.
+ to me that seems a really awkward special case which can be solved using another PROCESS to
+ do the buffering / dropping.
+
+ so, for now, it's just synchronous, no threads for sync reading.
+
+ */
+
***************
*** 55,58 ****
--- 105,109 ----
/* comm */
t_pdp_list *x_queue; // packet queue
+ int x_pipefd;
/* thread */
***************
*** 60,68 ****
pthread_attr_t x_attr;
pthread_t x_thread;
/* sync */
int x_giveup; // 1-> terminate reader thread
int x_active; // 1-> reader thread is launched
- int x_done; // 1-> reader thread has exited
/* config */
--- 111,120 ----
pthread_attr_t x_attr;
pthread_t x_thread;
+
/* sync */
+ int x_mode; // 1-> sync to input
int x_giveup; // 1-> terminate reader thread
int x_active; // 1-> reader thread is launched
/* config */
***************
*** 77,212 ****
static void rawin_close(t_rawin *x);
- static void tick(t_rawin *x)
- {
- /* send all packets in queue to outlet */
- lock(x);
- while (x->x_queue->elements){
- outlet_pdp_atom(x->x_outlet, x->x_queue->first);
- pdp_list_pop(x->x_queue); // pop stale reference
- }
- unlock(x);
- clock_delay(x->x_clock, PERIOD);
- /* check if thread is done */
- if (x->x_done) rawin_close(x);
- }
- static void move_current_to_queue(t_rawin *x, int packet)
- {
- lock(x);
- pdp_list_add_back(x->x_queue, a_packet, (t_pdp_word)packet);
- unlock(x);
- }
! static void *rawin_thread(void *y)
! {
! int pipe;
int packet = -1;
! t_rawin *x = (t_rawin *)y;
int period_sec;
int period_usec;
!
! //D pdp_post("pipe: %s", x->x_pipe->s_name);
! //D pdp_post("type: %s", x->x_type->s_name);
!
! /* open pipe */
! if (-1 == (pipe = open(x->x_pipe->s_name, O_RDONLY|O_NONBLOCK))){
! perror(x->x_pipe->s_name);
! goto exit;
}
! /* main loop (packets) */
! while(1){
! void *data = 0;
! int left = -1;
! /* create packet */
! if (-1 != packet){
! pdp_post("WARNING: deleting stale packet");
pdp_packet_mark_unused(packet);
}
! packet = pdp_factory_newpacket(x->x_type);
! if (-1 == packet){
! pdp_post("ERROR: can't create packet. type = %s", x->x_type->s_name);
! goto exit;
}
-
- /* fill packet */
- data = pdp_packet_data(packet);
- left = pdp_packet_data_size(packet);
- // D pdp_post("packet %d, data %x, size %d", packet, data, left);
-
- /* inner loop: pipe reads */
- while(left){
-
- fd_set inset;
- struct timeval tv = {0,10000};
-
- /* check if we need to stop */
- if (x->x_giveup){
- pdp_packet_mark_unused(packet);
- goto close;
- }
- /* select, with timeout */
- FD_ZERO(&inset);
- FD_SET(pipe, &inset);
- if (-1 == select(pipe+1, &inset, NULL,NULL, &tv)){
- pdp_post("select error");
- goto close;
- }
! /* if ready, read, else retry */
! if (FD_ISSET(pipe, &inset)){
! int bytes = read(pipe, data, left);
! if (!bytes){
! /* if no bytes are read, pipe is closed */
! goto close;
! }
! data += bytes;
! left -= bytes;
}
}
!
! /* move to queue */
! move_current_to_queue(x, packet);
! packet = -1;
!
}
- close:
- /* close pipe */
- close(pipe);
-
-
- exit:
- x->x_done = 1;
return 0;
}
! static void rawin_type(t_rawin *x, t_symbol *type)
! {
! x->x_type = pdp_gensym(type->s_name);
}
- static void rawin_open(t_rawin *x, t_symbol *pipe)
- {
- /* save pipe name if not empty */
- if (pipe->s_name[0]) {x->x_pipe = pipe;}
! if (x->x_active) {
! pdp_post("already open");
! return;
! }
! /* start thread */
! x->x_giveup = 0;
! x->x_done = 0;
! pthread_create(&x->x_thread, &x->x_attr, rawin_thread , x);
! x->x_active = 1;
}
--- 129,251 ----
static void rawin_close(t_rawin *x);
! static int read_packet(t_rawin *x){
!
int packet = -1;
! void *data = 0;
! int left = -1;
int period_sec;
int period_usec;
! /* create packet */
! if (-1 != packet){
! pdp_post("WARNING: deleting stale packet");
! pdp_packet_mark_unused(packet);
}
+ packet = pdp_factory_newpacket(x->x_type);
+ if (-1 == packet){
+ pdp_post("ERROR: can't create packet. type = %s", x->x_type->s_name);
+ goto close;
+ }
+
+ /* fill packet */
+ data = pdp_packet_data(packet);
+ left = pdp_packet_data_size(packet);
+ // D pdp_post("packet %d, data %x, size %d", packet, data, left);
! /* inner loop: pipe reads */
! while(left){
! fd_set inset;
! struct timeval tv = {0,10000};
!
! /* check if we need to stop */
! if (x->x_giveup){
pdp_packet_mark_unused(packet);
+ goto close;
}
! /* select, with timeout */
! FD_ZERO(&inset);
! FD_SET(x->x_pipefd, &inset);
! if (-1 == select(x->x_pipefd+1, &inset, NULL,NULL, &tv)){
! pdp_post("select error");
! goto close;
}
! /* if ready, read, else retry */
! if (FD_ISSET(x->x_pipefd, &inset)){
! int bytes = read(x->x_pipefd, data, left);
! if (!bytes){
! /* if no bytes are read, pipe is closed */
! goto close;
}
+ data += bytes;
+ left -= bytes;
}
! }
! return packet;
! close:
! return -1;
! }
! /* reader thread syncs to pipe */
! static void *rawin_thread(void *y)
! {
! t_rawin *x = (t_rawin *)y;
! int packet = -1;
!
! /* loop until error or close */
! while (-1 != (packet = read_packet(x))) {
! lock(x);
! pdp_list_add_back(x->x_queue, a_packet, (t_pdp_word)packet);
! unlock(x);
}
return 0;
}
+ /* sync to stream:
+ tick polls the receive queue */
+ static void rawin_tick(t_rawin *x)
+ {
+ int p = -1;
+
+ /* send all packets in queue to outlet */
+ while (x->x_queue->elements){
+ lock(x);
+ //pdp_post("%d", x->x_queue->elements);
+ p = pdp_list_pop(x->x_queue).w_packet;
+ unlock(x);
+ //pdp_post_n("%d ", p);
+ pdp_packet_pass_if_valid(x->x_outlet, &p);
+ //pdp_post("%d",p);
+ }
+ clock_delay(x->x_clock, PERIOD);
+ }
! /* sync to bang:
! this runs the reader in the pd thread
! */
! static void rawin_bang(t_rawin *x){
! if (!x->x_active) return;
! if (x->x_mode) return;
! int packet = read_packet(x);
! if (-1 == packet) rawin_close(x); // close on error
! pdp_packet_pass_if_valid(x->x_outlet, &packet);
!
}
!
!
!
! static void rawin_type(t_rawin *x, t_symbol *type)
! {
! x->x_type = pdp_gensym(type->s_name);
}
***************
*** 218,234 ****
/* stop thread: set giveup + wait */
x->x_giveup = 1;
! pthread_join(x->x_thread, NULL);
x->x_active = 0;
/* notify */
outlet_bang(x->x_sync_outlet);
! pdp_post("connection to %s closed", x->x_pipe->s_name);
!
}
static void rawin_free(t_rawin *x)
{
--- 257,306 ----
/* stop thread: set giveup + wait */
x->x_giveup = 1;
! if (x->x_mode) pthread_join(x->x_thread, NULL);
x->x_active = 0;
+ /* close pipe */
+ close(x->x_pipefd);
+
/* notify */
outlet_bang(x->x_sync_outlet);
! pdp_post("pdp_rawin: connection to %s closed", x->x_pipe->s_name);
+ }
+ static void rawin_open(t_rawin *x, t_symbol *spipe)
+ {
+ /* save pipe name if not empty */
+ if (spipe->s_name[0]) {x->x_pipe = spipe;}
+
+ if (x->x_active) {
+ pdp_post("pdp_rawin: already open");
+ return;
+ }
+
+ /* open pipe */
+ if (-1 == (x->x_pipefd = open(x->x_pipe->s_name, O_RDONLY|O_NONBLOCK))){
+ perror(x->x_pipe->s_name);
+ return;
+ }
+
+ /* thread control vars */
+ x->x_giveup = 0;
+ x->x_active = 1;
+
+ /* start thread if sync mode */
+ if (x->x_mode)
+ pthread_create(&x->x_thread, &x->x_attr, rawin_thread , x);
}
+
+ static void rawin_sync(t_rawin *x, t_float fmode){
+ rawin_close(x);
+ x->x_mode = (int)fmode;
+ }
+
+
static void rawin_free(t_rawin *x)
{
***************
*** 242,250 ****
! static void *rawin_new(t_symbol *pipe, t_symbol *type)
{
t_rawin *x;
! pdp_post("%s %s", pipe->s_name, type->s_name);
/* allocate & init */
--- 314,322 ----
! static void *rawin_new(t_symbol *spipe, t_symbol *type)
{
t_rawin *x;
! pdp_post("%s %s", spipe->s_name, type->s_name);
/* allocate & init */
***************
*** 252,260 ****
x->x_outlet = outlet_new(&x->x_obj, &s_anything);
x->x_sync_outlet = outlet_new(&x->x_obj, &s_anything);
! x->x_clock = clock_new(x, (t_method)tick);
x->x_queue = pdp_list_new(0);
x->x_active = 0;
x->x_giveup = 0;
! x->x_done = 0;
x->x_type = pdp_gensym("image/YCrCb/320x240"); //default
x->x_pipe = gensym("/tmp/pdpraw"); // default
--- 324,332 ----
x->x_outlet = outlet_new(&x->x_obj, &s_anything);
x->x_sync_outlet = outlet_new(&x->x_obj, &s_anything);
! x->x_clock = clock_new(x, (t_method)rawin_tick);
x->x_queue = pdp_list_new(0);
x->x_active = 0;
x->x_giveup = 0;
! x->x_mode = 0;
x->x_type = pdp_gensym("image/YCrCb/320x240"); //default
x->x_pipe = gensym("/tmp/pdpraw"); // default
***************
*** 265,269 ****
/* args */
rawin_type(x, type);
! if (pipe->s_name[0]) x->x_pipe = pipe;
return (void *)x;
--- 337,341 ----
/* args */
rawin_type(x, type);
! if (spipe->s_name[0]) x->x_pipe = spipe;
return (void *)x;
***************
*** 287,294 ****
(t_method)rawin_free, sizeof(t_rawin), 0, A_DEFSYMBOL, A_DEFSYMBOL, A_NULL);
! /* add global message handler */
class_addmethod(rawin_class, (t_method)rawin_type, gensym("type"), A_SYMBOL, A_NULL);
class_addmethod(rawin_class, (t_method)rawin_open, gensym("open"), A_DEFSYMBOL, A_NULL);
class_addmethod(rawin_class, (t_method)rawin_close, gensym("close"), A_NULL);
--- 359,368 ----
(t_method)rawin_free, sizeof(t_rawin), 0, A_DEFSYMBOL, A_DEFSYMBOL, A_NULL);
! /* add global message handlers */
class_addmethod(rawin_class, (t_method)rawin_type, gensym("type"), A_SYMBOL, A_NULL);
class_addmethod(rawin_class, (t_method)rawin_open, gensym("open"), A_DEFSYMBOL, A_NULL);
class_addmethod(rawin_class, (t_method)rawin_close, gensym("close"), A_NULL);
+ class_addmethod(rawin_class, (t_method)rawin_sync, gensym("sync"), A_FLOAT, A_NULL);
+ class_addmethod(rawin_class, (t_method)rawin_bang, gensym("bang"), A_NULL);
Index: Makefile
===================================================================
RCS file: /cvsroot/pure-data/externals/pdp/modules/generic/Makefile,v
retrieving revision 1.3
retrieving revision 1.4
diff -C2 -d -r1.3 -r1.4
*** Makefile 16 Dec 2005 01:05:32 -0000 1.3
--- Makefile 1 Sep 2006 13:45:30 -0000 1.4
***************
*** 5,9 ****
PDP_MOD = pdp_reg.o pdp_del.o pdp_snap.o pdp_trigger.o \
pdp_route.o pdp_inspect.o pdp_loop.o pdp_description.o pdp_convert.o \
! pdp_udp_send.o pdp_udp_receive.o pdp_rawin.o pdp_rawout.o
# build generic modules
--- 5,9 ----
PDP_MOD = pdp_reg.o pdp_del.o pdp_snap.o pdp_trigger.o \
pdp_route.o pdp_inspect.o pdp_loop.o pdp_description.o pdp_convert.o \
! pdp_udp_send.o pdp_udp_receive.o pdp_rawin.o pdp_rawout.o pdp_metro.o
# build generic modules
Index: pdp_del.c
===================================================================
RCS file: /cvsroot/pure-data/externals/pdp/modules/generic/pdp_del.c,v
retrieving revision 1.3
retrieving revision 1.4
diff -C2 -d -r1.3 -r1.4
*** pdp_del.c 16 Dec 2005 01:05:32 -0000 1.3
--- pdp_del.c 1 Sep 2006 13:45:30 -0000 1.4
***************
*** 72,76 ****
out = (((x->x_head + x->x_delay)) % x->x_order);
packet = x->x_packet[out];
! pdp_packet_pass_if_valid(x->x_outlet0, &x->x_packet[out]);
/*
--- 72,83 ----
out = (((x->x_head + x->x_delay)) % x->x_order);
packet = x->x_packet[out];
!
! // originally, we wouldn't keep the packet in the delay line to save memory
! // however, this behaviour is very annoying, and doesn't allow ''scratching''
! // so we send out a copy instead.
! // pdp_packet_pass_if_valid(x->x_outlet0, &x->x_packet[out]);
! int p = pdp_packet_copy_ro(packet);
! pdp_packet_pass_if_valid(x->x_outlet0, &p);
!
/*
***************
*** 89,92 ****
--- 96,108 ----
x->x_head = (x->x_head + x->x_order - 1) % x->x_order;
+
+ /*
+ int i;
+ for (i=0; i<x->x_order; i++){
+ fprintf(stderr, " %d", x->x_packet[i]);
+ }
+ fprintf(stderr, "\n");
+ */
+
}
Index: pdp_rawout.c
===================================================================
RCS file: /cvsroot/pure-data/externals/pdp/modules/generic/pdp_rawout.c,v
retrieving revision 1.3
retrieving revision 1.4
diff -C2 -d -r1.3 -r1.4
*** pdp_rawout.c 16 Dec 2005 01:05:32 -0000 1.3
--- pdp_rawout.c 1 Sep 2006 13:45:30 -0000 1.4
***************
*** 1,4 ****
/*
! * Pure Data Packet module. packet forth console
* Copyright (c) by Tom Schouten <pdp at zzz.kotnet.org>
*
--- 1,4 ----
/*
! * Pure Data Packet module. Raw packet output.
* Copyright (c) by Tom Schouten <pdp at zzz.kotnet.org>
*
***************
*** 20,23 ****
--- 20,48 ----
+ /*
+ This is the most straightforward way to get data out of pdp.
+ The internals follow the simple reader/writer pattern
+ * writer: runs in pd thread, accepts packets from inlet and stores in queue
+ * reader: runs in own thread, reads packets from queue and writes to pipe
+
+ Since there is no communication from reader to writer, we need a watchdog
+ timer to check the status of the writer. Mainly to close when necessary.
+
+ To enable audio recording, pdp_rawout will also produce interleaved 16bit
+ audio. You will need to instantiate it with [pdp_rawout~ nbchans]
+
+
+
+ */
+
+ /* TODO:
+
+ make audio buffer smaller (128 bytes writes is too heavy)
+
+ */
+
+
+
+ #include <string.h>
#include <pthread.h>
#include <stdio.h>
***************
*** 40,47 ****
#define D if (1)
! #define MAX_QUEUESIZE 4
#define PIPE_BLOCKSIZE 4096
!
!
--- 65,71 ----
#define D if (1)
! #define DEF_QUEUESIZE 100
#define PIPE_BLOCKSIZE 4096
! #define POLLTIME 20
***************
*** 52,60 ****
/* pd */
t_object x_obj;
//t_outlet *x_outlet;
t_outlet *x_sync_outlet;
/* comm */
! t_pdp_list *x_queue; // packet queue
/* thread */
--- 76,92 ----
/* pd */
t_object x_obj;
+ t_float x_f;
+
//t_outlet *x_outlet;
t_outlet *x_sync_outlet;
/* comm */
! t_pdp_list *x_queue; // packet queue
! t_clock *x_clock; // watchdog timer
! t_float x_deltime; // watchdog period
! int x_verbose;
! int x_packet; // dsp fillup state
! int x_countdown; // amount of packet filled up
! short int * x_needle; // start writing here
/* thread */
***************
*** 69,74 ****
/* config */
! t_symbol *x_pipe;
! t_pdp_symbol *x_type;
} t_rawout;
--- 101,108 ----
/* config */
! t_symbol *x_pipe;
! int x_chans; // nb audio channels
! t_pdp_symbol *x_tmptype;
! int x_max_queuesize; // buffer size ( < 0 = infty )
} t_rawout;
***************
*** 79,131 ****
static void rawout_close(t_rawout *x);
- static void pdp_in(t_rawout *x, t_symbol *s, t_float f)
- {
- /* save packet to pdp queue, if size is smaller than maxsize */
- if (s == S_REGISTER_RO){
- if (x->x_queue->elements < MAX_QUEUESIZE){
- int p = (int)f;
- p = pdp_packet_copy_ro(p);
- if (p != -1){
- lock(x);
- pdp_list_add_back(x->x_queue, a_packet, (t_pdp_word)p);
- unlock(x);
- }
- }
- else {
- pdp_post("pdp_rawout: dropping packet: (queue full)", MAX_QUEUESIZE);
- }
-
- }
-
- /* check if thread is done */
- if (x->x_done) rawout_close(x);
-
- }
-
static void *rawout_thread(void *y)
{
! int pipe;
int packet = -1;
t_rawout *x = (t_rawout *)y;
int period_sec;
int period_usec;
! sigset_t sigvec; /* signal handling */
/* ignore pipe signal */
! sigemptyset(&sigvec);
! sigaddset(&sigvec,SIGPIPE);
! pthread_sigmask(SIG_BLOCK, &sigvec, 0);
//D pdp_post("pipe: %s", x->x_pipe->s_name);
//D pdp_post("type: %s", x->x_type->s_name);
/* open pipe */
! if (-1 == (pipe = open(x->x_pipe->s_name, O_WRONLY|O_NONBLOCK))){
! perror(x->x_pipe->s_name);
! goto exit;
}
/* main loop (packets) */
while(1){
--- 113,151 ----
static void rawout_close(t_rawout *x);
+ /* READER THREAD: reads from queue, writes to pipe */
static void *rawout_thread(void *y)
{
! int pipefd;
int packet = -1;
t_rawout *x = (t_rawout *)y;
int period_sec;
int period_usec;
! sigset_t _sigvec; /* signal handling */
!
! char me[1024];
! sprintf(me, "pdp_rawout: %s", x->x_pipe->s_name);
/* ignore pipe signal */
! sigemptyset(&_sigvec);
! sigaddset(&_sigvec,SIGPIPE);
! pthread_sigmask(SIG_BLOCK, &_sigvec, 0);
//D pdp_post("pipe: %s", x->x_pipe->s_name);
//D pdp_post("type: %s", x->x_type->s_name);
+
/* open pipe */
! if (-1 == (pipefd = open(x->x_pipe->s_name, O_WRONLY|O_NONBLOCK|O_APPEND))){
! if (-1 == (pipefd = open(x->x_pipe->s_name, O_WRONLY|O_CREAT))){
! perror(me);
! goto exit;
! }
}
+ pdp_post("pdp_rawout: opened %s", x->x_pipe->s_name);
+
+
/* main loop (packets) */
while(1){
***************
*** 148,155 ****
packet = pdp_list_pop(x->x_queue).w_packet;
unlock(x);
!
/* send packet */
data = pdp_packet_data(packet);
left = pdp_packet_data_size(packet);
/* inner loop: pipe reads */
--- 168,180 ----
packet = pdp_list_pop(x->x_queue).w_packet;
unlock(x);
!
/* send packet */
+ //t_pdp *h = pdp_packet_header(packet);
+ //fprintf(stderr, "users %d\n", h->users);
+
data = pdp_packet_data(packet);
left = pdp_packet_data_size(packet);
+ //int i; for (i = 0; i<left/2; i++) fprintf(stderr, "%06x ", ((short int *)data)[i]); fprintf(stderr, "\n");
+
/* inner loop: pipe reads */
***************
*** 167,172 ****
/* select, with timeout */
FD_ZERO(&outset);
! FD_SET(pipe, &outset);
! if (-1 == select(pipe+1, NULL, &outset, NULL, &tv)){
pdp_post("select error");
goto close;
--- 192,197 ----
/* select, with timeout */
FD_ZERO(&outset);
! FD_SET(pipefd, &outset);
! if (-1 == select(pipefd+1, NULL, &outset, NULL, &tv)){
pdp_post("select error");
goto close;
***************
*** 174,183 ****
/* if ready, read, else retry */
! if (FD_ISSET(pipe, &outset)){
! int bytes = write(pipe, data, left);
/* handle errors */
if (bytes <= 0){
! perror(x->x_pipe->s_name);
! if (bytes != EAGAIN) goto close;
}
/* or update pointers */
--- 199,209 ----
/* if ready, read, else retry */
! if (FD_ISSET(pipefd, &outset)){
!
! int bytes = write(pipefd, data, left);
/* handle errors */
if (bytes <= 0){
! perror(me);
! if (errno != EAGAIN) goto close;
}
/* or update pointers */
***************
*** 201,205 ****
close:
/* close pipe */
! close(pipe);
--- 227,231 ----
close:
/* close pipe */
! close(pipefd);
***************
*** 210,223 ****
! static void rawout_type(t_rawout *x, t_symbol *type)
{
! x->x_type = pdp_gensym(type->s_name);
}
! static void rawout_open(t_rawout *x, t_symbol *pipe)
{
/* save pipe name if not empty */
! if (pipe->s_name[0]) {x->x_pipe = pipe;}
if (x->x_active) {
--- 236,354 ----
+ /* DSP INPUT */
+ #define DSP_ARG(type, name, source) type name = (type)source
! static t_int *rawout_perform(t_int *w);
! static void rawout_dsp(t_rawout *x, t_signal **sp){
! int nargs = 2 + x->x_chans;
! t_int args[nargs];
! args[0] = (int)x;
! args[1] = (int)sp[0]->s_n;
! float **in = (float **)(args+2);
! int i;
! for (i=0; i<x->x_chans; i++) in[i] = sp[i]->s_vec;
! dsp_addv(rawout_perform, nargs, args);
! }
!
! static t_int *rawout_perform(t_int *w)
{
! DSP_ARG(t_rawout*, x, w[1]);
! DSP_ARG(t_int, n, w[2]);
! DSP_ARG(t_float**, in, &w[3]);
!
! short int *out;
! int i,c,k;
!
!
! if (x->x_queue->elements >= x->x_max_queuesize){
! // drop
! if (x->x_verbose && x->x_active) pdp_post_n(".");
! }
! else {
!
! // create packet
! if (x->x_countdown) {
! out = x->x_needle;
! }
! else {
! int p = pdp_factory_newpacket(x->x_tmptype);
! pdp_packet_mark_unused(x->x_packet);
! // if (-1 == p) pdp_post("pdp_rawout~: can't create packet");
! x->x_needle = out = (short int *)pdp_packet_data(p);
! x->x_packet = p;
! x->x_countdown = pdp_packet_data_size(p) / 2;
! }
!
! //pdp_post("data size = %d bytes", pdp_packet_data_size(p));
!
! //memset(out, 0, pdp_packet_data_size(p));
!
!
! // convert & interleave
! for (k=0,i=0; i<n; i++){
! for (c=0; c<x->x_chans; c++,k++){
! float val = (in[c])[i];
! val *= (float)((1<<15)-1);
! out[k] = (short int)(val);
! //out[k] = 0x1234;
! //fprintf(stderr, "(%d,%d,%d) %d\n", c, i, k, (int)out[k]);
! }
! }
!
! x->x_needle += k;
! x->x_countdown -= k;
! if (!x->x_countdown){
! // transfer
! lock(x);
! pdp_list_add_back(x->x_queue, a_packet, (t_pdp_word)x->x_packet);
! x->x_packet = -1;
! unlock(x);
! }
! }
!
! return w+3+x->x_chans;
}
!
! /* PACKET INPUT */
!
!
! static void pdp_in(t_rawout *x, t_symbol *s, t_float f)
! {
! /* save packet to pdp queue, if size is smaller than maxsize */
! if (s == S_REGISTER_RO){
! if (x->x_queue->elements < x->x_max_queuesize){
! int p = (int)f;
! p = pdp_packet_copy_ro(p);
! if (p != -1){
! lock(x);
! pdp_list_add_back(x->x_queue, a_packet, (t_pdp_word)p);
! unlock(x);
! }
! }
! else {
! //pdp_post("pdp_rawout: dropping packet: (queue full)", x->x_max_queuesize);
! if (x->x_active && x->x_verbose) pdp_post_n(".");
! }
!
! }
!
! /* check if thread is done */
! if (x->x_done) rawout_close(x);
!
! }
!
! /* CONTROL */
!
! //static void rawout_type(t_rawout *x, t_symbol *type)
! //{
! //x->x_type = pdp_gensym(type->s_name);
! //}
!
! static void clear_queue(t_rawout *x);
! static void rawout_open(t_rawout *x, t_symbol *spipe)
{
/* save pipe name if not empty */
! if (spipe->s_name[0]) {x->x_pipe = spipe;}
if (x->x_active) {
***************
*** 228,238 ****
x->x_giveup = 0;
x->x_done = 0;
pthread_create(&x->x_thread, &x->x_attr, rawout_thread , x);
x->x_active = 1;
}
! static void rawout_close(t_rawout *x)
! {
!
if (!x->x_active) return;
--- 359,368 ----
x->x_giveup = 0;
x->x_done = 0;
+ clear_queue(x);
pthread_create(&x->x_thread, &x->x_attr, rawout_thread , x);
x->x_active = 1;
}
! static void rawout_stopthread(t_rawout *x){
if (!x->x_active) return;
***************
*** 241,273 ****
pthread_join(x->x_thread, NULL);
x->x_active = 0;
/* notify */
outlet_bang(x->x_sync_outlet);
! pdp_post("connection to %s closed", x->x_pipe->s_name);
!
!
}
static void rawout_free(t_rawout *x)
{
! rawout_close(x);
! pdp_tree_strip_packets(x->x_queue);
pdp_tree_free(x->x_queue);
}
t_class *rawout_class;
!
! static void *rawout_new(t_symbol *pipe, t_symbol *type)
! {
! t_rawout *x;
!
! pdp_post("%s %s", pipe->s_name, type->s_name);
!
! /* allocate & init */
! x = (t_rawout *)pd_new(rawout_class);
//x->x_outlet = outlet_new(&x->x_obj, &s_anything);
x->x_sync_outlet = outlet_new(&x->x_obj, &s_anything);
--- 371,428 ----
pthread_join(x->x_thread, NULL);
x->x_active = 0;
+ clear_queue(x);
+ }
+
+ static void rawout_close(t_rawout *x)
+ {
+ if (!x->x_active) return;
+ rawout_stopthread(x);
/* notify */
outlet_bang(x->x_sync_outlet);
! pdp_post("pdp_rawout: closed %s", x->x_pipe->s_name);
+ }
+ static void rawout_verbose(t_rawout *x, t_float fverbose){
+ x->x_verbose = (int)fverbose;
+ }
+ static void rawout_tick(t_rawout *x){
+ if (x->x_done) rawout_close(x);
+ clock_delay(x->x_clock, x->x_deltime);
+ }
! static void clear_queue(t_rawout *x){
! lock(x);
! while(x->x_queue->elements){
! pdp_packet_mark_unused(pdp_list_pop(x->x_queue).w_packet);
! }
! unlock(x);
! }
!
! #define MAXINT (0x80 << ((sizeof(int)-1)*8))
! static void rawout_queuesize(t_rawout *x, t_float fbufsize){
! int bufsize = (int)fbufsize;
! if (fbufsize < 1) fbufsize = MAXINT;
! x->x_max_queuesize = bufsize;
}
static void rawout_free(t_rawout *x)
{
! // terminate thread
! rawout_stopthread(x);
!
! // cleanup
! clock_unset(x->x_clock);
! clear_queue(x);
pdp_tree_free(x->x_queue);
+ pdp_packet_mark_unused(x->x_packet);
}
t_class *rawout_class;
+ t_class *rawout_dsp_class;
! // shared stuff
! static void rawout_init(t_rawout *x, t_symbol *spipe){
//x->x_outlet = outlet_new(&x->x_obj, &s_anything);
x->x_sync_outlet = outlet_new(&x->x_obj, &s_anything);
***************
*** 276,292 ****
x->x_giveup = 0;
x->x_done = 0;
! x->x_type = pdp_gensym("image/YCrCb/320x240"); //default
x->x_pipe = gensym("/tmp/pdpraw"); // default
pthread_attr_init(&x->x_attr);
pthread_mutex_init(&x->x_mut, NULL);
/* args */
! rawout_type(x, type);
! if (pipe->s_name[0]) x->x_pipe = pipe;
return (void *)x;
}
--- 431,489 ----
x->x_giveup = 0;
x->x_done = 0;
! x->x_packet = -1;
! x->x_countdown = 0;
! x->x_max_queuesize = DEF_QUEUESIZE;
! //x->x_type = pdp_gensym("image/YCrCb/320x240"); //default
x->x_pipe = gensym("/tmp/pdpraw"); // default
+ x->x_deltime = POLLTIME;
+ x->x_clock = clock_new(x, (t_method)rawout_tick);
pthread_attr_init(&x->x_attr);
pthread_mutex_init(&x->x_mut, NULL);
/* args */
! //rawout_type(x, type);
! if (spipe && spipe->s_name[0]) x->x_pipe = spipe;
! rawout_tick(x);
! rawout_verbose(x,0);
! }
+ // [pdp_rawout]
+ static void *rawout_new(t_symbol *spipe /* , t_symbol *type */)
+ {
+ t_rawout *x;
+ /* allocate & init */
+ x = (t_rawout *)pd_new(rawout_class);
+ rawout_init(x, spipe);
return (void *)x;
}
+ // [pdp_rawout~]
+
+
+ // HUH??? why do i get the symbol first, then the float????
+ // see http://lists.puredata.info/pipermail/pd-dev/2003-09/001618.html
+ //static void *rawout_dsp_new(t_float fchans, t_symbol *spipe){
+ static void *rawout_dsp_new(t_symbol *spipe, t_float fchans){
+ int chans = (int)fchans;
+ if (chans < 1) chans = 1;
+ if (chans > 64) return 0; // this is just a safety measure
+
+ t_rawout *x = (t_rawout *)pd_new(rawout_dsp_class);
+ rawout_init(x, spipe);
+
+ // hack: temp packet
+ char temp_packet[1024];
+ sprintf(temp_packet, "image/grey/256x%d", 8 * chans);
+ pdp_post("pdp_rawout: using fake packet %s", temp_packet);
+ x->x_tmptype = pdp_gensym(temp_packet);
+
+ // create audio inlets
+ x->x_chans = chans;
+ while (--chans) inlet_new(&x->x_obj, &x->x_obj.ob_pd, gensym("signal"), gensym("signal"));
+
+ return (void *)x;
+ }
+
***************
*** 297,319 ****
! void pdp_rawout_setup(void)
! {
int i;
/* create a standard pd class: [pdp_rawout pipe type] */
rawout_class = class_new(gensym("pdp_rawout"), (t_newmethod)rawout_new,
! (t_method)rawout_free, sizeof(t_rawout), 0, A_DEFSYMBOL, A_DEFSYMBOL, A_NULL);
/* add global message handler */
! class_addmethod(rawout_class, (t_method)pdp_in,
! gensym("pdp"), A_SYMBOL, A_FLOAT, A_NULL);
! class_addmethod(rawout_class, (t_method)rawout_type, gensym("type"), A_SYMBOL, A_NULL);
! class_addmethod(rawout_class, (t_method)rawout_open, gensym("open"), A_DEFSYMBOL, A_NULL);
! class_addmethod(rawout_class, (t_method)rawout_close, gensym("close"), A_NULL);
}
#ifdef __cplusplus
}
--- 494,537 ----
!
! #define COMMON(base_class)\
! class_addmethod(base_class, (t_method)rawout_open, gensym("open"), A_DEFSYMBOL, A_NULL);\
! class_addmethod(base_class, (t_method)rawout_close, gensym("close"), A_NULL);\
! class_addmethod(base_class, (t_method)rawout_verbose, gensym("verbose"), A_FLOAT, A_NULL);\
! class_addmethod(base_class, (t_method)rawout_queuesize, gensym("bufsize"), A_FLOAT, A_NULL);
!
! void pdp_rawout_setup(void){
!
int i;
+ /* PACKETS */
+
/* create a standard pd class: [pdp_rawout pipe type] */
rawout_class = class_new(gensym("pdp_rawout"), (t_newmethod)rawout_new,
! (t_method)rawout_free, sizeof(t_rawout), 0, A_DEFSYMBOL, A_NULL);
/* add global message handler */
! class_addmethod(rawout_class, (t_method)pdp_in, gensym("pdp"), A_SYMBOL, A_FLOAT, A_NULL);
! COMMON(rawout_class);
! /* DSP */
+ /* create a standard pd class: [pdp_rawout pipe type] */
+ rawout_dsp_class = class_new(gensym("pdp_rawout~"), (t_newmethod)rawout_dsp_new,
+ (t_method)rawout_free, sizeof(t_rawout), 0, A_DEFFLOAT, A_DEFSYMBOL, A_NULL);
+
+ /* add signal input */
+ CLASS_MAINSIGNALIN(rawout_dsp_class, t_rawout, x_f);
+ class_addmethod(rawout_dsp_class, (t_method)rawout_dsp, gensym("dsp"), 0);
+ COMMON(rawout_dsp_class);
}
+
+
+
+
+
+
#ifdef __cplusplus
}
More information about the Pd-cvs
mailing list