use crate::real_std::{
    fmt, fs,
    path::PathBuf,
    pin::Pin,
    sync::{Arc, Mutex},
};
use {
    collect_mac::collect,
    futures::{
        future::{self, BoxFuture},
        prelude::*,
        ready,
        task::{self, Poll},
    },
    http::{
        header::{HeaderMap, HeaderName, HeaderValue},
        StatusCode,
    },
    hyper::{body::Bytes, Server},
    pin_project_lite::pin_project,
};
use crate::base::types::{ArcType, Type};
use crate::{
    vm::{
        self,
        api::{
            generic, Collect, Eff, Function, Getable, OpaqueValue, PushAsRef, Pushable, VmType,
            WithVM, IO,
        },
        thread::{ActiveThread, RootedThread, Thread},
        ExternModule, Variants,
    },
    Error,
};
macro_rules! try_future {
    ($e:expr) => {
        try_future!($e, Box::pin)
    };
    ($e:expr, $f:expr) => {
        match $e {
            Ok(x) => x,
            Err(err) => return $f(::futures::future::err(err.into())),
        }
    };
}
pub struct HttpEffect;
impl VmType for HttpEffect {
    type Type = Self;
    fn make_type(vm: &Thread) -> ArcType {
        let r = generic::R::make_type(vm);
        Type::app(
            vm.find_type_info("std.http.types.HttpEffect")
                .map(|alias| alias.into_type())
                .unwrap_or_else(|_| Type::hole()),
            collect![r],
        )
    }
}
pub type EffectHandler<T> = Eff<HttpEffect, T>;
pub struct Headers(HeaderMap);
impl VmType for Headers {
    type Type = Vec<(String, Vec<u8>)>;
    fn make_type(vm: &Thread) -> ArcType {
        Vec::<(String, Vec<u8>)>::make_type(vm)
    }
}
impl<'vm> Pushable<'vm> for Headers {
    fn vm_push(self, context: &mut ActiveThread<'vm>) -> vm::Result<()> {
        Collect::new(
            self.0
                .iter()
                .map(|(name, value)| (name.as_str(), value.as_bytes())),
        )
        .vm_push(context)
    }
}
impl<'vm, 'value> Getable<'vm, 'value> for Headers {
    impl_getable_simple!();
    fn from_value(vm: &'vm Thread, value: Variants<'value>) -> Self {
        Headers(
            Collect::from_value(vm, value)
                .filter_map(|(name, value): (&str, &[u8])| {
                    match (
                        HeaderName::from_bytes(name.as_bytes()),
                        HeaderValue::from_bytes(value),
                    ) {
                        (Ok(name), Ok(value)) => Some((name, value)),
                        _ => None,
                    }
                })
                .collect(),
        )
    }
}
#[derive(Userdata, Trace, VmType, Clone)]
#[gluon(vm_type = "std.http.types.Body")]
#[gluon(crate_name = "::vm")]
#[gluon_userdata(clone)]
#[gluon_trace(skip)]
pub struct Body(
    Arc<Mutex<Pin<Box<dyn Stream<Item = Result<PushAsRef<Bytes, [u8]>, vm::Error>> + Send>>>>,
);
impl fmt::Debug for Body {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "hyper::Body")
    }
}
fn read_chunk(body: &Body) -> impl Future<Output = IO<Option<PushAsRef<Bytes, [u8]>>>> {
    use futures::future::poll_fn;
    let body = body.0.clone();
    poll_fn(move |cx| {
        let mut stream = body.lock().unwrap();
        Poll::Ready(IO::Value(
            if let Some(result) = ready!(stream.as_mut().poll_next(cx)) {
                match result {
                    Ok(chunk) => Some(chunk),
                    Err(err) => return IO::Exception(err.to_string()).into(),
                }
            } else {
                None
            },
        ))
    })
}
#[derive(Userdata, Trace, VmType, Clone)]
#[gluon(vm_type = "std.http.types.ResponseBody")]
#[gluon(crate_name = "::vm")]
#[gluon_userdata(clone)]
#[gluon_trace(skip)]
pub struct ResponseBody(Arc<Mutex<Option<hyper::body::Sender>>>);
impl fmt::Debug for ResponseBody {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "ResponseBody")
    }
}
fn write_response(response: &ResponseBody, bytes: &[u8]) -> impl Future<Output = IO<()>> {
    use futures::future::poll_fn;
    let mut unsent_chunk = Some(bytes.to_owned().into());
    let response = response.0.clone();
    poll_fn(move |cx| {
        info!("Starting response send");
        let mut sender = response.lock().unwrap();
        let sender = sender
            .as_mut()
            .expect("Sender has been dropped while still in use");
        let chunk = unsent_chunk
            .take()
            .expect("Attempt to poll after chunk is sent");
        match sender.poll_ready(cx) {
            Poll::Pending => {
                unsent_chunk = Some(chunk);
                return Poll::Pending;
            }
            Poll::Ready(Ok(_)) => (),
            Poll::Ready(Err(err)) => {
                info!("Could not send http response {}", err);
                return IO::Exception(err.to_string()).into();
            }
        }
        match sender.try_send_data(chunk) {
            Ok(()) => Poll::Ready(IO::Value(())),
            Err(chunk) => {
                unsent_chunk = Some(chunk);
                IO::Exception("Could not send http response".into()).into()
            }
        }
    })
}
#[derive(Debug, Userdata, Trace, VmType, Clone)]
#[gluon(vm_type = "std.http.types.Uri")]
#[gluon(crate_name = "::vm")]
#[gluon_trace(skip)]
#[gluon_userdata(clone)]
struct Uri(http::Uri);
field_decl! { http, method, uri, status, body, request, response, headers }
type Request = record_type! {
    method => String,
    uri => Uri,
    body => Body
};
pub type Response = record_type! {
    status => u16,
    headers => Headers
};
type HttpState = record_type! {
    request => Request,
    response => ResponseBody
};
#[derive(Getable, VmType)]
#[gluon(crate_name = "::vm")]
struct Settings {
    port: u16,
    tls_cert: Option<PathBuf>,
}
fn listen(
    settings: Settings,
    WithVM { vm, value }: WithVM<OpaqueValue<RootedThread, EffectHandler<Response>>>,
) -> impl Future<Output = IO<()>> + Send + 'static {
    let vm = vm.root_thread();
    listen_(settings, vm, value).map(IO::from)
}
async fn listen_(
    settings: Settings,
    thread: RootedThread,
    handler: OpaqueValue<RootedThread, EffectHandler<Response>>,
) -> vm::Result<()> {
    let thread = match thread.new_thread() {
        Ok(thread) => thread,
        Err(err) => return Err(err),
    };
    impl tower_service::Service<hyper::Request<hyper::Body>> for Handler {
        type Response = hyper::Response<hyper::Body>;
        type Error = Error;
        type Future = BoxFuture<'static, Result<http::Response<hyper::Body>, Error>>;
        fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
            Ok(()).into()
        }
        fn call(&mut self, request: hyper::Request<hyper::Body>) -> Self::Future {
            let (parts, body) = request.into_parts();
            self.handle(parts.method, parts.uri, body)
        }
    }
    let addr = format!("0.0.0.0:{}", settings.port).parse().unwrap();
    let listener = Handler::new(&thread, handler);
    if let Some(cert_path) = &settings.tls_cert {
        let identity = fs::read(cert_path).map_err(|err| {
            vm::Error::Message(format!(
                "Unable to open certificate `{}`: {}",
                cert_path.display(),
                err
            ))
        })?;
        let identity = native_tls::Identity::from_pkcs12(&identity, "")
            .map_err(|err| vm::Error::Message(err.to_string()))?;
        let acceptor = tokio_native_tls::TlsAcceptor::from(
            native_tls::TlsAcceptor::new(identity)
                .map_err(|err| vm::Error::Message(err.to_string()))?,
        );
        let http = hyper::server::conn::Http::new();
        let tcp_listener = tokio::net::TcpListener::bind(&addr)
            .map_err(|err| vm::Error::Message(err.to_string()))
            .await?;
        let incoming = tokio_stream::wrappers::TcpListenerStream::new(tcp_listener)
            .err_into()
            .and_then(|stream| {
                acceptor.accept(stream).map_err(|err| {
                    info!("Unable to accept TLS connection: {}", err);
                    Box::new(err) as Box<dyn ::std::error::Error + Send + Sync>
                })
            });
        pin_project! {
            struct Acceptor<S> {
                #[pin]
                incoming: S,
            }
        }
        impl<S, T, E> hyper::server::accept::Accept for Acceptor<S>
        where
            S: Stream<Item = Result<T, E>>,
        {
            type Conn = T;
            type Error = E;
            fn poll_accept(
                self: Pin<&mut Self>,
                cx: &mut task::Context<'_>,
            ) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
                self.project().incoming.poll_next(cx)
            }
        }
        return hyper::server::Builder::new(Acceptor { incoming }, http)
            .serve(hyper::service::make_service_fn(move |_| {
                future::ready(Ok::<_, hyper::Error>(listener.clone()))
            }))
            .map_err(|err| vm::Error::from(format!("Server error: {}", err)))
            .await;
    }
    Server::bind(&addr)
        .serve(hyper::service::make_service_fn(move |_| {
            future::ready(Ok::<_, hyper::Error>(listener.clone()))
        }))
        .map_err(|err| vm::Error::from(format!("Server error: {}", err)))
        .map_ok(|_| ())
        .await
}
type ListenFn = fn(OpaqueValue<RootedThread, EffectHandler<Response>>, HttpState) -> IO<Response>;
#[derive(Clone)]
pub struct Handler {
    handle: Function<RootedThread, ListenFn>,
    handler: OpaqueValue<RootedThread, EffectHandler<Response>>,
}
impl Handler {
    pub fn new(
        thread: &Thread,
        handler: OpaqueValue<RootedThread, EffectHandler<Response>>,
    ) -> Self {
        let handle: Function<RootedThread, ListenFn> = thread
            .get_global("std.http.handle")
            .unwrap_or_else(|err| panic!("{}", err));
        Self { handle, handler }
    }
    pub fn handle<E>(
        &mut self,
        method: http::Method,
        uri: http::Uri,
        body: impl Stream<Item = Result<Bytes, E>> + Send + 'static,
    ) -> BoxFuture<'static, crate::Result<hyper::Response<hyper::Body>>>
    where
        E: fmt::Display + Send + 'static,
    {
        let child_thread = try_future!(self.handle.vm().new_thread());
        let mut handle = try_future!(self.handle.re_root(child_thread));
        let gluon_request = record_no_decl! {
            method => method.as_str().to_owned(),
            uri => Uri(uri),
            body => Body(Arc::new(Mutex::new(Box::pin(
                body
                    .map_err(|err| vm::Error::Message(format!("{}", err)))
                    .map_ok(PushAsRef::<_, [u8]>::new)
            ))))
        };
        let (response_sender, response_body) = hyper::Body::channel();
        let response_sender = Arc::new(Mutex::new(Some(response_sender)));
        let http_state = record_no_decl! {
            request => gluon_request,
            response => ResponseBody(response_sender.clone())
        };
        let handler = self.handler.clone();
        Box::pin(async move {
            handle
                .call_async(handler, http_state)
                .map(move |result| match result {
                    Ok(value) => {
                        match value {
                            IO::Value(record_p! { status, headers }) => {
                                *response_sender.lock().unwrap() = None;
                                let status = StatusCode::from_u16(status)
                                    .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
                                let mut response = http::Response::builder()
                                    .status(status)
                                    .body(response_body)
                                    .unwrap();
                                *response.headers_mut() = headers.0;
                                Ok(response)
                            }
                            IO::Exception(err) => {
                                info!("{}", err);
                                Ok(http::Response::builder()
                                    .status(StatusCode::INTERNAL_SERVER_ERROR)
                                    .body("".into())
                                    .unwrap())
                            }
                        }
                    }
                    Err(err) => {
                        info!("{}", err);
                        Ok(http::Response::builder()
                            .status(StatusCode::INTERNAL_SERVER_ERROR)
                            .body("".into())
                            .unwrap())
                    }
                })
                .await
        })
    }
}
pub fn load_types(vm: &Thread) -> vm::Result<ExternModule> {
    vm.register_type::<Body>("std.http.types.Body", &[])?;
    vm.register_type::<ResponseBody>("std.http.types.ResponseBody", &[])?;
    vm.register_type::<Uri>("std.http.types.Uri", &[])?;
    ExternModule::new(
        vm,
        record! {
            type std::http::types::Body => Body,
            type std::http::types::ResponseBody => ResponseBody,
            type std::http::types::Uri => Uri,
            type std::http::Method => String,
            type std::http::StatusCode => u16,
            type std::http::Request => Request,
            type std::http::Response => Response,
            type std::http::Headers => Headers,
            type std::http::HttpState => HttpState
        },
    )
}
macro_rules! uri_binds {
    ($($id: ident)*) => {
        record!{
            $(
                $id => primitive!(1, concat!("std.http.prim.uri.", stringify!($id)), |u: &Uri| (u.0).$id())
            ),*
        }
    }
}
mod std {
    pub(crate) mod http {
        pub(crate) use crate::std_lib::http as prim;
    }
}
pub fn load(vm: &Thread) -> vm::Result<ExternModule> {
    ExternModule::new(
        vm,
        record! {
            listen => primitive!(2, async fn std::http::prim::listen),
            read_chunk => primitive!(1, async fn std::http::prim::read_chunk),
            write_response => primitive!(2, async fn std::http::prim::write_response),
            port => primitive!(1, "std.http.prim.uri.port", |u: &Uri| (u.0).port().map(|p| p.as_u16())),
            uri => uri_binds!(path host query to_string)
        },
    )
}