[PD-cvs] externals/pdp/system/net Makefile, 1.2, 1.3 pdp_net.c, 1.2, 1.3

Hans-Christoph Steiner eighthave at users.sourceforge.net
Fri Dec 16 02:05:42 CET 2005


Update of /cvsroot/pure-data/externals/pdp/system/net
In directory sc8-pr-cvs1.sourceforge.net:/tmp/cvs-serv6756/system/net

Added Files:
	Makefile pdp_net.c 
Log Message:
checking in pdp 0.12.4 from http://zwizwa.fartit.com/pd/pdp/pdp-0.12.4.tar.gz

--- NEW FILE: Makefile ---

# Object files produced from the sources in this directory.
OBJECTS = pdp_net.o


# Pull in the configuration file two directories up. Its contents are not
# visible here; presumably it supplies the compiler settings and the rule
# that turns .c files into .o files -- confirm against Makefile.config.
include ../../Makefile.config

# Default target: build every object listed in OBJECTS.
all: $(OBJECTS)

# Remove editor backup files (*~) and compiled objects (*.o).
clean:
	rm -f *~
	rm -f *.o

--- NEW FILE: pdp_net.c ---

#include "pdp_net.h"
#include "pdp_debug.h"
#include "pdp_post.h"
#include "pdp_mem.h"

/* Debug-message guards. Prefixing a statement with D or DD turns it into
   `if (0) statement`, so the statement is still compiled but never runs.
   Change the 0 to 1 to enable that class of message.
   NOTE(review): because these expand to a bare `if`, an `else` written
   right after a guarded statement would bind to the macro's `if`. */
#define D if (0)   // DEBUG MSG
#define DD if (0)  // DROP DEBUG MSG

/* shared internals */

/* Tell whether a received buffer starts with a valid PDP UDP header.
   header: start of the received bytes
   size:   number of bytes received
   Returns 1 when the buffer is at least one header long, carries the
   "PDP" signature and has the expected protocol version; 0 otherwise. */
static int _is_udp_header(t_pdp_udp_header *header, unsigned int size)
{
    int long_enough = (size >= sizeof(t_pdp_udp_header));

    /* short-circuit evaluation keeps the field checks behind the size check */
    if (long_enough
        && 0 == strcmp(header->signature, "PDP")
        && header->version == PDP_UDP_VERSION) {
        return 1;
    }
    return 0;
}

/* Stamp a header with the protocol signature ("PDP" including its
   terminating NUL) and the current protocol version. The other header
   fields are left untouched. */
static void _make_udp_header(t_pdp_udp_header *header)
{
    static const char signature[] = "PDP";

    /* copies the three letters plus the terminating NUL, as strcpy would */
    memcpy(header->signature, signature, sizeof(signature));
    header->version = PDP_UDP_VERSION;
}




/* R E C E I V E R */


/* INTERNALS */

/* Send the receiver's reply packet (resend header followed by
   x_resend_udp_packet_size - header bytes of payload) back to the address
   stored in x_source_socket. A send failure is only reported through
   pdp_post. */
static void _send_packet(t_pdp_udp_receiver *x)
{
    struct sockaddr *peer = (struct sockaddr *)&x->x_source_socket;

    /* signature and version are (re)written on every send */
    _make_udp_header(&x->x_resend_header);
    PDP_ASSERT(x->x_resend_udp_packet_size <= sizeof(t_pdp_udp_header) + sizeof(x->x_resend_chunks));

    if (sendto(x->x_socket, &x->x_resend_header, x->x_resend_udp_packet_size, 0,
               peer, x->x_sslen) == -1) {
        pdp_post("pdp_netreceive: send failed");
    }
}

/* Acknowledge a PDP_UDP_NEW packet: reply with a bare header (no payload)
   that carries the current connection id and the PDP_UDP_ACKNEW marker. */
static void _send_ack_new(t_pdp_udp_receiver *x)
{
    /* header-only packet */
    x->x_resend_udp_packet_size = sizeof(t_pdp_udp_header);

    /* fill in the reply header */
    x->x_resend_header.sequence_number = PDP_UDP_ACKNEW;
    x->x_resend_header.connection_id = x->x_connection_id;

    _send_packet(x);
}


/* Handle a PDP_UDP_NEW control packet, which announces a new transmission.

   If the packet's connection id differs from the current one, the buffers
   of the previous transmission are released and new ones (type string,
   data buffer, chunk list) are allocated from the sizes announced in the
   packet. If the id is the current one, the packet is a resend of a NEW
   whose ACK got lost and only the ACK is repeated.

   Returns 1 when the packet was accepted and acknowledged, 0 when it was
   rejected (truncated packet, unusable sizes, or out of memory). On
   rejection no ACK is sent and x_connection_id is left at 0, so data
   packets of the rejected transmission are ignored. */
static int _handle_PDP_UDP_NEW(t_pdp_udp_receiver *x)
{
    t_pdp_udp_newpacket *np = (t_pdp_udp_newpacket *)x->x_buf;

    /* the body must at least hold the fixed fields read below */
    if (x->x_buf_size < sizeof(*np)){
        pdp_post("pdp_netreceive: got truncated NEW packet");
        return 0;
    }

    if (x->x_connection_id != x->x_header.connection_id){

        /* drop the previous transmission first; the id stays 0 (invalid)
           until every buffer for the new one is in place */
        x->x_connection_id = 0;
        x->x_nb_chunks = 0;
        x->x_chunk_size = 0;
        x->x_data_size = 0;
        if (x->x_data_type) pdp_dealloc(x->x_data_type);
        if (x->x_data) pdp_dealloc(x->x_data);
        if (x->x_chunk_list) pdp_dealloc(x->x_chunk_list);
        x->x_data_type = 0;
        x->x_data = 0;
        x->x_chunk_list = 0;

        /* the chunk list size is computed from a network-supplied count:
           refuse counts for which the multiplication would wrap */
        if (np->nb_chunks > ((size_t)-1) / sizeof(unsigned int)){
            pdp_post("pdp_netreceive: NEW packet announces too many chunks");
            return 0;
        }

        /* Allocate at least one byte/element so that a zero-sized
           transmission still yields non-null buffers.
           NOTE(review): strlen() relies on the receive buffer being
           NUL-terminated (x_zero_terminator, presumably placed right
           after x_buf in the struct) -- confirm against pdp_net.h. */
        x->x_data_type  = pdp_alloc(1 + strlen(np->type));
        x->x_data       = pdp_alloc(np->data_size ? np->data_size : 1);
        x->x_chunk_list = pdp_alloc(sizeof(unsigned int)
                                    * (np->nb_chunks ? np->nb_chunks : 1));

        if (!x->x_data_type || !x->x_data || !x->x_chunk_list){
            pdp_post("pdp_netreceive: out of memory, dropping transmission");
            if (x->x_data_type) pdp_dealloc(x->x_data_type);
            if (x->x_data) pdp_dealloc(x->x_data);
            if (x->x_chunk_list) pdp_dealloc(x->x_chunk_list);
            x->x_data_type = 0;
            x->x_data = 0;
            x->x_chunk_list = 0;
            return 0;
        }

        /* setup type info */
        strcpy(x->x_data_type, np->type);

        /* setup data buffer */
        x->x_data_size = np->data_size;
        memset(x->x_data, 0, x->x_data_size); // clear for debug

        /* setup chunk list: nothing received yet */
        x->x_nb_chunks  = np->nb_chunks;
        x->x_chunk_size = np->chunk_size;
        memset(x->x_chunk_list, 0, sizeof(unsigned int) * x->x_nb_chunks);

        /* everything is in place: accept the connection */
        x->x_connection_id = x->x_header.connection_id;
        x->x_receive_finished = 0;   // we're in a receiving state
        x->x_packet_transferred = 0; // we didn't pass the packet yet
    }

    /* send ACK (also for a repeated NEW of the current transmission) */
    _send_ack_new(x);

    return 1;
}

/* Handle a PDP_UDP_DONE control packet: the sender finished a burst and
   wants to know which chunks are still missing.

   Replies with a PDP_UDP_RESEND packet listing up to RESEND_MAX_CHUNKS
   missing chunk numbers (an empty list means everything arrived) and sets
   x_receive_finished when nothing is missing.

   The packet is ignored when no transmission is active (connection id 0)
   or when it belongs to another transmission. Without the first check a
   DONE with id 0 arriving before any NEW would be reported as a completed
   transfer although no data buffer exists. */
static void _handle_PDP_UDP_DONE(t_pdp_udp_receiver *x)
{
    unsigned int chunk;
    unsigned int missing;
    unsigned int i;

    /* check the connection id (0 == no transmission in progress) */
    if ((!x->x_connection_id) || (x->x_connection_id != x->x_header.connection_id)) return;

    /* a non-empty transmission needs a chunk list to inspect */
    if (x->x_nb_chunks && !x->x_chunk_list) return;

    /* determine how many chunks are missing */
    missing = 0;
    for (i = 0; i < x->x_nb_chunks; i++){
        if (!x->x_chunk_list[i]) missing++;
    }

    D pdp_post ("last packet %x had %d/%d dropped chunks", x->x_connection_id, missing, x->x_nb_chunks);

    /* build the resend request (chunk list); the list is filled from the
       back, so the lowest missing chunk number ends up last */
    if (missing > RESEND_MAX_CHUNKS) missing = RESEND_MAX_CHUNKS;
    chunk = 0;
    i = missing;
    while (i--){
        /* cannot run past the list: at least `missing` zero entries exist */
        while (x->x_chunk_list[chunk]) chunk++; // find next missing chunk
        x->x_resend_chunks[i] = chunk++;        // store it in list
    }

    /* set the packet size to include the list */
    x->x_resend_udp_packet_size = sizeof(t_pdp_udp_header)
        + missing * sizeof(unsigned int);

    /* setup resend header; signature and version are filled in by
       _send_packet() through _make_udp_header() */
    x->x_resend_header.connection_id = x->x_connection_id;
    x->x_resend_header.sequence_number = PDP_UDP_RESEND;

    D pdp_post("pdp_netreceive: sending RESEND response for %u chunks",  missing);

    /* send out */
    _send_packet(x);

    /* indicate we're done if there's no chunks missing */
    if (!missing) x->x_receive_finished = 1;
}


/* Handle a data packet: copy its payload (x_buf, x_buf_size bytes) into
   the data buffer at chunk position `sequence_number` and mark the chunk
   as received.

   Returns 1 when the chunk was stored, 0 when the packet was ignored
   (no or different transmission, chunk number out of range, or payload
   that does not fit in the data buffer).

   All values involved come from the network, so the bounds are enforced
   with run-time checks written so that no intermediate result can wrap. */
static int _handle_UDP_DATA(t_pdp_udp_receiver *x)
{
    unsigned int seq = x->x_header.sequence_number;
    unsigned int offset;

    /* ignore the packet if we're not expecting it */
    if ((!x->x_connection_id) || (x->x_connection_id != x->x_header.connection_id)){
        return 0;
    }

    /* check if it is valid */
    if (seq >= x->x_nb_chunks){
        pdp_post("pdp_netreceive: got invalid data packet: sequence number %u out of bound (nb_chunks=%u)",
             seq, x->x_nb_chunks);
        return 0;
    }

    /* nothing to write to */
    if (!x->x_data || !x->x_chunk_list) return 0;

    /* seq <= data_size / chunk_size guarantees chunk_size * seq <= data_size,
       so the multiplication below cannot overflow */
    if (x->x_chunk_size && seq > x->x_data_size / x->x_chunk_size){
        pdp_post("pdp_netreceive: got invalid data packet: chunk %u starts beyond the data buffer", seq);
        return 0;
    }
    offset = x->x_chunk_size * seq;

    /* the payload must fit in what is left of the buffer */
    if (x->x_buf_size > x->x_data_size - offset){
        pdp_post("pdp_netreceive: got invalid data packet: chunk %u does not fit in the data buffer", seq);
        return 0;
    }

    /* write & log it */
    memcpy(x->x_data + offset, x->x_buf, x->x_buf_size);
    x->x_chunk_list[seq] = 1;
    return 1;
}

/* INTERFACE */

/* setup */
/* Create a receiver listening for UDP packets on `port` (host byte order)
   on all local addresses.
   Returns the new receiver, or 0 when memory allocation, socket creation
   or bind fails; nothing is leaked in the failure cases. */
t_pdp_udp_receiver *pdp_udp_receiver_new(int port)
{
    t_pdp_udp_receiver *x = pdp_alloc(sizeof(*x));
    if (!x) return 0;

    /* Zero everything: no data, no type, no chunk list, not finished,
       x_zero_terminator == 0 and connection id 0 (0 == an invalid id,
       used for bootstrap). */
    memset(x, 0, sizeof(*x));
    x->x_sslen = sizeof(struct sockaddr_in);

    x->x_socket = socket(PF_INET, SOCK_DGRAM, 0);
    if (-1 == x->x_socket){
        pdp_post("pdp_netreceive: can't create socket");
        pdp_dealloc(x);
        return 0;
    }

    /* bind socket */
    x->x_sa.sin_family = AF_INET;
    x->x_sa.sin_port = htons(port);
    x->x_sa.sin_addr.s_addr = htonl(INADDR_ANY);
    if (-1 == bind(x->x_socket, (struct sockaddr *)&x->x_sa,
                   sizeof(struct sockaddr_in))){
        /* suicide if bind failed */
        close(x->x_socket);
        pdp_dealloc(x);
        return 0;
    }

    return x;
}
/* Release the resources held by a receiver: its socket, data buffer, type
   string and chunk list. Passing 0 is allowed and does nothing. The
   released fields are reset so a second call is harmless.
   NOTE(review): the receiver struct itself (allocated with pdp_alloc in
   pdp_udp_receiver_new) is not released here -- confirm whether callers
   free it or whether it leaks. */
void pdp_udp_receiver_free(t_pdp_udp_receiver *x)
{
    if (!x) return;

    /* -1 is the "no socket" value returned by socket() on failure */
    if (x->x_socket != -1) close (x->x_socket);
    x->x_socket = -1;

    if (x->x_data) pdp_dealloc(x->x_data);
    if (x->x_data_type) pdp_dealloc (x->x_data_type);
    if (x->x_chunk_list) pdp_dealloc (x->x_chunk_list);
    x->x_data = 0;
    x->x_data_type = 0;
    x->x_chunk_list = 0;
}

/* Forget the current transmission by setting the connection id back to 0,
   the value the data handler treats as "no transmission expected". */
void pdp_udp_receiver_reset(t_pdp_udp_receiver *x)
{
    const unsigned int no_connection = 0;

    x->x_connection_id = no_connection;
}


/* Wait up to timeout_ms milliseconds for one UDP packet and process it.

   Returns  1 when a complete data packet has just become available
              (fetch it with pdp_udp_receiver_type/size/data),
            0 on timeout or when the packet processed was no major event
              (please poll again),
           -1 on a select/recvfrom error. */
int pdp_udp_receiver_receive(t_pdp_udp_receiver *x, unsigned int timeout_ms)
{
    unsigned int size;
    struct timeval tv;
    fd_set inset;

    /* split the timeout: tv_usec has to stay below one second */
    tv.tv_sec  = timeout_ms / 1000;
    tv.tv_usec = (timeout_ms % 1000) * 1000;

    /* listen for packets */
    FD_ZERO(&inset);
    FD_SET(x->x_socket, &inset);
    switch (select(x->x_socket+1, &inset, NULL, NULL, &tv)){
    case -1:
        return -1; /* select error */
    case 0:
        return 0;  /* select time out */
    default:
        break;     /* data ready */
    }

    /* the address length is a value-result argument: reset it before
       every call so it always describes the full address buffer */
    x->x_sslen = sizeof(struct sockaddr_in);

    /* this won't block, since there's data available */
    if (-1 == (int)(size = recvfrom(x->x_socket, (void *)&x->x_header,
                                    PDP_UDP_BUFSIZE+sizeof(x->x_header), 0,
                                    (struct sockaddr *)&x->x_source_socket, &x->x_sslen))) return -1;

    /* store the data size of the packet; only meaningful when the packet
       is at least one header long, which _is_udp_header() verifies below */
    x->x_buf_size = size - sizeof(t_pdp_udp_header);

    /* parse the udp packet */
    if (_is_udp_header(&x->x_header, size)){

        /* negative sequence numbers denote control packets */
        if ((int)x->x_header.sequence_number < 0){

            switch (x->x_header.sequence_number){
            case PDP_UDP_NEW:
                _handle_PDP_UDP_NEW(x);
                break;

            case PDP_UDP_DONE:
                _handle_PDP_UDP_DONE(x);

                /* check if we got a complete packet
                   and signal arrival if we haven't done this already */
                if (x->x_receive_finished && !x->x_packet_transferred){
                    x->x_packet_transferred = 1;
                    return 1; // data complete, please receive
                }
                break;

            default:
                pdp_post("got unknown msg");
                break;
            }
        }

        /* it is a data packet */
        else {
            _handle_UDP_DATA(x);
        }
    }

    else {
        pdp_post("pdp_netreceive: got invalid UDP packet (size = %d)", size);
    }

    return 0; //no major event, please poll again
}

/* get meta & data */
/* Type string of the current packet (0 before any transmission). The
   string stays owned by the receiver. */
char *pdp_udp_receiver_type(t_pdp_udp_receiver *x)
{
    char *type = x->x_data_type;
    return type;
}
/* Size in bytes of the current packet's data buffer. */
unsigned int pdp_udp_receiver_size(t_pdp_udp_receiver *x)
{
    unsigned int nbytes = x->x_data_size;
    return nbytes;
}
/* Data buffer of the current packet (0 before any transmission). The
   buffer stays owned by the receiver. */
void *pdp_udp_receiver_data(t_pdp_udp_receiver *x)
{
    void *payload = x->x_data;
    return payload;
}


/* S E N D E R */

/* INTERNALS */

/* Flow-control pause: with a non-zero sleep period P, sleep for
   x_sleepgrain_us microseconds whenever the call counter is 0, then
   advance the counter modulo P. A period of 0 disables the pause. */
static void _sleep(t_pdp_udp_sender *x)
{
    int sleep_period = x->x_sleep_period;

    if (!sleep_period) return;

    /* counter == 0 marks the start of a new period */
    if (x->x_sleep_count == 0) usleep(x->x_sleepgrain_us);

    x->x_sleep_count++;
    x->x_sleep_count %= sleep_period;
}

/* Send the sender's current packet: the header followed by x_buf_size
   payload bytes, to the address in x_sa. Signature and version are
   written here; connection id and sequence number must already be set.
   A failure is only reported through pdp_post. Every send is followed by
   the flow-control pause. */
static void _send(t_pdp_udp_sender *x)
{
    struct sockaddr *target = (struct sockaddr *)&x->x_sa;

    _make_udp_header(&x->x_header);

    PDP_ASSERT (x->x_buf_size <= PDP_UDP_BUFSIZE);

    if (sendto(x->x_socket, &x->x_header, x->x_buf_size + sizeof(t_pdp_udp_header),
               0, target, sizeof(struct sockaddr_in)) == -1) {
        pdp_post("pdp_netsend: send FAILED");
    }

    _sleep(x);
}


/* Set up the sender state for transmitting one packet.
   type: type description string (not copied, must outlive the transmission)
   size: number of data bytes
   data: the data bytes (not copied, must outlive the transmission)

   Splits the data in chunks of x_udp_payload_size bytes, draws a new
   non-zero connection id and fills the chunk list with all chunk numbers.
   An empty payload (size == 0) results in zero chunks. If the chunk list
   cannot be allocated the transmission is degraded to an empty one
   (0 bytes, 0 chunks) and the problem is reported through pdp_post, since
   this function has no way to return an error. */
static void _prepare_for_new_transmission(t_pdp_udp_sender *x, char *type, unsigned int size, void *data)
{
    unsigned int i;

    /* setup data for transmission */
    x->x_data_type = type;
    x->x_data_size = size;
    x->x_data = data;
    x->x_chunk_size = x->x_udp_payload_size;

    /* ceil(size / chunk_size); size == 0 is handled apart because
       (size - 1) would wrap around to a huge chunk count */
    x->x_nb_chunks = size ? ((size - 1) / x->x_chunk_size + 1) : 0;

    /* generate a connection id (non-zero) */
    while (!(x->x_connection_id = rand()));

    /* setup chunk list to contain all chunks (at least one element is
       allocated so the list pointer is valid for empty transmissions too) */
    free(x->x_chunk_list);
    x->x_chunk_list_size = 0;
    x->x_chunk_list = malloc(sizeof(unsigned int) * (x->x_nb_chunks ? x->x_nb_chunks : 1));
    if (!x->x_chunk_list){
        pdp_post("pdp_netsend: out of memory, sending an empty packet instead");
        x->x_data_size = 0;
        x->x_nb_chunks = 0;
        return;
    }
    x->x_chunk_list_size = x->x_nb_chunks;
    for (i = 0; i < x->x_chunk_list_size; i++) x->x_chunk_list[i] = i;
}

/* Send the PDP_UDP_NEW packet that announces the transmission: data size,
   chunk count, chunk size and the type string (including its NUL).

   The packet size is checked before anything is written to the transmit
   buffer; a type string that does not fit is reported and the packet is
   not sent, so the handshake in the caller times out instead of the
   buffer being overrun. */
static void _send_header_packet(t_pdp_udp_sender *x)
{
    t_pdp_udp_newpacket *np = (t_pdp_udp_newpacket *)x->x_buf; /* buf contains the PDP_UDP_NEW body */
    size_t type_size = strlen(x->x_data_type) + 1;             /* includes the terminating NUL */

    if (sizeof(*np) + type_size > (PDP_UDP_BUFSIZE)){
        pdp_post("pdp_netsend: type description too long, header packet not sent");
        return;
    }

    /* init packet */
    x->x_header.sequence_number = PDP_UDP_NEW;
    x->x_header.connection_id = x->x_connection_id;
    np->data_size  = x->x_data_size;
    np->nb_chunks  = x->x_nb_chunks;
    np->chunk_size = x->x_chunk_size;
    memcpy(np->type, x->x_data_type, type_size);
    x->x_buf_size = sizeof(*np) + type_size;

    /* send the packet */
    _send(x);
}

/* Send every chunk named in the chunk list, in list order. The header's
   signature, version and connection id must already be set up; only the
   sequence number is written here, once per chunk. */
static void _send_chunks(t_pdp_udp_sender *x)
{
    unsigned int pos = 0;
    unsigned int nb_sent = 0;

    while (pos < x->x_chunk_list_size){
        unsigned int seq = x->x_chunk_list[pos++];
        unsigned int offset;
        unsigned int current_chunk_size;

        /* the chunk number doubles as the packet's sequence number */
        PDP_ASSERT(seq < x->x_nb_chunks);
        x->x_header.sequence_number = seq;

        /* where this chunk starts in the data */
        offset = seq * x->x_chunk_size;
        PDP_ASSERT(offset < x->x_data_size);

        /* the last chunk may be shorter than the others */
        if (offset + x->x_chunk_size > x->x_data_size){
            current_chunk_size = x->x_data_size - offset;
        }
        else {
            current_chunk_size = x->x_chunk_size;
        }
        x->x_buf_size = current_chunk_size;
        PDP_ASSERT(x->x_buf_size <= PDP_UDP_BUFSIZE);

        /* stage the chunk in the transmission buffer */
        PDP_ASSERT(offset + current_chunk_size <= x->x_data_size);
        memcpy(x->x_buf, x->x_data + offset, current_chunk_size);

        /* and put it on the wire */
        _send(x);
        nb_sent++;
    }

    D pdp_post("sent %d chunks, id=%x", nb_sent,x->x_connection_id);
}

/* Tell the receiver the current burst is over: send a header-only packet
   whose sequence number is PDP_UDP_DONE. */
static void _send_done(t_pdp_udp_sender *x)
{
    /* no payload, just the control marker */
    x->x_buf_size = 0;
    x->x_header.sequence_number = PDP_UDP_DONE;

    _send(x);
}
/* Wait for a reply packet of type `desired_type` (a control sequence
   number such as PDP_UDP_ACKNEW or PDP_UDP_RESEND) that belongs to the
   current connection.

   Invalid packets, packets of another connection and packets of another
   type are skipped; each wait is limited to x_timeout_us microseconds.
   For PDP_UDP_RESEND the number of chunk entries in the packet is stored
   in x_resend_items.

   Returns 0 == timeout, -1 == error, 1 == got packet.

   NOTE(review): the packet is read into x_resend_header with a limit of
   MAX_UDP_PACKET bytes; this assumes the header is followed in the struct
   by a buffer large enough for that -- confirm against pdp_net.h. */
static int _receive_packet(t_pdp_udp_sender *x, int desired_type)
{
    unsigned int size;
    int type;

    struct timeval tv;
    fd_set inset;

    while (1){

        /* wait for incoming; the timeout is split because tv_usec has to
           stay below one second */
        tv.tv_sec = x->x_timeout_us / 1000000;
        tv.tv_usec = x->x_timeout_us % 1000000;
        FD_ZERO(&inset);
        FD_SET(x->x_socket, &inset);
        switch (select(x->x_socket+1, &inset, NULL, NULL, &tv)){
        case -1:
            return -1; /* select error */
        case 0:
            return 0;  /* select time out */
        default:
            break;     /* data ready */
        }

        /* read packet */
        if (-1 == (int)(size = recv(x->x_socket, (void *)&x->x_resend_header, MAX_UDP_PACKET, 0))){
            pdp_post("pdp_netsend: error while reading from socket");
            return -1;
        }

        /* check if it is a valid PDP_UDP packet */
        if (!_is_udp_header(&x->x_resend_header, size)){
            pdp_post("pdp_netsend: ignoring invalid UDP packet (size = %u)", size);
            continue;
        }

        /* check connection id */
        if (x->x_connection_id != x->x_resend_header.connection_id){
            D pdp_post("pdp_netsend: ignoring ghost packet id=%x, current id=%x",
                 x->x_resend_header.connection_id, x->x_connection_id);
            continue;
        }

        /* check type */
        type = x->x_resend_header.sequence_number;
        if (type != desired_type) continue;

        /* setup data buffer for known packets */
        switch (type){
        case PDP_UDP_RESEND:
            /* size >= header size here, _is_udp_header() verified it */
            x->x_resend_items = (size - sizeof(t_pdp_udp_header)) / sizeof(unsigned int);
            break;
        default:
            break;
        }

        return 1;
    }
}

/* Ask the receiver which chunks it is still missing and load them into
   the chunk list.

   Sends a DONE packet and waits for the RESEND reply, up to 3 times.
   Returns 1 when the receiver asked for at least one chunk (the chunk
   list then holds the requested chunk numbers), 0 otherwise: the receiver
   reported nothing missing, or no usable reply arrived (timeout, socket
   error, or only malformed requests); the chunk list is empty then.

   The reply comes from the network, so both the number of entries and
   every chunk number are validated before the list is used to index the
   data buffer. */
static int _need_resend(t_pdp_udp_sender *x) {

    int retries = 3;
    unsigned int i;

    while (retries--){

        /* send a DONE msg */
        _send_done(x);

        /* wait for ACK */
        switch (_receive_packet(x, PDP_UDP_RESEND)){
        case 0:
            /* timeout, retry */
            continue;
        case -1:
            /* error */
            goto move_on;

        default:
            /* got PDP_UDP_RESEND packet: setup resend list */
            if (x->x_resend_items  > x->x_nb_chunks){
                pdp_post("pdp_netsend: WARNING: chunk list size (%d) is too big, ignoring RESEND request",
                     x->x_resend_items);
                x->x_resend_items = 0;
                continue;
            }
            x->x_chunk_list_size = x->x_resend_items;

            /* the list was allocated for x_nb_chunks entries, which the
               check above guarantees is enough */
            if (x->x_resend_items){
                memcpy(x->x_chunk_list, x->x_resend_chunks, sizeof(unsigned int) * x->x_resend_items);
            }

            /* every requested chunk number has to exist */
            for (i = 0; i < x->x_chunk_list_size; i++){
                if (x->x_chunk_list[i] >= x->x_nb_chunks) break;
            }
            if (i < x->x_chunk_list_size){
                pdp_post("pdp_netsend: WARNING: chunk number %u out of range, ignoring RESEND request",
                     x->x_chunk_list[i]);
                x->x_chunk_list_size = 0;
                x->x_resend_items = 0;
                continue;
            }

            D pdp_post("got RESEND request for %d chunks (id %x)", x->x_resend_items,x->x_connection_id);

            return x->x_chunk_list_size > 0;
        }

    }

    /* timeout */
 move_on:
    x->x_chunk_list_size = 0;
    return 0;
}


/* INTERFACE */


/* some flow control hacks */

/* Set how long, in microseconds, the sender waits for a reply packet
   before it counts the wait as a timeout. */
void pdp_udp_sender_timeout_us(t_pdp_udp_sender *x, unsigned int timeout_us)
{
    unsigned int reply_timeout = timeout_us;

    x->x_timeout_us = reply_timeout;
}


/* Set the length, in microseconds, of the flow-control pause taken once
   per sleep period. */
void pdp_udp_sender_sleepgrain_us(t_pdp_udp_sender *x, unsigned int sleepgrain_us)
{
    unsigned int pause_us = sleepgrain_us;

    x->x_sleepgrain_us = pause_us;
}

/* Set the flow-control period: the number of sent packets per pause.
   A period of 0 disables the pause. */
void pdp_udp_sender_sleepperiod(t_pdp_udp_sender *x, unsigned int sleepperiod)
{
    unsigned int packets_per_pause = sleepperiod;

    x->x_sleep_period = packets_per_pause;
}


/* Choose the chunk payload size from the desired total UDP packet size.
   The payload is the packet size minus the PDP header, clamped to at
   least 1024 bytes and at most PDP_UDP_BUFSIZE bytes (the upper limit
   wins if the two conflict).

   The computation stays in unsigned arithmetic: packet sizes smaller than
   the header, or larger than INT_MAX, are clamped explicitly instead of
   relying on how an out-of-range value converts to int. */
void pdp_udp_sender_udp_packet_size(t_pdp_udp_sender *x, unsigned int udp_packet_size)
{
    const unsigned int header_size = sizeof(t_pdp_udp_header);
    unsigned int payload = 1024;

    if (udp_packet_size > header_size + 1024) payload = udp_packet_size - header_size;
    if (payload > PDP_UDP_BUFSIZE) payload = PDP_UDP_BUFSIZE;

    x->x_udp_payload_size = payload;
}

/* Set the destination of the sender to host:port and create the UDP
   socket (with the broadcast flag enabled) if the sender has none yet.

   Failures are reported through pdp_post. If the host cannot be resolved
   to an IPv4 address the previous destination is kept; if the socket
   cannot be created x_socket stays -1, which pdp_udp_sender_send treats
   as "no target". */
void pdp_udp_sender_connect(t_pdp_udp_sender *x, char *host, unsigned int port)
{
    struct hostent *hp;
    int on = 1;

    hp = gethostbyname(host);
    if (!hp){
        pdp_post("pdp_udp_sender: host %s not found", host);
        return;
    }

    /* the address is copied into an IPv4 sin_addr: make sure it is one */
    if (hp->h_addrtype != AF_INET || hp->h_length > (int)sizeof(x->x_sa.sin_addr)){
        pdp_post("pdp_udp_sender: host %s has no usable IPv4 address", host);
        return;
    }

    /* host ok, setup address */
    x->x_sa.sin_family = AF_INET;
    x->x_sa.sin_port = htons(port);
    memcpy((char *)&x->x_sa.sin_addr, (char *)hp->h_addr, hp->h_length);

    /* an existing socket is reused as is */
    if (x->x_socket != -1) return;

    /* create the socket */
    if (-1 == (x->x_socket = socket(PF_INET, SOCK_DGRAM, 0))){
        pdp_post("pdp_udp_sender: can't create socket");
        return;
    }

    if (setsockopt(x->x_socket,SOL_SOCKET,SO_BROADCAST,(char *)&on,sizeof(on))<0)
        pdp_post("pdp_udp_sender: can't set broadcast flag");
}

/* setup */

/* Create an unconnected sender with the default flow-control settings:
   50000 us reply timeout, a pause of 0 us every 50 packets, and a UDP
   packet size of 1472 bytes.
   Returns the new sender, or 0 when memory allocation fails. Use
   pdp_udp_sender_connect to give it a destination. */
t_pdp_udp_sender *pdp_udp_sender_new(void)
{
    t_pdp_udp_sender *x = pdp_alloc(sizeof(*x));
    if (!x) return 0;

    /* zero everything: no chunk list, sleep counter at 0 */
    memset(x,0,sizeof(*x));

    /* no connection */
    x->x_socket = -1;

    /* set flow control */
    pdp_udp_sender_timeout_us(x, 50000);
    pdp_udp_sender_sleepgrain_us(x, 0);
    pdp_udp_sender_sleepperiod(x, 50);
    pdp_udp_sender_udp_packet_size(x, 1472); //optimal udp packet size (ip: 1500 = 28 + 1472)

    return x;
}

/* Release the resources held by a sender: its socket and chunk list.
   Passing 0 is allowed and does nothing, like pdp_udp_receiver_free. The
   released fields are reset so a second call is harmless.
   NOTE(review): the sender struct itself (allocated with pdp_alloc in
   pdp_udp_sender_new) is not released here -- confirm whether callers
   free it or whether it leaks. */
void pdp_udp_sender_free(t_pdp_udp_sender *x)
{
    if (!x) return;

    if (x->x_socket != -1) close(x->x_socket);
    x->x_socket = -1;

    /* the chunk list comes from malloc, so it goes back through free */
    free(x->x_chunk_list);
    x->x_chunk_list = 0;
}

/* Send one packet (type string + data bytes) to the connected receiver.

   Protocol:
     1. send the NEW header packet until the receiver acknowledges it
        (at most 5 attempts),
     2. send a burst of chunks,
     3. send DONE and fetch the receiver's resend list,
     4. repeat 2-3 with the requested chunks until the list is empty
        (at most 5 bursts).

   type and data are not copied; they must stay valid during the call.
   Returns 1 on success, 0 on error (no target, no handshake, socket error
   during the handshake, or too many resend rounds).

   NOTE(review): _need_resend() returns 0 both when the receiver reports
   nothing missing and when no reply arrived at all, so a transmission
   whose final confirmation got lost is still reported as a success. */
int pdp_udp_sender_send(t_pdp_udp_sender *x, char* type, unsigned int size, void *data)
{
    int hs_retry = 5; // retry count for initial handshake
    int rs_retry = 5; // retry count for resends
    int ack = 0;      // result of the last wait for ACKNEW

    /* check if we have a target */
    if (-1 == x->x_socket) goto transerror;

    /* setup internal state */
    _prepare_for_new_transmission(x,type,size,data);

    /* handshake a new transmission: stop at the first ACK (1) or socket
       error (-1); a timeout (0) triggers the next attempt */
    while (hs_retry--){
        _send_header_packet(x);
        ack = _receive_packet(x, PDP_UDP_ACKNEW);
        if (ack) break;
    }

    /* a socket error is a failure, not an acknowledgement */
    if (ack < 0) goto transerror;

    /* exit if no handshake was possible */
    if (!ack){
        DD pdp_post("pdp_netsend: DROP: receiver does not accept new transmission");
        goto transerror;
    }

    /* transmission loop */
    do {
        if (!(rs_retry--)) break;
        _send_chunks(x);
    } while (_need_resend(x));

    /* exit if transmission was not successful */
    if (rs_retry < 0){
        DD pdp_post("pdp_netsend: DROP: receiver did not confirm reception");
        goto transerror;
    }

    /* send successful */
    return 1;

  transerror:
    /* transmission error */
    return 0;
}





More information about the Pd-cvs mailing list