2009-06-27

Pipelines

…ou em brasileiro: canalizações :)

$ man 7 pipe

Como diz no manual um pipe é um canal de comunicação unidireccional entre dois processos. Um dos processos escreve (com write) numa ponta, o outro lê (com read) na outra.

O que temos de fazer para a nossa shell permitir este tipo de comunicação entre processos é o seguinte:

  • processar a string do comando para detectar pedidos de pipes;
  • se forem pedidos pipes, lançar os processos redireccionando a ponta correspondente do pipe;

A primeira parte não vou descrever, mais uma vez por ser um detalhe que não tem nada a ver com os pipes em si.

Para criar um pipe existe a syscall pipe, que recebe um array de dois descritores e preenche-o com os descritores das duas pontas do pipe criado. O índice 0 é a ponta de leitura e o índice 1 é a ponta de escrita.

Depois de criar o pipe, podemos lançar os dois processos que o vão utilizar, com a já habitual construção com fork/exec:

int fds[2];
pipe(fds);

pid_t pid = fork();
if(pid == 0)
{
// Fazer mais umas cenas no pipe
execvp(left_args[0], left_args);
perror(left_args[0]);
exit(1);
}
else
{
pid = fork();
if(pid == 0)
{
// Fazer mais umas cenas no pipe
execvp(right_args[0], right_args);
perror(right_args[0]);
exit(1);
}
else
{
// Fazer mais umas cenas no pipe
}
}

Agora surgem alguns problemas. Ao criar o pipe, ambas as pontas estão abertas para a operação relevante (leitura ou escrita). Ao fazer fork temos ambas as pontas abertas nos três processos (pai e 2 filhos). Temos de fechar estas pontas, senão os processos nunca vão receber end-of-file ao ler do pipe, e acabarão por bloquear.

O pai não vai utilizar o pipe, por isso podemos simplesmente fechar ambas as pontas no pai, depois dos filhos terem sido criados, ou seja, no último else. Quanto aos filhos, cada um deve fechar a ponta que não lhe diz respeito.

int fds[2];
pipe(fds);

pid_t pid = fork();
if(pid == 0)
{
close(fds[0]);
// Fazer mais umas cenas no pipe

execvp(left_args[0], left_args);
perror(left_args[0]);
exit(1);
}
else
{
pid = fork();
if(pid == 0)
{
close(fds[1]);
// Fazer mais umas cenas no pipe

execvp(right_args[0], right_args);
perror(right_args[0]);
exit(1);
}
else
{
close(fds[0]);
close(fds[1]);
}
}

E agora ainda falta por os dois processos a escrever e a ler do pipe, respectivamente. Tal como fizemos para redireccionar para ficheiros, vamos usar dup2 para redireccionar as standard streams para o pipe e ficamos então com esta estrutura:

int fds[2];
pipe(fds);

pid_t pid = fork();
if(pid == 0)
{
close(fds[0]);
dup2(fds[1], 1);

execvp(left_args[0], left_args);
perror(left_args[0]);
exit(1);
}
else
{
pid = fork();
if(pid == 0)
{
close(fds[1]);
dup2(fds[0], 0);

execvp(right_args[0], right_args);
perror(right_args[0]);
exit(1);
}
else
{
close(fds[0]);
close(fds[1]);
}
}

Eu disse pipeS, e não 1 pipe

Assim só estamos a usar um pipe. E se quisermos encadear um número arbitrário de processos?

Mais uma vez vou ignorar o processamento do comando para descobrir os pipes e os processos a executar. Vamos assumir que os comandos a executar estão neste array:

char*** cmds;

Ouch, tripla indirecção. Não se assustem. Não vejam isto como um apontador para um apontador para um apontador para caractere. Vejam como uma lista de comandos. Uma string (lista de caracteres) é um apontador para caracteres: char*. Um comando (lista de argumentos) é um apontador para strings: char**. E uma lista de comandos é um apontador para comandos: char***.

Vamos precisar de um ciclo para percorrer estes comandos e fazer pipes entre eles. Excepto o primeiro processo, todos vão ler de um pipe. Excepto o último pipe, todos vão escrever num pipe.

Para ajudar a leitura vamos isolar em funções as operações “primitivas” que iremos usar neste ciclo: criar um pipe, fechar um pipe, preparar leitura de um pipe, preparar escrita num pipe.

// Criar um pipe já existe

void close_pipe(int fds[])
{
close(fds[0]);
close(fds[1]);
}

void set_read(int lpipe[])
{
close(lpipe[1]);
dup2(lpipe[0], 0);
}

void set_write(int rpipe[])
{
close(rpipe[0]);
dup2(rpipe[1], 1);
}

void copy_pipe(int* src, int* dst)
{
dst[0] = src[0];
dst[1] = src[1];
}
Agora podemos escrever o ciclo com recurso a estas funções. Vamos precisar de dois pipes. E vamos precisar de dois casos especiais: o primeiro e o último comando. Vamos então usar fork em três sítios: nos dois casos especiais e dentro do ciclo. É melhor pôr isso numa função para ser fácil de reutilizar.
O filho deve simplesmente efectuar os redireccionamentos necessários e lançar o comando. O pai fecha o pipe da esquerda (se existir).
void fork_and_pipe(char** args, int* lpipe, int* rpipe)
{
pid_t pid = fork();
if(pid == 0)
{
if(lpipe)
set_read(lpipe);
if(rpipe)
set_write(rpipe);
execvp(args[0], args);
perror(args[0]);
exit(EXIT_FAILURE);
}
else
{
if(lpipe)
close_pipe(lpipe);
}
}

Como o pipe à esquerda (input) de um processo é o mesmo que está à direita (output) do processo anterior temos que reutilizar este pipe de cada processo para o seguinte. Para isso basta que, depois de cada chamada a fork_and_pipe, o pipe da direita passe a ser o pipe da esquerda. Podemos fazer isto com a função copy_pipe que foi definida acima.

Podemos agora escrever o ciclo com os casos especiais:

int lpipe[2], rpipe[2];
// Primeiro pipe
pipe(rpipe);

// Primeiro processo
fork_and_pipe(cmds[0], NULL, rpipe);
copy_pipe(rpipe, lpipe);

for(i = 1; i < ncmds - 1; i++)
{
// i-ésimo pipe
pipe(rpipe);
// i-ésimo processo
fork_and_pipe(cmds[i], lpipe, rpipe);
copy_pipe(rpipe, lpipe);
}

// Último processo
fork_and_pipe(cmds[ncmds-1], lpipe, NULL);

5 commentários:

Anónimo disse...

mt bom trab :P

olha onde tens
void set_write(int rpipe[]){
closer .....
}

nao é close?

Martinho Fernandes disse...

Sorry, tens razão. Fixed.

Anónimo disse...

Ora boas, grande blog, grande ajuda! MAs...

Duvidas:
se tiver esta lista de comandos por exemplo: A x y| B w z | C k i
crio um pipe e dps um fork , com um filho para cada comando? filho para A, filho pa B...?

e isso tb quer dizer que crio um pipe, para cada lista de comandos?

Ou entendi mal?

Obrigado

Anónimo disse...

Também não percebo a função copy_pipe, para que serve.

Obrigado

fabio disse...

void copy_pipe(int* src, int dst)

falta o *void copy_pipe(int* src, int* dst)
^

Enviar um comentário